cubefs

Форк
0
/
master_helper.go 
216 строк · 5.6 Кб
1
// Copyright 2018 The CubeFS Authors.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
14

15
package util
16

17
import (
18
	"bytes"
19
	"encoding/json"
20
	"errors"
21
	"fmt"
22
	"io"
23
	"net/http"
24
	"strings"
25
	"sync"
26
	"time"
27

28
	"github.com/cubefs/cubefs/util/log"
29
)
30

31
const (
32
	requestTimeout = 30 * time.Second
33
)
34

35
var ErrNoValidMaster = errors.New("no valid master")
36

37
// MasterHelper defines the helper struct to manage the master.
38
type MasterHelper interface {
39
	AddNode(address string)
40
	Nodes() []string
41
	Leader() string
42
	Request(method, path string, param, header map[string]string, body []byte) (data []byte, err error)
43
}
44

45
type masterHelper struct {
46
	sync.RWMutex
47
	masters    []string
48
	leaderAddr string
49
}
50

51
// AddNode add the given address as the master address.
52
func (helper *masterHelper) AddNode(address string) {
53
	helper.Lock()
54
	helper.updateMaster(address)
55
	helper.Unlock()
56
}
57

58
// Leader returns the current leader address.
59
func (helper *masterHelper) Leader() (addr string) {
60
	helper.RLock()
61
	addr = helper.leaderAddr
62
	helper.RUnlock()
63
	return
64
}
65

66
// Change the leader address.
67
func (helper *masterHelper) setLeader(addr string) {
68
	helper.Lock()
69
	helper.leaderAddr = addr
70
	helper.Unlock()
71
}
72

73
// Request sends out the request through the helper.
74
func (helper *masterHelper) Request(method, path string, param, header map[string]string, reqData []byte) (respData []byte, err error) {
75
	respData, err = helper.request(method, path, param, header, reqData)
76
	return
77
}
78

79
func (helper *masterHelper) request(method, path string, param, header map[string]string, reqData []byte) (repsData []byte, err error) {
80
	leaderAddr, nodes := helper.prepareRequest()
81
	host := leaderAddr
82
	for i := -1; i < len(nodes); i++ {
83
		if i == -1 {
84
			if host == "" {
85
				continue
86
			}
87
		} else {
88
			host = nodes[i]
89
		}
90
		var resp *http.Response
91
		resp, err = helper.httpRequest(method, fmt.Sprintf("http://%s%s", host,
92
			path), param, header, reqData)
93
		if err != nil {
94
			log.LogErrorf("[masterHelper] %s", err)
95
			continue
96
		}
97
		stateCode := resp.StatusCode
98
		repsData, err = io.ReadAll(resp.Body)
99
		resp.Body.Close()
100
		if err != nil {
101
			log.LogErrorf("[masterHelper] %s", err)
102
			continue
103
		}
104
		switch stateCode {
105
		case http.StatusForbidden:
106
			curMasterAddr := strings.TrimSpace(string(repsData))
107
			curMasterAddr = strings.Replace(curMasterAddr, "\n", "", -1)
108
			if len(curMasterAddr) == 0 {
109
				log.LogErrorf("[masterHelper] request[%s] response statudCode"+
110
					"[403], respBody is empty", host)
111
				err = ErrNoValidMaster
112
				return
113
			}
114
			repsData, err = helper.request(method, path, param, header, reqData)
115
			return
116
		case http.StatusOK:
117
			if leaderAddr != host {
118
				helper.setLeader(host)
119
			}
120
			body := &struct {
121
				Code int32           `json:"code"`
122
				Msg  string          `json:"msg"`
123
				Data json.RawMessage `json:"data"`
124
			}{}
125
			if err := json.Unmarshal(repsData, body); err != nil {
126
				return nil, fmt.Errorf("unmarshal response body err:%v", err)
127
			}
128
			// o represent proto.ErrCodeSuccess
129
			if body.Code != 0 {
130
				return nil, fmt.Errorf("request error, code[%d], msg[%s]", body.Code, body.Msg)
131
			}
132
			return []byte(body.Data), nil
133
		default:
134
			log.LogErrorf("[masterHelper] master[%v] uri[%v] statusCode[%v] respBody[%v].",
135
				resp.Request.URL.String(), host, stateCode, string(repsData))
136
			continue
137
		}
138
	}
139
	err = ErrNoValidMaster
140
	return
141
}
142

143
// Nodes returns all master addresses.
144
func (helper *masterHelper) Nodes() (nodes []string) {
145
	helper.RLock()
146
	nodes = helper.masters
147
	helper.RUnlock()
148
	return
149
}
150

151
// prepareRequest returns the leader address and all master addresses.
152
func (helper *masterHelper) prepareRequest() (addr string, nodes []string) {
153
	helper.RLock()
154
	addr = helper.leaderAddr
155
	nodes = helper.masters
156
	helper.RUnlock()
157
	return
158
}
159

160
func (helper *masterHelper) httpRequest(method, url string, param, header map[string]string, reqData []byte) (resp *http.Response, err error) {
161
	client := &http.Client{}
162
	reader := bytes.NewReader(reqData)
163
	client.Timeout = requestTimeout
164
	var req *http.Request
165
	fullUrl := helper.mergeRequestUrl(url, param)
166
	log.LogDebugf("action[httpRequest] method[%v] url[%v] reqBodyLen[%v].", method, fullUrl, len(reqData))
167
	if req, err = http.NewRequest(method, fullUrl, reader); err != nil {
168
		return
169
	}
170
	req.Header.Set("Content-Type", "application/json")
171
	req.Header.Set("Connection", "close")
172
	for k, v := range header {
173
		req.Header.Set(k, v)
174
	}
175
	resp, err = client.Do(req)
176
	return
177
}
178

179
func (helper *masterHelper) updateMaster(address string) {
180
	contains := false
181
	for _, master := range helper.masters {
182
		if master == address {
183
			contains = true
184
			break
185
		}
186
	}
187
	if !contains {
188
		helper.masters = append(helper.masters, address)
189
	}
190
	helper.leaderAddr = address
191
}
192

193
func (helper *masterHelper) mergeRequestUrl(url string, params map[string]string) string {
194
	if len(params) > 0 {
195
		buff := bytes.NewBuffer([]byte(url))
196
		isFirstParam := true
197
		for k, v := range params {
198
			if isFirstParam {
199
				buff.WriteString("?")
200
				isFirstParam = false
201
			} else {
202
				buff.WriteString("&")
203
			}
204
			buff.WriteString(k)
205
			buff.WriteString("=")
206
			buff.WriteString(v)
207
		}
208
		return buff.String()
209
	}
210
	return url
211
}
212

213
// NewMasterHelper returns a new MasterHelper instance.
214
func NewMasterHelper() MasterHelper {
215
	return &masterHelper{}
216
}
217

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.