cubefs

Форк
0
389 строк · 10.8 Кб
1
// Copyright 2022 The CubeFS Authors.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
14

15
package rpc
16

17
import (
18
	"context"
19
	"encoding/json"
20
	"fmt"
21
	"io"
22
	"net/http"
23
	urllib "net/url"
24
	"strings"
25
	"time"
26

27
	"github.com/cubefs/cubefs/blobstore/common/crc32block"
28
	"github.com/cubefs/cubefs/blobstore/common/trace"
29
	"github.com/cubefs/cubefs/blobstore/util/bytespool"
30
	"github.com/cubefs/cubefs/blobstore/util/errors"
31
)
32

33
// Config simple client config
34
type Config struct {
35
	// the whole request and response timeout
36
	ClientTimeoutMs int64 `json:"client_timeout_ms"`
37
	// bandwidthBPMs for read body
38
	BodyBandwidthMBPs float64 `json:"body_bandwidth_mbps"`
39

40
	// base timeout for read body
41
	BodyBaseTimeoutMs int64 `json:"body_base_timeout_ms"`
42
	// transport config
43
	Tc TransportConfig `json:"transport_config"`
44
}
45

46
// ErrBodyReadTimeout timeout error
47
var ErrBodyReadTimeout = errors.New("read body timeout")
48

49
// Option client options
50
type Option func(req *http.Request)
51

52
// WithCrcEncode request with crc32 encode
53
func WithCrcEncode() Option {
54
	return func(req *http.Request) {
55
		req.Header.Set(HeaderCrcEncoded, "1")
56
		// util not support reader = nil
57
		if req.ContentLength > 0 && req.Body != nil {
58
			encoder := crc32block.NewBodyEncoder(req.Body)
59
			req.Body = encoder
60
			if bodyGetter := req.GetBody; bodyGetter != nil {
61
				req.GetBody = func() (io.ReadCloser, error) {
62
					body, err := bodyGetter()
63
					return crc32block.NewBodyEncoder(body), err
64
				}
65
			}
66
			req.ContentLength = encoder.CodeSize(req.ContentLength)
67
		}
68
	}
69
}
70

71
// Client implements the rpc client with http
72
type Client interface {
73
	// Method*** handle response by yourself
74
	Do(ctx context.Context, req *http.Request) (*http.Response, error)
75
	Head(ctx context.Context, url string) (*http.Response, error)
76
	Get(ctx context.Context, url string) (*http.Response, error)
77
	Delete(ctx context.Context, url string) (*http.Response, error)
78
	Form(ctx context.Context, method, url string, form map[string][]string) (*http.Response, error)
79
	Put(ctx context.Context, url string, params interface{}) (*http.Response, error)
80
	Post(ctx context.Context, url string, params interface{}) (*http.Response, error)
81

82
	// ***With means parse result in client
83
	DoWith(ctx context.Context, req *http.Request, ret interface{}, opts ...Option) error
84
	GetWith(ctx context.Context, url string, ret interface{}) error
85
	PutWith(ctx context.Context, url string, ret interface{}, params interface{}, opts ...Option) error
86
	PostWith(ctx context.Context, url string, ret interface{}, params interface{}, opts ...Option) error
87

88
	// Close background goroutines in lb client
89
	Close()
90
}
91

92
type client struct {
93
	client            *http.Client
94
	bandwidthBPMs     int64 // using for reading body
95
	bodyBaseTimeoutMs int64 // base time read body
96
}
97

98
// NewClient returns a rpc client
99
func NewClient(cfg *Config) Client {
100
	if cfg == nil {
101
		cfg = &Config{}
102
	}
103
	cfg.Tc = cfg.Tc.Default()
104
	if cfg.BodyBaseTimeoutMs == 0 {
105
		cfg.BodyBaseTimeoutMs = 30 * 1e3
106
	}
107
	return &client{
108
		client: &http.Client{
109
			Transport: NewTransport(&cfg.Tc),
110
			Timeout:   time.Duration(cfg.ClientTimeoutMs) * time.Millisecond,
111
		},
112
		bandwidthBPMs:     int64(cfg.BodyBandwidthMBPs * (1 << 20) / 1e3),
113
		bodyBaseTimeoutMs: cfg.BodyBaseTimeoutMs,
114
	}
115
}
116

117
func (c *client) Form(ctx context.Context, method, url string, form map[string][]string) (resp *http.Response, err error) {
118
	body := urllib.Values(form).Encode()
119
	request, err := http.NewRequest(method, url, strings.NewReader(body))
120
	if err != nil {
121
		return
122
	}
123
	return c.Do(ctx, request)
124
}
125

