cubefs

Форк
0
/
metaapi.go 
302 строки · 7.9 Кб
1
// Copyright 2018 The CubeFS Authors.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
14

15
package api
16

17
import (
18
	"bufio"
19
	"bytes"
20
	"encoding/json"
21
	"fmt"
22
	"io"
23
	"net/http"
24
	"strings"
25
	"sync"
26
	"time"
27

28
	"github.com/cubefs/cubefs/metanode"
29
	"github.com/cubefs/cubefs/proto"
30
	"github.com/cubefs/cubefs/util/log"
31
)
32

33
const (
34
	requestTimeout = 30 * time.Second
35
)
36

37
type Inode struct {
38
	sync.RWMutex
39
	Inode      uint64 // Inode ID
40
	Type       uint32
41
	Uid        uint32
42
	Gid        uint32
43
	Size       uint64
44
	Generation uint64
45
	CreateTime int64
46
	AccessTime int64
47
	ModifyTime int64
48
	LinkTarget []byte // SymLink target name
49
	NLink      uint32 // NodeLink counts
50
	Flag       int32
51
	Reserved   uint64 // reserved space
52
	Extents    []proto.ExtentKey
53
}
54

55
// String returns the string format of the inode.
56
func (i *Inode) String() string {
57
	i.RLock()
58
	defer i.RUnlock()
59
	buff := bytes.NewBuffer(nil)
60
	buff.Grow(128)
61
	buff.WriteString("Inode{")
62
	buff.WriteString(fmt.Sprintf("Inode[%d]", i.Inode))
63
	buff.WriteString(fmt.Sprintf("Type[%d]", i.Type))
64
	buff.WriteString(fmt.Sprintf("Uid[%d]", i.Uid))
65
	buff.WriteString(fmt.Sprintf("Gid[%d]", i.Gid))
66
	buff.WriteString(fmt.Sprintf("Size[%d]", i.Size))
67
	buff.WriteString(fmt.Sprintf("Gen[%d]", i.Generation))
68
	buff.WriteString(fmt.Sprintf("CT[%d]", i.CreateTime))
69
	buff.WriteString(fmt.Sprintf("AT[%d]", i.AccessTime))
70
	buff.WriteString(fmt.Sprintf("MT[%d]", i.ModifyTime))
71
	buff.WriteString(fmt.Sprintf("LinkT[%s]", i.LinkTarget))
72
	buff.WriteString(fmt.Sprintf("NLink[%d]", i.NLink))
73
	buff.WriteString(fmt.Sprintf("Flag[%d]", i.Flag))
74
	buff.WriteString(fmt.Sprintf("Reserved[%d]", i.Reserved))
75
	buff.WriteString(fmt.Sprintf("Extents[%s]", i.Extents))
76
	buff.WriteString("}")
77
	return buff.String()
78
}
79

80
type MetaHttpClient struct {
81
	sync.RWMutex
82
	useSSL bool
83
	host   string
84
}
85

86
// NewMasterHelper returns a new MasterClient instance.
87
func NewMetaHttpClient(host string, useSSL bool) *MetaHttpClient {
88
	mc := &MetaHttpClient{host: host, useSSL: useSSL}
89
	return mc
90
}
91

92
type request struct {
93
	method string
94
	path   string
95
	params map[string]string
96
	header map[string]string
97
	body   []byte
98
}
99

100
func newAPIRequest(method string, path string) *request {
101
	return &request{
102
		method: method,
103
		path:   path,
104
		params: make(map[string]string),
105
		header: make(map[string]string),
106
	}
107
}
108

109
type RespBody struct{}
110

111
func (c *MetaHttpClient) serveRequest(r *request) (respData []byte, err error) {
112
	var resp *http.Response
113
	var schema string
114
	if c.useSSL {
115
		schema = "https"
116
	} else {
117
		schema = "http"
118
	}
119
	url := fmt.Sprintf("%s://%s%s", schema, c.host,
120
		r.path)
121
	resp, err = c.httpRequest(r.method, url, r.params, r.header, r.body)
122
	log.LogInfof("resp %v,err %v", resp, err)
123
	if err != nil {
124
		log.LogErrorf("serveRequest: send http request fail: method(%v) url(%v) err(%v)", r.method, url, err)
125
		return
126
	}
127
	stateCode := resp.StatusCode
128
	respData, err = io.ReadAll(resp.Body)
129
	_ = resp.Body.Close()
130
	if err != nil {
131
		log.LogErrorf("serveRequest: read http response body fail: err(%v)", err)
132
		return
133
	}
134
	switch stateCode {
135
	case http.StatusOK:
136
		body := new(proto.HTTPReplyRaw)
137
		if err := body.Unmarshal(respData); err != nil {
138
			return nil, err
139
		}
140
		// o represent proto.ErrCodeSuccess, TODO: 200 ???
141
		if body.Code != 200 {
142
			return nil, proto.ParseErrorCode(body.Code)
143
		}
144
		return body.Bytes(), nil
145
	default:
146
		log.LogErrorf("serveRequest: unknown status: host(%v) uri(%v) status(%v) body(%s).",
147
			resp.Request.URL.String(), c.host, stateCode, strings.Replace(string(respData), "\n", "", -1))
148
	}
149
	return
150
}
151

