1
// Copyright 2022 The CubeFS Authors.
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
7
// http://www.apache.org/licenses/LICENSE-2.0
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
27
"github.com/cubefs/cubefs/blobstore/common/crc32block"
28
"github.com/cubefs/cubefs/blobstore/common/trace"
29
"github.com/cubefs/cubefs/blobstore/util/bytespool"
30
"github.com/cubefs/cubefs/blobstore/util/errors"
33
// Config simple client config
35
// the whole request and response timeout
36
ClientTimeoutMs int64 `json:"client_timeout_ms"`
37
// bandwidthBPMs for read body
38
BodyBandwidthMBPs float64 `json:"body_bandwidth_mbps"`
40
// base timeout for read body
41
BodyBaseTimeoutMs int64 `json:"body_base_timeout_ms"`
43
Tc TransportConfig `json:"transport_config"`
46
// ErrBodyReadTimeout timeout error
47
var ErrBodyReadTimeout = errors.New("read body timeout")
49
// Option client options
50
type Option func(req *http.Request)
52
// WithCrcEncode request with crc32 encode
53
func WithCrcEncode() Option {
54
return func(req *http.Request) {
55
req.Header.Set(HeaderCrcEncoded, "1")
56
// util not support reader = nil
57
if req.ContentLength > 0 && req.Body != nil {
58
encoder := crc32block.NewBodyEncoder(req.Body)
60
if bodyGetter := req.GetBody; bodyGetter != nil {
61
req.GetBody = func() (io.ReadCloser, error) {
62
body, err := bodyGetter()
63
return crc32block.NewBodyEncoder(body), err
66
req.ContentLength = encoder.CodeSize(req.ContentLength)
71
// Client implements the rpc client with http
72
type Client interface {
73
// Method*** handle response by yourself
74
Do(ctx context.Context, req *http.Request) (*http.Response, error)
75
Head(ctx context.Context, url string) (*http.Response, error)
76
Get(ctx context.Context, url string) (*http.Response, error)
77
Delete(ctx context.Context, url string) (*http.Response, error)
78
Form(ctx context.Context, method, url string, form map[string][]string) (*http.Response, error)
79
Put(ctx context.Context, url string, params interface{}) (*http.Response, error)
80
Post(ctx context.Context, url string, params interface{}) (*http.Response, error)
82
// ***With means parse result in client
83
DoWith(ctx context.Context, req *http.Request, ret interface{}, opts ...Option) error
84
GetWith(ctx context.Context, url string, ret interface{}) error
85
PutWith(ctx context.Context, url string, ret interface{}, params interface{}, opts ...Option) error
86
PostWith(ctx context.Context, url string, ret interface{}, params interface{}, opts ...Option) error
88
// Close background goroutines in lb client
94
bandwidthBPMs int64 // using for reading body
95
bodyBaseTimeoutMs int64 // base time read body
98
// NewClient returns a rpc client
99
func NewClient(cfg *Config) Client {
103
cfg.Tc = cfg.Tc.Default()
104
if cfg.BodyBaseTimeoutMs == 0 {
105
cfg.BodyBaseTimeoutMs = 30 * 1e3
108
client: &http.Client{
109
Transport: NewTransport(&cfg.Tc),
110
Timeout: time.Duration(cfg.ClientTimeoutMs) * time.Millisecond,
112
bandwidthBPMs: int64(cfg.BodyBandwidthMBPs * (1 << 20) / 1e3),
113
bodyBaseTimeoutMs: cfg.BodyBaseTimeoutMs,
117
func (c *client) Form(ctx context.Context, method, url string, form map[string][]string) (resp *http.Response, err error) {
118
body := urllib.Values(form).Encode()
119
request, err := http.NewRequest(method, url, strings.NewReader(body))
123
return c.Do(ctx, request)
126
func (c *client) Put(ctx context.Context, url string, params interface{}) (resp *http.Response, err error) {
127
body, err := marshalObj(params)
131
request, err := http.NewRequest(http.MethodPut, url, body.Body)
135
request.Header.Set(HeaderContentType, body.ContentType)
136
return c.Do(ctx, request)
139
func (c *client) Post(ctx context.Context, url string, params interface{}) (resp *http.Response, err error) {
140
body, err := marshalObj(params)
144
request, err := http.NewRequest(http.MethodPost, url, body.Body)
148
request.Header.Set(HeaderContentType, body.ContentType)
149
return c.Do(ctx, request)
152
func (c *client) DoWith(ctx context.Context, req *http.Request, ret interface{}, opts ...Option) error {
153
for _, opt := range opts {
156
resp, err := c.Do(ctx, req)
160
defer resp.Body.Close()
162
err = serverCrcEncodeCheck(ctx, req, resp)
166
return ParseData(resp, ret)
169
func (c *client) GetWith(ctx context.Context, url string, ret interface{}) error {
170
resp, err := c.Get(ctx, url)
174
return parseData(resp, ret)
177
func (c *client) PutWith(ctx context.Context, url string, ret interface{}, params interface{}, opts ...Option) (err error) {
178
body, err := marshalObj(params)
182
request, err := http.NewRequest(http.MethodPut, url, body.Body)
186
request.Header.Set(HeaderContentType, body.ContentType)
187
for _, opt := range opts {
190
resp, err := c.Do(ctx, request)
194
defer resp.Body.Close()
195
err = serverCrcEncodeCheck(ctx, request, resp)
199
return ParseData(resp, ret)
202
func (c *client) PostWith(ctx context.Context, url string, ret interface{}, params interface{}, opts ...Option) error {
203
body, err := marshalObj(params)
207
request, err := http.NewRequest(http.MethodPost, url, body.Body)
211
request.Header.Set(HeaderContentType, body.ContentType)
213
for _, opt := range opts {
216
resp, err := c.Do(ctx, request)
220
defer resp.Body.Close()
222
err = serverCrcEncodeCheck(ctx, request, resp)
226
return ParseData(resp, ret)
229
func (c *client) Head(ctx context.Context, url string) (resp *http.Response, err error) {
230
req, err := http.NewRequest(http.MethodHead, url, nil)
234
return c.Do(ctx, req)
237
func (c *client) Get(ctx context.Context, url string) (resp *http.Response, err error) {
238
req, err := http.NewRequest(http.MethodGet, url, nil)
242
return c.Do(ctx, req)
245
func (c *client) Delete(ctx context.Context, url string) (resp *http.Response, err error) {
246
req, err := http.NewRequest(http.MethodDelete, url, nil)
250
return c.Do(ctx, req)
253
func (c *client) Do(ctx context.Context, req *http.Request) (*http.Response, error) {
254
if req.Header.Get(HeaderUA) == "" {
255
req.Header.Set(HeaderUA, UserAgent)
257
span := trace.SpanFromContextSafe(ctx)
258
err := trace.InjectWithHTTPHeader(ctx, req)
260
span.Errorf("inject failed, %v", err)
262
resp, err := c.doWithCtx(ctx, req)
267
header := resp.Header
268
traceLog := header[HeaderTraceLog]
269
if len(traceLog) > 0 {
270
span.AppendRPCTrackLog([]string{strings.Join(traceLog, ";")})
275
func (c *client) Close() {
276
// Do nothing to close.
279
func (c *client) doWithCtx(ctx context.Context, req *http.Request) (resp *http.Response, err error) {
280
span := trace.SpanFromContextSafe(ctx)
281
req = req.WithContext(ctx)
282
if c.bandwidthBPMs > 0 && req.Body != nil {
283
t := req.ContentLength/c.bandwidthBPMs + c.bodyBaseTimeoutMs
284
req.Body = &timeoutReadCloser{timeoutMs: t, body: req.Body}
286
resp, err = c.client.Do(req)
288
span.Warnf("do request to %s failed, error: %s", req.URL, err.Error())
291
if c.bandwidthBPMs > 0 {
292
t := resp.ContentLength/c.bandwidthBPMs + c.bodyBaseTimeoutMs
293
resp.Body = &timeoutReadCloser{timeoutMs: t, body: resp.Body}
298
// parseData close response body in this package.
299
func parseData(resp *http.Response, data interface{}) (err error) {
300
defer resp.Body.Close()
301
return ParseData(resp, data)
304
// ParseData parse response with data, close response body by yourself.
305
func ParseData(resp *http.Response, data interface{}) (err error) {
306
if resp.StatusCode/100 == 2 {
307
size := resp.ContentLength
308
if data != nil && size != 0 {
309
if d, ok := data.(UnmarshalerFrom); ok {
310
return d.UnmarshalFrom(io.LimitReader(resp.Body, size))
313
if d, ok := data.(Unmarshaler); ok {
314
buf := bytespool.Alloc(int(size))
315
defer bytespool.Free(buf)
316
if _, err = io.ReadFull(resp.Body, buf); err != nil {
317
return NewError(resp.StatusCode, "ReadResponse", err)
319
return d.Unmarshal(buf)
322
if err := json.NewDecoder(resp.Body).Decode(data); err != nil {
323
return NewError(resp.StatusCode, "JSONDecode", err)
326
if resp.StatusCode == 200 {
329
return NewError(resp.StatusCode, "", err)
332
return ParseResponseErr(resp)
335
// ParseResponseErr parse error of response
336
func ParseResponseErr(resp *http.Response) (err error) {
337
// wrap the error with HttpError for StatusCode is not 2XX
338
if resp.StatusCode > 299 && resp.ContentLength != 0 {
339
errR := &errorResponse{}
340
if err := json.NewDecoder(resp.Body).Decode(errR); err != nil {
341
return NewError(resp.StatusCode, resp.Status, nil)
343
err = NewError(resp.StatusCode, errR.Code, errors.New(errR.Error))
346
return NewError(resp.StatusCode, resp.Status, nil)
349
type timeoutReadCloser struct {
354
func (tr *timeoutReadCloser) Close() (err error) {
355
return tr.body.Close()
358
func (tr *timeoutReadCloser) Read(p []byte) (n int, err error) {
359
readOk := make(chan struct{})
360
if tr.timeoutMs > 0 {
361
startTime := time.Now().UnixNano() / 1e6
362
after := time.After(time.Millisecond * time.Duration(tr.timeoutMs))
364
n, err = tr.body.Read(p)
370
tr.timeoutMs = tr.timeoutMs - (time.Now().UnixNano()/1e6 - startTime)
374
return 0, ErrBodyReadTimeout
378
return 0, ErrBodyReadTimeout
381
func serverCrcEncodeCheck(ctx context.Context, request *http.Request, resp *http.Response) (err error) {
382
// set Header and log errors
383
if request.Header.Get(HeaderCrcEncoded) != "" && resp.Header.Get(HeaderAckCrcEncoded) == "" {
384
msg := fmt.Sprintf("server do not ack that body has been crc encoded, url:%v", request.URL)
385
trace.SpanFromContextSafe(ctx).Error(msg)
386
return NewError(http.StatusNotImplemented, "resp.Status", errors.New(msg))