1
// Copyright 2018 The CubeFS Authors.
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
7
// http://www.apache.org/licenses/LICENSE-2.0
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
28
"github.com/cubefs/cubefs/util/log"
32
requestTimeout = 30 * time.Second
35
var ErrNoValidMaster = errors.New("no valid master")
37
// MasterHelper defines the helper struct to manage the master.
38
type MasterHelper interface {
39
AddNode(address string)
42
Request(method, path string, param, header map[string]string, body []byte) (data []byte, err error)
45
type masterHelper struct {
51
// AddNode add the given address as the master address.
52
func (helper *masterHelper) AddNode(address string) {
54
helper.updateMaster(address)
58
// Leader returns the current leader address.
59
func (helper *masterHelper) Leader() (addr string) {
61
addr = helper.leaderAddr
66
// Change the leader address.
67
func (helper *masterHelper) setLeader(addr string) {
69
helper.leaderAddr = addr
73
// Request sends out the request through the helper.
74
func (helper *masterHelper) Request(method, path string, param, header map[string]string, reqData []byte) (respData []byte, err error) {
75
respData, err = helper.request(method, path, param, header, reqData)
79
func (helper *masterHelper) request(method, path string, param, header map[string]string, reqData []byte) (repsData []byte, err error) {
80
leaderAddr, nodes := helper.prepareRequest()
82
for i := -1; i < len(nodes); i++ {
90
var resp *http.Response
91
resp, err = helper.httpRequest(method, fmt.Sprintf("http://%s%s", host,
92
path), param, header, reqData)
94
log.LogErrorf("[masterHelper] %s", err)
97
stateCode := resp.StatusCode
98
repsData, err = io.ReadAll(resp.Body)
101
log.LogErrorf("[masterHelper] %s", err)
105
case http.StatusForbidden:
106
curMasterAddr := strings.TrimSpace(string(repsData))
107
curMasterAddr = strings.Replace(curMasterAddr, "\n", "", -1)
108
if len(curMasterAddr) == 0 {
109
log.LogErrorf("[masterHelper] request[%s] response statudCode"+
110
"[403], respBody is empty", host)
111
err = ErrNoValidMaster
114
repsData, err = helper.request(method, path, param, header, reqData)
117
if leaderAddr != host {
118
helper.setLeader(host)
121
Code int32 `json:"code"`
122
Msg string `json:"msg"`
123
Data json.RawMessage `json:"data"`
125
if err := json.Unmarshal(repsData, body); err != nil {
126
return nil, fmt.Errorf("unmarshal response body err:%v", err)
128
// o represent proto.ErrCodeSuccess
130
return nil, fmt.Errorf("request error, code[%d], msg[%s]", body.Code, body.Msg)
132
return []byte(body.Data), nil
134
log.LogErrorf("[masterHelper] master[%v] uri[%v] statusCode[%v] respBody[%v].",
135
resp.Request.URL.String(), host, stateCode, string(repsData))
139
err = ErrNoValidMaster
143
// Nodes returns all master addresses.
144
func (helper *masterHelper) Nodes() (nodes []string) {
146
nodes = helper.masters
151
// prepareRequest returns the leader address and all master addresses.
152
func (helper *masterHelper) prepareRequest() (addr string, nodes []string) {
154
addr = helper.leaderAddr
155
nodes = helper.masters
160
func (helper *masterHelper) httpRequest(method, url string, param, header map[string]string, reqData []byte) (resp *http.Response, err error) {
161
client := &http.Client{}
162
reader := bytes.NewReader(reqData)
163
client.Timeout = requestTimeout
164
var req *http.Request
165
fullUrl := helper.mergeRequestUrl(url, param)
166
log.LogDebugf("action[httpRequest] method[%v] url[%v] reqBodyLen[%v].", method, fullUrl, len(reqData))
167
if req, err = http.NewRequest(method, fullUrl, reader); err != nil {
170
req.Header.Set("Content-Type", "application/json")
171
req.Header.Set("Connection", "close")
172
for k, v := range header {
175
resp, err = client.Do(req)
179
func (helper *masterHelper) updateMaster(address string) {
181
for _, master := range helper.masters {
182
if master == address {
188
helper.masters = append(helper.masters, address)
190
helper.leaderAddr = address
193
func (helper *masterHelper) mergeRequestUrl(url string, params map[string]string) string {
195
buff := bytes.NewBuffer([]byte(url))
197
for k, v := range params {
199
buff.WriteString("?")
202
buff.WriteString("&")
205
buff.WriteString("=")
213
// NewMasterHelper returns a new MasterHelper instance.
214
func NewMasterHelper() MasterHelper {
215
return &masterHelper{}