126
func (c *client) Put(ctx context.Context, url string, params interface{}) (resp *http.Response, err error) {
127
	body, err := marshalObj(params)
128
	if err != nil {
129
		return
130
	}
131
	request, err := http.NewRequest(http.MethodPut, url, body.Body)
132
	if err != nil {
133
		return
134
	}
135
	request.Header.Set(HeaderContentType, body.ContentType)
136
	return c.Do(ctx, request)
137
}
138

139
func (c *client) Post(ctx context.Context, url string, params interface{}) (resp *http.Response, err error) {
140
	body, err := marshalObj(params)
141
	if err != nil {
142
		return nil, err
143
	}
144
	request, err := http.NewRequest(http.MethodPost, url, body.Body)
145
	if err != nil {
146
		return nil, err
147
	}
148
	request.Header.Set(HeaderContentType, body.ContentType)
149
	return c.Do(ctx, request)
150
}
151

152
func (c *client) DoWith(ctx context.Context, req *http.Request, ret interface{}, opts ...Option) error {
153
	for _, opt := range opts {
154
		opt(req)
155
	}
156
	resp, err := c.Do(ctx, req)
157
	if err != nil {
158
		return err
159
	}
160
	defer resp.Body.Close()
161

162
	err = serverCrcEncodeCheck(ctx, req, resp)
163
	if err != nil {
164
		return err
165
	}
166
	return ParseData(resp, ret)
167
}
168

169
func (c *client) GetWith(ctx context.Context, url string, ret interface{}) error {
170
	resp, err := c.Get(ctx, url)
171
	if err != nil {
172
		return err
173
	}
174
	return parseData(resp, ret)
175
}
176

177
func (c *client) PutWith(ctx context.Context, url string, ret interface{}, params interface{}, opts ...Option) (err error) {
178
	body, err := marshalObj(params)
179
	if err != nil {
180
		return
181
	}
182
	request, err := http.NewRequest(http.MethodPut, url, body.Body)
183
	if err != nil {
184
		return
185
	}
186
	request.Header.Set(HeaderContentType, body.ContentType)
187
	for _, opt := range opts {
188
		opt(request)
189
	}
190
	resp, err := c.Do(ctx, request)
191
	if err != nil {
192
		return
193
	}
194
	defer resp.Body.Close()
195
	err = serverCrcEncodeCheck(ctx, request, resp)
196
	if err != nil {
197
		return err
198
	}
199
	return ParseData(resp, ret)
200
}
201

202
func (c *client) PostWith(ctx context.Context, url string, ret interface{}, params interface{}, opts ...Option) error {
203
	body, err := marshalObj(params)
204
	if err != nil {
205
		return err
206
	}
207
	request, err := http.NewRequest(http.MethodPost, url, body.Body)
208
	if err != nil {
209
		return err
210
	}
211
	request.Header.Set(HeaderContentType, body.ContentType)
212

213
	for _, opt := range opts {
214
		opt(request)
215
	}
216
	resp, err := c.Do(ctx, request)
217
	if err != nil {
218
		return err
219
	}
220
	defer resp.Body.Close()
221

222
	err = serverCrcEncodeCheck(ctx, request, resp)
223
	if err != nil {
224
		return err
225
	}
226
	return ParseData(resp, ret)
227
}
228

229
func (c *client) Head(ctx context.Context, url string) (resp *http.Response, err error) {
230
	req, err := http.NewRequest(http.MethodHead, url, nil)
231
	if err != nil {
232
		return
233
	}
234
	return c.Do(ctx, req)
235
}
236

237
func (c *client) Get(ctx context.Context, url string) (resp *http.Response, err error) {
238
	req, err := http.NewRequest(http.MethodGet, url, nil)
239
	if err != nil {
240
		return
241
	}
242
	return c.Do(ctx, req)
243
}
244

245
func (c *client) Delete(ctx context.Context, url string) (resp *http.Response, err error) {
246
	req, err := http.NewRequest(http.MethodDelete, url, nil)
247
	if err != nil {
248
		return
249
	}
250
	return c.Do(ctx, req)
251
}
252

253
func (c *client) Do(ctx context.Context, req *http.Request) (*http.Response, error) {
254
	if req.Header.Get(HeaderUA) == "" {
255
		req.Header.Set(HeaderUA, UserAgent)
256
	}
257
	span := trace.SpanFromContextSafe(ctx)
258
	err := trace.InjectWithHTTPHeader(ctx, req)
259
	if err != nil {
260
		span.Errorf("inject failed, %v", err)
261
	}
262
	resp, err := c.doWithCtx(ctx, req)
263
	if err != nil {
264
		return resp, err
265
	}
266

267
	header := resp.Header
268
	traceLog := header[HeaderTraceLog]
269
	if len(traceLog) > 0 {
270
		span.AppendRPCTrackLog([]string{strings.Join(traceLog, ";")})
271
	}
272
	return resp, err
273
}
274

