cubefs

Форк
0
332 строки · 8.2 Кб
1
// Copyright 2022 The CubeFS Authors.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
14

15
package rpc
16

17
import (
18
	"context"
19
	"errors"
20
	"fmt"
21
	"net/http"
22
	urllib "net/url"
23
	"strings"
24

25
	"github.com/cubefs/cubefs/blobstore/common/trace"
26
)
27

28
// errNoHost is the error doCtx returns when the selector offers no available host.
var errNoHost = errors.New("no host available")
29

30
// LbConfig load balance config
31
type LbConfig struct {
32
	// hosts
33
	Hosts []string `json:"hosts"`
34
	// backup hosts
35
	BackupHosts []string `json:"backup_hosts"`
36
	// HostTryTimes Number of host failure retries, HostTryTimes < RequestTryTimes,
37
	// Avoid requesting the unavailable host all the time
38
	HostTryTimes int `json:"host_try_times"`
39
	// Failure retry interval, default value is -1, if FailRetryIntervalS < 0,
40
	// remove failed hosts will not work.
41
	FailRetryIntervalS int `json:"fail_retry_interval_s"`
42
	// Within MaxFailsPeriodS, if the number of failures is greater than or equal
43
	// to MaxFails, the host is considered disconnected.
44
	MaxFailsPeriodS int `json:"max_fails_period_s"`
45

46
	// RequestTryTimes The maximum number of attempts for a request hosts.
47
	RequestTryTimes int `json:"try_times"`
48

49
	// should retry function
50
	ShouldRetry func(code int, err error) bool `json:"-"`
51

52
	// config for simple client
53
	Config
54
}
55

56
type lbClient struct {
57
	requestTryTimes int
58
	// host for simple client
59
	clientMap map[string]Client
60

61
	sel Selector
62
	cfg *LbConfig
63
}
64

65
// Compile-time check that *lbClient satisfies the Client interface.
var _ Client = (*lbClient)(nil)
66

67
// NewLbClient returns a lb client
68
func NewLbClient(cfg *LbConfig, sel Selector) Client {
69
	if cfg == nil {
70
		cfg = &LbConfig{}
71
	}
72
	cfg.Config.Tc = cfg.Config.Tc.Default()
73
	if cfg.HostTryTimes == 0 {
74
		cfg.HostTryTimes = (len(cfg.Hosts) + len(cfg.BackupHosts)) * 2
75
	}
76
	if cfg.MaxFailsPeriodS == 0 {
77
		cfg.MaxFailsPeriodS = 1
78
	}
79
	if cfg.RequestTryTimes == 0 {
80
		cfg.RequestTryTimes = cfg.HostTryTimes + 1
81
	}
82
	if cfg.ShouldRetry == nil {
83
		cfg.ShouldRetry = defaultShouldRetry
84
	}
85
	if cfg.HostTryTimes > cfg.RequestTryTimes {
86
		cfg.HostTryTimes = cfg.RequestTryTimes - 1
87
	}
88
	if cfg.FailRetryIntervalS == 0 {
89
		cfg.FailRetryIntervalS = -1
90
	}
91
	if sel == nil {
92
		sel = newSelector(cfg)
93
	}
94
	cl := &lbClient{sel: sel, cfg: cfg}
95
	cl.clientMap = make(map[string]Client)
96
	for _, host := range cfg.Hosts {
97
		cl.clientMap[host] = NewClient(&cfg.Config)
98
	}
99
	for _, host := range cfg.BackupHosts {
100
		cl.clientMap[host] = NewClient(&cfg.Config)
101
	}
102

103
	cl.requestTryTimes = cfg.RequestTryTimes
104
	return cl
105
}
106

107
// defaultShouldRetry is the retry policy used when LbConfig.ShouldRetry is
// not set: any error is retried, and so is every status code outside the 2xx
// and 4xx classes.
var defaultShouldRetry = func(code int, err error) bool {
	if err != nil {
		return true
	}
	switch code / 100 {
	case 2, 4:
		return false
	default:
		return true
	}
}
113

