25
"github.com/cubefs/cubefs/blobstore/common/trace"
28
// errNoHost is the sentinel error carrying the message "no host available".
// NOTE(review): presumably returned when the selector yields no usable host;
// the use site is not visible in this excerpt — confirm.
var errNoHost = errors.New("no host available")
33
// Hosts lists the hosts to balance across. NewLbClient creates one Client per
// entry, and doCtx builds the request URL as host + request URI, so each entry
// must be a URL prefix that parses together with the request URI.
Hosts []string `json:"hosts"`
35
// BackupHosts lists additional hosts; NewLbClient also creates one Client per
// entry. NOTE(review): presumably only used when Hosts are unavailable —
// confirm against the selector implementation.
BackupHosts []string `json:"backup_hosts"`
38
// HostTryTimes defaults to (len(Hosts)+len(BackupHosts))*2 when zero.
// NewLbClient lowers it to RequestTryTimes-1 when it exceeds RequestTryTimes.
HostTryTimes int `json:"host_try_times"`
41
// FailRetryIntervalS is replaced with -1 by NewLbClient when zero.
// NOTE(review): the S suffix suggests seconds — confirm.
FailRetryIntervalS int `json:"fail_retry_interval_s"`
44
// MaxFailsPeriodS defaults to 1 when zero.
// NOTE(review): the S suffix suggests seconds — confirm.
MaxFailsPeriodS int `json:"max_fails_period_s"`
47
// RequestTryTimes bounds the attempt loop in doCtx. It defaults to
// HostTryTimes+1 when zero. Note that the JSON key is "try_times".
RequestTryTimes int `json:"try_times"`
50
// ShouldRetry is called by doCtx with the response status code and error of
// an attempt. It defaults to defaultShouldRetry when nil and is excluded from
// JSON encoding.
ShouldRetry func(code int, err error) bool `json:"-"`
59
// clientMap maps a host string to the Client used for that host. It is filled
// by NewLbClient from Hosts and BackupHosts and indexed by host in doCtx.
clientMap map[string]Client
65
// Compile-time assertion that *lbClient implements the Client interface.
var _ Client = (*lbClient)(nil)
68
// NewLbClient builds a load-balancing Client from cfg and sel.
//
// cfg is modified in place: zero-valued fields are replaced with the defaults
// shown below, so the caller observes the normalized values afterwards.
// One underlying Client is created for every entry of cfg.Hosts and
// cfg.BackupHosts, all sharing &cfg.Config.
//
// NOTE(review): several original lines (closing braces, the guard around
// newSelector, the final return) are not visible in this excerpt.
func NewLbClient(cfg *LbConfig, sel Selector) Client {
72
cfg.Config.Tc = cfg.Config.Tc.Default()
73
// Default host attempts: twice the total number of configured hosts.
if cfg.HostTryTimes == 0 {
74
cfg.HostTryTimes = (len(cfg.Hosts) + len(cfg.BackupHosts)) * 2
76
if cfg.MaxFailsPeriodS == 0 {
77
cfg.MaxFailsPeriodS = 1
79
// Default request attempts: one more than the host attempts.
if cfg.RequestTryTimes == 0 {
80
cfg.RequestTryTimes = cfg.HostTryTimes + 1
82
if cfg.ShouldRetry == nil {
83
cfg.ShouldRetry = defaultShouldRetry
85
// Keep HostTryTimes strictly below RequestTryTimes when it was set larger.
if cfg.HostTryTimes > cfg.RequestTryTimes {
86
cfg.HostTryTimes = cfg.RequestTryTimes - 1
88
// A zero retry interval is replaced by -1.
// NOTE(review): the meaning of -1 is defined by the selector — confirm.
if cfg.FailRetryIntervalS == 0 {
89
cfg.FailRetryIntervalS = -1
92
// NOTE(review): presumably executed only when the caller passed a nil sel;
// the enclosing condition is not visible here.
sel = newSelector(cfg)
94
cl := &lbClient{sel: sel, cfg: cfg}
95
cl.clientMap = make(map[string]Client)
96
for _, host := range cfg.Hosts {
97
cl.clientMap[host] = NewClient(&cfg.Config)
99
// A host listed in both slices ends up with the Client created here, since
// this assignment overwrites the earlier map entry.
for _, host := range cfg.BackupHosts {
100
cl.clientMap[host] = NewClient(&cfg.Config)
103
cl.requestTryTimes = cfg.RequestTryTimes
107
// defaultShouldRetry is the ShouldRetry function installed by NewLbClient when
// the config does not provide one.
//
// The visible condition holds when err is non-nil, or when the status code is
// in neither the 2xx nor the 4xx class (integer division by 100).
// NOTE(review): presumably that branch returns true (retry) and the other
// returns false; the branch bodies are not visible here — confirm.
var defaultShouldRetry = func(code int, err error) bool {
108
if err != nil || (code/100 != 4 && code/100 != 2) {
114
// Do sends req by delegating directly to doCtx and returns its result.
func (c *lbClient) Do(ctx context.Context, req *http.Request) (*http.Response, error) {
115
return c.doCtx(ctx, req)
118
// Form URL-encodes form, uses the encoded string as the request body of a
// request with the given method and url, and sends it through Do.
//
// NOTE(review): the lines between building the request and calling Do
// (presumably the error check and a Content-Type header) are not visible here.
func (c *lbClient) Form(ctx context.Context, method, url string, form map[string][]string) (resp *http.Response, err error) {
119
body := urllib.Values(form).Encode()
120
req, err := http.NewRequest(method, url, strings.NewReader(body))
124
return c.Do(ctx, req)
127
// Put marshals params with marshalObj, builds a PUT request for url with the
// marshaled body, sets the content-type header from the marshaled result, and
// sends the request through Do.
//
// NOTE(review): the error checks after marshalObj and http.NewRequest are not
// visible in this excerpt.
func (c *lbClient) Put(ctx context.Context, url string, params interface{}) (resp *http.Response, err error) {
128
body, err := marshalObj(params)
132
request, err := http.NewRequest(http.MethodPut, url, body.Body)
136
request.Header.Set(HeaderContentType, body.ContentType)
137
return c.Do(ctx, request)
140
// Post marshals params with marshalObj, builds a POST request for url with the
// marshaled body, sets the content-type header from the marshaled result, and
// sends the request through Do.
//
// NOTE(review): the error checks after marshalObj and http.NewRequest are not
// visible in this excerpt.
func (c *lbClient) Post(ctx context.Context, url string, params interface{}) (resp *http.Response, err error) {
141
body, err := marshalObj(params)
145
request, err := http.NewRequest(http.MethodPost, url, body.Body)
149
request.Header.Set(HeaderContentType, body.ContentType)
150
return c.Do(ctx, request)
153
// DoWith sends req through Do, runs serverCrcEncodeCheck on the response, and
// parses the response into ret with ParseData. The response body is closed
// when the function returns.
//
// NOTE(review): the body of the opts loop (presumably applying each option to
// req) and the error checks after Do and serverCrcEncodeCheck are not visible
// in this excerpt.
func (c *lbClient) DoWith(ctx context.Context, req *http.Request, ret interface{}, opts ...Option) error {
154
for _, opt := range opts {
157
resp, err := c.Do(ctx, req)
161
defer resp.Body.Close()
162
err = serverCrcEncodeCheck(ctx, req, resp)
166
return ParseData(resp, ret)
169
// GetWith issues a GET for url via Get and parses the response into ret.
//
// It calls the unexported parseData, whereas DoWith, PutWith and PostWith call
// ParseData. No serverCrcEncodeCheck call or deferred body close appears in
// the visible lines.
// NOTE(review): confirm that parseData takes care of the response body.
func (c *lbClient) GetWith(ctx context.Context, url string, ret interface{}) error {
170
resp, err := c.Get(ctx, url)
174
return parseData(resp, ret)
177
// PutWith marshals params with marshalObj, sends them as the body of a PUT
// request to url through Do, runs serverCrcEncodeCheck on the response, and
// parses the response into ret with ParseData. The response body is closed
// when the function returns.
//
// NOTE(review): the body of the opts loop (presumably applying each option to
// the request) and the intermediate error checks are not visible here.
func (c *lbClient) PutWith(ctx context.Context, url string, ret interface{}, params interface{}, opts ...Option) (err error) {
178
body, err := marshalObj(params)
182
request, err := http.NewRequest(http.MethodPut, url, body.Body)
186
request.Header.Set(HeaderContentType, body.ContentType)
187
for _, opt := range opts {
190
resp, err := c.Do(ctx, request)
194
defer resp.Body.Close()
195
err = serverCrcEncodeCheck(ctx, request, resp)
199
return ParseData(resp, ret)
202
// PostWith marshals params with marshalObj, sends them as the body of a POST
// request to url through Do, runs serverCrcEncodeCheck on the response, and
// parses the response into ret with ParseData. The response body is closed
// when the function returns.
//
// NOTE(review): the body of the opts loop (presumably applying each option to
// the request) and the intermediate error checks are not visible here.
func (c *lbClient) PostWith(ctx context.Context, url string, ret interface{}, params interface{}, opts ...Option) error {
203
body, err := marshalObj(params)
207
request, err := http.NewRequest(http.MethodPost, url, body.Body)
211
request.Header.Set(HeaderContentType, body.ContentType)
213
for _, opt := range opts {
216
resp, err := c.Do(ctx, request)
220
defer resp.Body.Close()
223
err = serverCrcEncodeCheck(ctx, request, resp)
227
return ParseData(resp, ret)
230
// Head builds a body-less HEAD request for url and sends it through Do.
// NOTE(review): the error check after http.NewRequest is not visible here.
func (c *lbClient) Head(ctx context.Context, url string) (resp *http.Response, err error) {
231
req, err := http.NewRequest(http.MethodHead, url, nil)
235
return c.Do(ctx, req)
238
// Get builds a body-less GET request for url and sends it through Do.
// NOTE(review): the error check after http.NewRequest is not visible here.
func (c *lbClient) Get(ctx context.Context, url string) (resp *http.Response, err error) {
239
req, err := http.NewRequest(http.MethodGet, url, nil)
243
return c.Do(ctx, req)
246
// Delete builds a body-less DELETE request for url and sends it through Do.
// NOTE(review): the error check after http.NewRequest is not visible here.
func (c *lbClient) Delete(ctx context.Context, url string) (resp *http.Response, err error) {
247
req, err := http.NewRequest(http.MethodDelete, url, nil)
251
return c.Do(ctx, req)
254
// doCtx sends r to one of the configured hosts, trying up to
// c.requestTryTimes times.
//
// Only the request URI of r.URL is kept. On each attempt r.URL is rewritten
// to host + request URI and the request is sent with the Client stored in
// c.clientMap for that host. c.cfg.ShouldRetry decides whether the status
// code and error of an attempt call for another one.
//
// NOTE(review): most control-flow lines (variable declarations, branch bodies,
// returns, host marking) are missing from this excerpt; the comments below
// describe only what the visible lines show.
func (c *lbClient) doCtx(ctx context.Context, r *http.Request) (resp *http.Response, err error) {
255
// Capture the request URI once; r.URL is overwritten on every attempt.
reqURI := r.URL.RequestURI()
256
span := trace.SpanFromContextSafe(ctx)
257
span.Debug("lb.doCtx: start", reqURI)
260
tryTimes = c.requestTryTimes
264
for i := 0; i < tryTimes; i++ {
266
// NOTE(review): presumably closes the body left over from the previous
// attempt; the branch body is not visible here.
if resp != nil && resp.Body != nil {
272
// NOTE(review): presumably reached when ctx is already done; the guarding
// statement is not visible here.
return nil, ctx.Err()
276
// Ask the selector for a fresh host list when there is none yet or the
// current one has been fully consumed.
if index == len(hosts) || hosts == nil {
277
hosts = c.sel.GetAvailableHosts()
280
span.Errorf("lb.doCtx: get host failed: %s", err.Error())
287
// Point the request at the chosen host.
r.URL, err = urllib.Parse(host + reqURI)
289
span.Errorf("lb.doCtx: parse %s error", host+reqURI)
293
resp, err = c.clientMap[host].Do(ctx, r)
295
span.Warnf("lb.doCtx: the last host of request, try times: %d, err: %v, host: %s",
301
code = resp.StatusCode
303
logInfo := fmt.Sprintf("try times: %d, code: %d, err: %v, host: %s", i+1, code, err, host)
304
if c.cfg.ShouldRetry(code, err) {
305
span.Info("lb.doCtx: retry host,", logInfo)
311
// A retry needs a fresh body reader; r.GetBody supplies one when set.
if r.GetBody != nil {
313
r.Body, _err = r.GetBody()
315
span.Warnf("lb.doCtx: retry failed, try times: %d, code: %d, err: %v, host: %s",
316
i+1, code, _err, host)
321
span.Warn("lb.doCtx: request not support retry,", logInfo)
324
span.Debug("lb.doCtx: the last host of request,", logInfo)
330
func (c *lbClient) Close() {