1
// Copyright 2018 The CubeFS Authors.
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
7
// http://www.apache.org/licenses/LICENSE-2.0
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
28
"github.com/cubefs/cubefs/metanode"
29
"github.com/cubefs/cubefs/proto"
30
"github.com/cubefs/cubefs/util/log"
34
requestTimeout = 30 * time.Second
39
Inode uint64 // Inode ID
48
LinkTarget []byte // SymLink target name
49
NLink uint32 // NodeLink counts
51
Reserved uint64 // reserved space
52
Extents []proto.ExtentKey
55
// String returns the string format of the inode.
56
func (i *Inode) String() string {
59
buff := bytes.NewBuffer(nil)
61
buff.WriteString("Inode{")
62
buff.WriteString(fmt.Sprintf("Inode[%d]", i.Inode))
63
buff.WriteString(fmt.Sprintf("Type[%d]", i.Type))
64
buff.WriteString(fmt.Sprintf("Uid[%d]", i.Uid))
65
buff.WriteString(fmt.Sprintf("Gid[%d]", i.Gid))
66
buff.WriteString(fmt.Sprintf("Size[%d]", i.Size))
67
buff.WriteString(fmt.Sprintf("Gen[%d]", i.Generation))
68
buff.WriteString(fmt.Sprintf("CT[%d]", i.CreateTime))
69
buff.WriteString(fmt.Sprintf("AT[%d]", i.AccessTime))
70
buff.WriteString(fmt.Sprintf("MT[%d]", i.ModifyTime))
71
buff.WriteString(fmt.Sprintf("LinkT[%s]", i.LinkTarget))
72
buff.WriteString(fmt.Sprintf("NLink[%d]", i.NLink))
73
buff.WriteString(fmt.Sprintf("Flag[%d]", i.Flag))
74
buff.WriteString(fmt.Sprintf("Reserved[%d]", i.Reserved))
75
buff.WriteString(fmt.Sprintf("Extents[%s]", i.Extents))
80
type MetaHttpClient struct {
86
// NewMasterHelper returns a new MasterClient instance.
87
func NewMetaHttpClient(host string, useSSL bool) *MetaHttpClient {
88
mc := &MetaHttpClient{host: host, useSSL: useSSL}
95
params map[string]string
96
header map[string]string
100
func newAPIRequest(method string, path string) *request {
104
params: make(map[string]string),
105
header: make(map[string]string),
109
type RespBody struct{}
111
func (c *MetaHttpClient) serveRequest(r *request) (respData []byte, err error) {
112
var resp *http.Response
119
url := fmt.Sprintf("%s://%s%s", schema, c.host,
121
resp, err = c.httpRequest(r.method, url, r.params, r.header, r.body)
122
log.LogInfof("resp %v,err %v", resp, err)
124
log.LogErrorf("serveRequest: send http request fail: method(%v) url(%v) err(%v)", r.method, url, err)
127
stateCode := resp.StatusCode
128
respData, err = io.ReadAll(resp.Body)
129
_ = resp.Body.Close()
131
log.LogErrorf("serveRequest: read http response body fail: err(%v)", err)
136
body := new(proto.HTTPReplyRaw)
137
if err := body.Unmarshal(respData); err != nil {
140
// o represent proto.ErrCodeSuccess, TODO: 200 ???
141
if body.Code != 200 {
142
return nil, proto.ParseErrorCode(body.Code)
144
return body.Bytes(), nil
146
log.LogErrorf("serveRequest: unknown status: host(%v) uri(%v) status(%v) body(%s).",
147
resp.Request.URL.String(), c.host, stateCode, strings.Replace(string(respData), "\n", "", -1))
152
func (c *MetaHttpClient) httpRequest(method, url string, param, header map[string]string, reqData []byte) (resp *http.Response, err error) {
153
client := http.DefaultClient
154
reader := bytes.NewReader(reqData)
155
client.Timeout = requestTimeout
156
var req *http.Request
157
fullUrl := c.mergeRequestUrl(url, param)
158
log.LogDebugf("httpRequest: merge request url: method(%v) url(%v) bodyLength[%v].", method, fullUrl, len(reqData))
159
if req, err = http.NewRequest(method, fullUrl, reader); err != nil {
162
req.Header.Set("Content-Type", "application/json")
163
req.Header.Set("Connection", "close")
164
for k, v := range header {
167
resp, err = client.Do(req)
171
func (c *MetaHttpClient) mergeRequestUrl(url string, params map[string]string) string {
173
buff := bytes.NewBuffer([]byte(url))
175
for k, v := range params {
177
buff.WriteString("?")
180
buff.WriteString("&")
183
buff.WriteString("=")
191
func (mc *MetaHttpClient) GetMetaPartition(pid uint64) (cursor uint64, err error) {
194
log.LogErrorf("action[GetMetaPartition],pid:%v,err:%v", pid, err)
197
request := newAPIRequest(http.MethodGet, "/getPartitionById")
198
request.params["pid"] = fmt.Sprintf("%v", pid)
199
respData, err := mc.serveRequest(request)
200
log.LogInfof("err:%v,respData:%v\n", err, string(respData))
204
type RstData struct {
208
if err = json.Unmarshal(respData, body); err != nil {
211
return body.Cursor, nil
214
func (mc *MetaHttpClient) GetAllDentry(pid uint64) (dentryMap map[string]*metanode.Dentry, err error) {
217
log.LogErrorf("action[GetAllDentry],pid:%v,err:%v", pid, err)
220
dentryMap = make(map[string]*metanode.Dentry)
221
request := newAPIRequest(http.MethodGet, "/getAllDentry")
222
request.params["pid"] = fmt.Sprintf("%v", pid)
223
respData, err := mc.serveRequest(request)
224
log.LogInfof("err:%v,respData:%v\n", err, string(respData))
229
dec := json.NewDecoder(bytes.NewBuffer(respData))
232
// It's the "items". We expect it to be an array
233
if err = parseToken(dec, '['); err != nil {
236
// Read items (large objects)
238
// Read next item (large object)
239
lo := &metanode.Dentry{}
240
if err = dec.Decode(lo); err != nil {
243
dentryMap[fmt.Sprintf("%v_%v", lo.ParentId, lo.Name)] = lo
245
// Array closing delimiter
246
if err = parseToken(dec, ']'); err != nil {
252
func parseToken(dec *json.Decoder, expectToken rune) (err error) {
253
t, err := dec.Token()
257
if delim, ok := t.(json.Delim); !ok || delim != json.Delim(expectToken) {
258
err = fmt.Errorf("expected token[%v],delim[%v],ok[%v]", string(expectToken), delim, ok)
264
func (mc *MetaHttpClient) GetAllInodes(pid uint64) (rstMap map[uint64]*Inode, err error) {
267
log.LogErrorf("action[GetAllInodes],pid:%v,err:%v", pid, err)
270
reqURL := fmt.Sprintf("http://%v%v?pid=%v", mc.host, "/getAllInodes", pid)
271
log.LogDebugf("reqURL=%v", reqURL)
272
resp, err := http.Get(reqURL)
276
defer resp.Body.Close()
277
return unmarshalInodes(resp)
280
func unmarshalInodes(resp *http.Response) (rstMap map[uint64]*Inode, err error) {
281
bufReader := bufio.NewReader(resp.Body)
282
rstMap = make(map[uint64]*Inode)
285
buf, err = bufReader.ReadBytes('\n')
286
log.LogInfof("buf[%v],err[%v]", string(buf), err)
287
if err != nil && err != io.EOF {
291
if err1 := json.Unmarshal(buf, inode); err1 != nil {
295
rstMap[inode.Inode] = inode
296
log.LogInfof("after unmarshal current inode[%v]", inode)