114
// Do sends req through the load-balancing retry loop implemented by doCtx
// and returns its response and error unchanged.
func (c *lbClient) Do(ctx context.Context, req *http.Request) (*http.Response, error) {
	return c.doCtx(ctx, req)
}
117

118
// Form sends a request with the given method to url, using the URL-encoded
// form values as the request body.
func (c *lbClient) Form(ctx context.Context, method, url string, form map[string][]string) (resp *http.Response, err error) {
	encoded := urllib.Values(form).Encode()
	request, err := http.NewRequest(method, url, strings.NewReader(encoded))
	if err != nil {
		return nil, err
	}
	return c.Do(ctx, request)
}
126

127
// Put marshals params with marshalObj and sends the result as the body of a
// PUT request to url, with the matching content type header set.
func (c *lbClient) Put(ctx context.Context, url string, params interface{}) (resp *http.Response, err error) {
	payload, err := marshalObj(params)
	if err != nil {
		return nil, err
	}
	req, err := http.NewRequest(http.MethodPut, url, payload.Body)
	if err != nil {
		return nil, err
	}
	req.Header.Set(HeaderContentType, payload.ContentType)
	return c.Do(ctx, req)
}
139

140
// Post marshals params with marshalObj and sends the result as the body of a
// POST request to url, with the matching content type header set.
func (c *lbClient) Post(ctx context.Context, url string, params interface{}) (resp *http.Response, err error) {
	payload, err := marshalObj(params)
	if err != nil {
		return nil, err
	}

	req, err := http.NewRequest(http.MethodPost, url, payload.Body)
	if err != nil {
		return nil, err
	}
	req.Header.Set(HeaderContentType, payload.ContentType)

	return c.Do(ctx, req)
}
152