275
func (c *client) Close() {
276
	// Do nothing to close.
277
}
278

279
func (c *client) doWithCtx(ctx context.Context, req *http.Request) (resp *http.Response, err error) {
280
	span := trace.SpanFromContextSafe(ctx)
281
	req = req.WithContext(ctx)
282
	if c.bandwidthBPMs > 0 && req.Body != nil {
283
		t := req.ContentLength/c.bandwidthBPMs + c.bodyBaseTimeoutMs
284
		req.Body = &timeoutReadCloser{timeoutMs: t, body: req.Body}
285
	}
286
	resp, err = c.client.Do(req)
287
	if err != nil {
288
		span.Warnf("do request to %s failed, error: %s", req.URL, err.Error())
289
		return
290
	}
291
	if c.bandwidthBPMs > 0 {
292
		t := resp.ContentLength/c.bandwidthBPMs + c.bodyBaseTimeoutMs
293
		resp.Body = &timeoutReadCloser{timeoutMs: t, body: resp.Body}
294
	}
295
	return
296
}
297

298
// parseData close response body in this package.
299
func parseData(resp *http.Response, data interface{}) (err error) {
300
	defer resp.Body.Close()
301
	return ParseData(resp, data)
302
}
303

304
// ParseData parse response with data, close response body by yourself.
305
func ParseData(resp *http.Response, data interface{}) (err error) {
306
	if resp.StatusCode/100 == 2 {
307
		size := resp.ContentLength
308
		if data != nil && size != 0 {
309
			if d, ok := data.(UnmarshalerFrom); ok {
310
				return d.UnmarshalFrom(io.LimitReader(resp.Body, size))
311
			}
312

313
			if d, ok := data.(Unmarshaler); ok {
314
				buf := bytespool.Alloc(int(size))
315
				defer bytespool.Free(buf)
316
				if _, err = io.ReadFull(resp.Body, buf); err != nil {
317
					return NewError(resp.StatusCode, "ReadResponse", err)
318
				}
319
				return d.Unmarshal(buf)
320
			}
321

322
			if err := json.NewDecoder(resp.Body).Decode(data); err != nil {
323
				return NewError(resp.StatusCode, "JSONDecode", err)
324
			}
325
		}
326
		if resp.StatusCode == 200 {
327
			return nil
328
		}
329
		return NewError(resp.StatusCode, "", err)
330
	}
331

332
	return ParseResponseErr(resp)
333
}
334

335
// ParseResponseErr parse error of response
336
func ParseResponseErr(resp *http.Response) (err error) {
337
	// wrap the error with HttpError for StatusCode is not 2XX
338
	if resp.StatusCode > 299 && resp.ContentLength != 0 {
339
		errR := &errorResponse{}
340
		if err := json.NewDecoder(resp.Body).Decode(errR); err != nil {
341
			return NewError(resp.StatusCode, resp.Status, nil)
342
		}
343
		err = NewError(resp.StatusCode, errR.Code, errors.New(errR.Error))
344
		return
345
	}
346
	return NewError(resp.StatusCode, resp.Status, nil)
347
}
348

349
type timeoutReadCloser struct {
350
	body      io.ReadCloser
351
	timeoutMs int64
352
}
353

354
func (tr *timeoutReadCloser) Close() (err error) {
355
	return tr.body.Close()
356
}
357

358
func (tr *timeoutReadCloser) Read(p []byte) (n int, err error) {
359
	readOk := make(chan struct{})
360
	if tr.timeoutMs > 0 {
361
		startTime := time.Now().UnixNano() / 1e6
362
		after := time.After(time.Millisecond * time.Duration(tr.timeoutMs))
363
		go func() {
364
			n, err = tr.body.Read(p)
365
			close(readOk)
366
		}()
367
		select {
368
		case <-readOk:
369
			// really cost time
370
			tr.timeoutMs = tr.timeoutMs - (time.Now().UnixNano()/1e6 - startTime)
371
			return
372
		case <-after:
373
			tr.body.Close()
374
			return 0, ErrBodyReadTimeout
375
		}
376
	}
377
	tr.body.Close()
378
	return 0, ErrBodyReadTimeout
379
}
380

381
func serverCrcEncodeCheck(ctx context.Context, request *http.Request, resp *http.Response) (err error) {
382
	// set Header and log errors
383
	if request.Header.Get(HeaderCrcEncoded) != "" && resp.Header.Get(HeaderAckCrcEncoded) == "" {
384
		msg := fmt.Sprintf("server do not ack that body has been crc encoded, url:%v", request.URL)
385
		trace.SpanFromContextSafe(ctx).Error(msg)
386
		return NewError(http.StatusNotImplemented, "resp.Status", errors.New(msg))
387
	}
388
	return nil
389
}
390

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.