152
func (c *MetaHttpClient) httpRequest(method, url string, param, header map[string]string, reqData []byte) (resp *http.Response, err error) {
153
	client := http.DefaultClient
154
	reader := bytes.NewReader(reqData)
155
	client.Timeout = requestTimeout
156
	var req *http.Request
157
	fullUrl := c.mergeRequestUrl(url, param)
158
	log.LogDebugf("httpRequest: merge request url: method(%v) url(%v) bodyLength[%v].", method, fullUrl, len(reqData))
159
	if req, err = http.NewRequest(method, fullUrl, reader); err != nil {
160
		return
161
	}
162
	req.Header.Set("Content-Type", "application/json")
163
	req.Header.Set("Connection", "close")
164
	for k, v := range header {
165
		req.Header.Set(k, v)
166
	}
167
	resp, err = client.Do(req)
168
	return
169
}
170

171
func (c *MetaHttpClient) mergeRequestUrl(url string, params map[string]string) string {
172
	if len(params) > 0 {
173
		buff := bytes.NewBuffer([]byte(url))
174
		isFirstParam := true
175
		for k, v := range params {
176
			if isFirstParam {
177
				buff.WriteString("?")
178
				isFirstParam = false
179
			} else {
180
				buff.WriteString("&")
181
			}
182
			buff.WriteString(k)
183
			buff.WriteString("=")
184
			buff.WriteString(v)
185
		}
186
		return buff.String()
187
	}
188
	return url
189
}
190

191
func (mc *MetaHttpClient) GetMetaPartition(pid uint64) (cursor uint64, err error) {
192
	defer func() {
193
		if err != nil {
194
			log.LogErrorf("action[GetMetaPartition],pid:%v,err:%v", pid, err)
195
		}
196
	}()
197
	request := newAPIRequest(http.MethodGet, "/getPartitionById")
198
	request.params["pid"] = fmt.Sprintf("%v", pid)
199
	respData, err := mc.serveRequest(request)
200
	log.LogInfof("err:%v,respData:%v\n", err, string(respData))
201
	if err != nil {
202
		return
203
	}
204
	type RstData struct {
205
		Cursor uint64
206
	}
207
	body := &RstData{}
208
	if err = json.Unmarshal(respData, body); err != nil {
209
		return
210
	}
211
	return body.Cursor, nil
212
}
213

214
func (mc *MetaHttpClient) GetAllDentry(pid uint64) (dentryMap map[string]*metanode.Dentry, err error) {
215
	defer func() {
216
		if err != nil {
217
			log.LogErrorf("action[GetAllDentry],pid:%v,err:%v", pid, err)
218
		}
219
	}()
220
	dentryMap = make(map[string]*metanode.Dentry)
221
	request := newAPIRequest(http.MethodGet, "/getAllDentry")
222
	request.params["pid"] = fmt.Sprintf("%v", pid)
223
	respData, err := mc.serveRequest(request)
224
	log.LogInfof("err:%v,respData:%v\n", err, string(respData))
225
	if err != nil {
226
		return
227
	}
228

229
	dec := json.NewDecoder(bytes.NewBuffer(respData))
230
	dec.UseNumber()
231

232
	// It's the "items". We expect it to be an array
233
	if err = parseToken(dec, '['); err != nil {
234
		return
235
	}
236
	// Read items (large objects)
237
	for dec.More() {
238
		// Read next item (large object)
239
		lo := &metanode.Dentry{}
240
		if err = dec.Decode(lo); err != nil {
241
			return
242
		}
243
		dentryMap[fmt.Sprintf("%v_%v", lo.ParentId, lo.Name)] = lo
244
	}
245
	// Array closing delimiter
246
	if err = parseToken(dec, ']'); err != nil {
247
		return
248
	}
249
	return
250
}
251

252
func parseToken(dec *json.Decoder, expectToken rune) (err error) {
253
	t, err := dec.Token()
254
	if err != nil {
255
		return
256
	}
257
	if delim, ok := t.(json.Delim); !ok || delim != json.Delim(expectToken) {
258
		err = fmt.Errorf("expected token[%v],delim[%v],ok[%v]", string(expectToken), delim, ok)
259
		return
260
	}
261
	return
262
}
263

264
func (mc *MetaHttpClient) GetAllInodes(pid uint64) (rstMap map[uint64]*Inode, err error) {
265
	defer func() {
266
		if err != nil {
267
			log.LogErrorf("action[GetAllInodes],pid:%v,err:%v", pid, err)
268
		}
269
	}()
270
	reqURL := fmt.Sprintf("http://%v%v?pid=%v", mc.host, "/getAllInodes", pid)
271
	log.LogDebugf("reqURL=%v", reqURL)
272
	resp, err := http.Get(reqURL)
273
	if err != nil {
274
		return
275
	}
276
	defer resp.Body.Close()
277
	return unmarshalInodes(resp)
278
}
279

280
func unmarshalInodes(resp *http.Response) (rstMap map[uint64]*Inode, err error) {
281
	bufReader := bufio.NewReader(resp.Body)
282
	rstMap = make(map[uint64]*Inode)
283
	var buf []byte
284
	for {
285
		buf, err = bufReader.ReadBytes('\n')
286
		log.LogInfof("buf[%v],err[%v]", string(buf), err)
287
		if err != nil && err != io.EOF {
288
			return
289
		}
290
		inode := &Inode{}
291
		if err1 := json.Unmarshal(buf, inode); err1 != nil {
292
			err = err1
293
			return
294
		}
295
		rstMap[inode.Inode] = inode
296
		log.LogInfof("after unmarshal current inode[%v]", inode)
297
		if err == io.EOF {
298
			err = nil
299
			return
300
		}
301
	}
302
}
303

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.