153
// DoWith applies opts to req, sends it, runs serverCrcEncodeCheck on the
// response and then decodes the response into ret with ParseData. The
// response body is closed before returning.
func (c *lbClient) DoWith(ctx context.Context, req *http.Request, ret interface{}, opts ...Option) error {
	for i := range opts {
		opts[i](req)
	}

	resp, err := c.Do(ctx, req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if err = serverCrcEncodeCheck(ctx, req, resp); err != nil {
		return err
	}
	return ParseData(resp, ret)
}
168

169
// GetWith issues a GET request to url and hands the response to parseData
// to decode it into ret.
func (c *lbClient) GetWith(ctx context.Context, url string, ret interface{}) error {
	resp, err := c.Get(ctx, url)
	if err == nil {
		err = parseData(resp, ret)
	}
	return err
}
176

177
// PutWith marshals params into the body of a PUT request to url, applies
// opts, sends the request, runs serverCrcEncodeCheck on the response and
// decodes the response into ret with ParseData. The response body is closed
// before returning.
func (c *lbClient) PutWith(ctx context.Context, url string, ret interface{}, params interface{}, opts ...Option) (err error) {
	payload, err := marshalObj(params)
	if err != nil {
		return err
	}

	req, err := http.NewRequest(http.MethodPut, url, payload.Body)
	if err != nil {
		return err
	}
	req.Header.Set(HeaderContentType, payload.ContentType)
	for i := range opts {
		opts[i](req)
	}

	resp, err := c.Do(ctx, req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if err = serverCrcEncodeCheck(ctx, req, resp); err != nil {
		return err
	}
	return ParseData(resp, ret)
}
201

202
// PostWith marshals params into the body of a POST request to url, applies
// opts, sends the request, runs serverCrcEncodeCheck on the response and
// decodes the response into ret with ParseData. The response body is closed
// before returning.
func (c *lbClient) PostWith(ctx context.Context, url string, ret interface{}, params interface{}, opts ...Option) error {
	payload, err := marshalObj(params)
	if err != nil {
		return err
	}

	req, err := http.NewRequest(http.MethodPost, url, payload.Body)
	if err != nil {
		return err
	}
	req.Header.Set(HeaderContentType, payload.ContentType)
	for i := range opts {
		opts[i](req)
	}

	resp, err := c.Do(ctx, req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	// set Header and log errors
	if err = serverCrcEncodeCheck(ctx, req, resp); err != nil {
		return err
	}
	return ParseData(resp, ret)
}
229

230
// Head issues a body-less HEAD request to url.
func (c *lbClient) Head(ctx context.Context, url string) (resp *http.Response, err error) {
	request, err := http.NewRequest(http.MethodHead, url, nil)
	if err != nil {
		return nil, err
	}
	return c.Do(ctx, request)
}
237

238
// Get issues a body-less GET request to url.
func (c *lbClient) Get(ctx context.Context, url string) (resp *http.Response, err error) {
	request, err := http.NewRequest(http.MethodGet, url, nil)
	if err != nil {
		return nil, err
	}
	return c.Do(ctx, request)
}
245

246
// Delete issues a body-less DELETE request to url.
func (c *lbClient) Delete(ctx context.Context, url string) (resp *http.Response, err error) {
	request, err := http.NewRequest(http.MethodDelete, url, nil)
	if err != nil {
		return nil, err
	}
	return c.Do(ctx, request)
}
253

254
func (c *lbClient) doCtx(ctx context.Context, r *http.Request) (resp *http.Response, err error) {
255
	reqURI := r.URL.RequestURI()
256
	span := trace.SpanFromContextSafe(ctx)
257
	span.Debug("lb.doCtx: start", reqURI)
258
	var (
259
		hosts    []string
260
		tryTimes = c.requestTryTimes
261
		index    = 0
262
	)
263

264
	for i := 0; i < tryTimes; i++ {
265
		// close failed body
266
		if resp != nil && resp.Body != nil {
267
			resp.Body.Close()
268
			resp = nil
269
		}
270
		select {
271
		case <-ctx.Done():
272
			return nil, ctx.Err()
273
		default:
274
		}
275
		// get the available hosts
276
		if index == len(hosts) || hosts == nil {
277
			hosts = c.sel.GetAvailableHosts()
278
			if len(hosts) < 1 {
279
				err = errNoHost
280
				span.Errorf("lb.doCtx: get host failed: %s", err.Error())
281
				return
282
			}
283
			index = 0
284
		}
285
		host := hosts[index]
286
		// get the real url
287
		r.URL, err = urllib.Parse(host + reqURI)
288
		if err != nil {
289
			span.Errorf("lb.doCtx: parse %s error", host+reqURI)
290
			return
291
		}
292
		r.Host = r.URL.Host
293
		resp, err = c.clientMap[host].Do(ctx, r)
294
		if i == tryTimes-1 {
295
			span.Warnf("lb.doCtx: the last host of request, try times: %d, err: %v, host: %s",
296
				i+1, err, host)
297
			return
298
		}
299
		code := 0
300
		if resp != nil {
301
			code = resp.StatusCode
302
		}
303
		logInfo := fmt.Sprintf("try times: %d, code: %d, err: %v, host: %s", i+1, code, err, host)
304
		if c.cfg.ShouldRetry(code, err) {
305
			span.Info("lb.doCtx: retry host,", logInfo)
306
			index++
307
			c.sel.SetFail(host)
308
			if r.Body == nil {
309
				continue
310
			}
311
			if r.GetBody != nil {
312
				var _err error
313
				r.Body, _err = r.GetBody()
314
				if _err != nil {
315
					span.Warnf("lb.doCtx: retry failed, try times: %d, code: %d, err: %v, host: %s",
316
						i+1, code, _err, host)
317
					return
318
				}
319
				continue
320
			}
321
			span.Warn("lb.doCtx: request not support retry,", logInfo)
322
			return
323
		}
324
		span.Debug("lb.doCtx: the last host of request,", logInfo)
325
		return
326
	}
327
	return
328
}
329

330
// Close closes the selector used by the client.
func (c *lbClient) Close() {
	c.sel.Close()
}
333

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.