28
"github.com/cubefs/cubefs/proto"
29
"github.com/cubefs/cubefs/util/config"
30
"github.com/cubefs/cubefs/util/errors"
31
"github.com/cubefs/cubefs/util/log"
35
type APIResponse struct {
36
Code int `json:"code"`
37
Msg string `json:"msg"`
38
Data interface{} `json:"data,omitempty"`
42
func NewAPIResponse(code int, msg string) *APIResponse {
50
func (api *APIResponse) Marshal() ([]byte, error) {
51
return json.Marshal(api)
55
func (m *MetaNode) registerAPIHandler() (err error) {
56
http.HandleFunc("/getPartitions", m.getPartitionsHandler)
57
http.HandleFunc("/getPartitionById", m.getPartitionByIDHandler)
58
http.HandleFunc("/getLeaderPartitions", m.getLeaderPartitionsHandler)
59
http.HandleFunc("/getInode", m.getInodeHandler)
60
http.HandleFunc("/getSplitKey", m.getSplitKeyHandler)
61
http.HandleFunc("/getExtentsByInode", m.getExtentsByInodeHandler)
62
http.HandleFunc("/getEbsExtentsByInode", m.getEbsExtentsByInodeHandler)
64
http.HandleFunc("/getAllInodes", m.getAllInodesHandler)
66
http.HandleFunc("/getDentry", m.getDentryHandler)
67
http.HandleFunc("/getDirectory", m.getDirectoryHandler)
68
http.HandleFunc("/getAllDentry", m.getAllDentriesHandler)
69
http.HandleFunc("/getAllTxInfo", m.getAllTxHandler)
70
http.HandleFunc("/getParams", m.getParamsHandler)
71
http.HandleFunc("/getSmuxStat", m.getSmuxStatHandler)
72
http.HandleFunc("/getRaftStatus", m.getRaftStatusHandler)
73
http.HandleFunc("/genClusterVersionFile", m.genClusterVersionFileHandler)
74
http.HandleFunc("/getInodeSnapshot", m.getInodeSnapshotHandler)
75
http.HandleFunc("/getDentrySnapshot", m.getDentrySnapshotHandler)
77
http.HandleFunc("/getTx", m.getTxHandler)
81
func (m *MetaNode) getParamsHandler(w http.ResponseWriter,
83
resp := NewAPIResponse(http.StatusOK, http.StatusText(http.StatusOK))
84
params := make(map[string]interface{})
85
params[metaNodeDeleteBatchCountKey] = DeleteBatchCount()
87
data, _ := resp.Marshal()
88
if _, err := w.Write(data); err != nil {
89
log.LogErrorf("[getPartitionsHandler] response %s", err)
93
func (m *MetaNode) getSmuxStatHandler(w http.ResponseWriter,
95
resp := NewAPIResponse(http.StatusOK, http.StatusText(http.StatusOK))
96
resp.Data = smuxPool.GetStat()
97
data, _ := resp.Marshal()
98
if _, err := w.Write(data); err != nil {
99
log.LogErrorf("[getSmuxStatHandler] response %s", err)
103
func (m *MetaNode) getPartitionsHandler(w http.ResponseWriter,
105
resp := NewAPIResponse(http.StatusOK, http.StatusText(http.StatusOK))
106
resp.Data = m.metadataManager
107
data, _ := resp.Marshal()
108
if _, err := w.Write(data); err != nil {
109
log.LogErrorf("[getPartitionsHandler] response %s", err)
113
func (m *MetaNode) getPartitionByIDHandler(w http.ResponseWriter, r *http.Request) {
115
resp := NewAPIResponse(http.StatusBadRequest, "")
117
data, _ := resp.Marshal()
118
if _, err := w.Write(data); err != nil {
119
log.LogErrorf("[getPartitionByIDHandler] response %s", err)
122
pid, err := strconv.ParseUint(r.FormValue("pid"), 10, 64)
124
resp.Msg = err.Error()
127
mp, err := m.metadataManager.GetPartition(pid)
129
resp.Code = http.StatusNotFound
130
resp.Msg = err.Error()
133
msg := make(map[string]interface{})
134
leader, _ := mp.IsLeader()
135
_, leaderTerm := mp.LeaderTerm()
136
msg["leaderAddr"] = leader
137
msg["leader_term"] = leaderTerm
138
conf := mp.GetBaseConfig()
139
msg["partition_id"] = conf.PartitionId
140
msg["partition_type"] = conf.PartitionType
141
msg["vol_name"] = conf.VolName
142
msg["start"] = conf.Start
143
msg["end"] = conf.End
144
msg["peers"] = conf.Peers
145
msg["nodeId"] = conf.NodeId
146
msg["cursor"] = conf.Cursor
148
resp.Code = http.StatusOK
149
resp.Msg = http.StatusText(http.StatusOK)
152
func (m *MetaNode) getLeaderPartitionsHandler(w http.ResponseWriter, r *http.Request) {
153
resp := NewAPIResponse(http.StatusOK, http.StatusText(http.StatusOK))
154
mps := m.metadataManager.GetLeaderPartitions()
156
data, err := resp.Marshal()
158
log.LogErrorf("json marshal error:%v", err)
159
resp.Code = http.StatusInternalServerError
160
resp.Msg = err.Error()
163
if _, err := w.Write(data); err != nil {
164
log.LogErrorf("[getPartitionsHandler] response %s", err)
165
resp.Code = http.StatusInternalServerError
166
resp.Msg = err.Error()
170
func (m *MetaNode) getAllInodesHandler(w http.ResponseWriter, r *http.Request) {
175
msg := fmt.Sprintf("[getAllInodesHandler] err(%v)", err)
176
if _, e := w.Write([]byte(msg)); e != nil {
177
log.LogErrorf("[getAllInodesHandler] failed to write response: err(%v) msg(%v)", e, msg)
182
if err = r.ParseForm(); err != nil {
185
id, err := strconv.ParseUint(r.FormValue("pid"), 10, 64)
189
mp, err := m.metadataManager.GetPartition(id)
193
verSeq, err := m.getRealVerSeq(w, r)
199
f := func(i BtreeItem) bool {
206
if _, e = w.Write([]byte("\n")); e != nil {
207
log.LogErrorf("[getAllInodesHandler] failed to write response: %v", e)
212
inode, _ = i.(*Inode).getInoByVer(verSeq, false)
216
if data, e = inode.MarshalToJSON(); e != nil {
217
log.LogErrorf("[getAllInodesHandler] failed to marshal to json: %v", e)
221
if _, e = w.Write(data); e != nil {
222
log.LogErrorf("[getAllInodesHandler] failed to write response: %v", e)
229
mp.GetInodeTree().Ascend(f)
232
func (m *MetaNode) getSplitKeyHandler(w http.ResponseWriter, r *http.Request) {
234
log.LogDebugf("getSplitKeyHandler")
235
resp := NewAPIResponse(http.StatusBadRequest, "")
237
data, _ := resp.Marshal()
238
if _, err := w.Write(data); err != nil {
239
log.LogErrorf("[getSplitKeyHandler] response %s", err)
242
pid, err := strconv.ParseUint(r.FormValue("pid"), 10, 64)
244
resp.Msg = err.Error()
247
log.LogDebugf("getSplitKeyHandler")
248
id, err := strconv.ParseUint(r.FormValue("ino"), 10, 64)
250
resp.Msg = err.Error()
253
log.LogDebugf("getSplitKeyHandler")
254
verSeq, err := m.getRealVerSeq(w, r)
256
resp.Msg = err.Error()
259
log.LogDebugf("getSplitKeyHandler")
260
verAll, _ := strconv.ParseBool(r.FormValue("verAll"))
261
mp, err := m.metadataManager.GetPartition(pid)
263
resp.Code = http.StatusNotFound
264
resp.Msg = err.Error()
267
log.LogDebugf("getSplitKeyHandler")
268
req := &InodeGetSplitReq{
274
log.LogDebugf("getSplitKeyHandler")
276
err = mp.InodeGetSplitEk(req, p)
278
resp.Code = http.StatusInternalServerError
279
resp.Msg = err.Error()
282
log.LogDebugf("getSplitKeyHandler")
283
resp.Code = http.StatusSeeOther
284
resp.Msg = p.GetResultMsg()
286
resp.Data = json.RawMessage(p.Data)
287
log.LogDebugf("getSplitKeyHandler data %v", resp.Data)
289
log.LogDebugf("getSplitKeyHandler")
294
func (m *MetaNode) getInodeHandler(w http.ResponseWriter, r *http.Request) {
296
resp := NewAPIResponse(http.StatusBadRequest, "")
298
data, _ := resp.Marshal()
299
if _, err := w.Write(data); err != nil {
300
log.LogErrorf("[getInodeHandler] response %s", err)
303
pid, err := strconv.ParseUint(r.FormValue("pid"), 10, 64)
305
resp.Msg = err.Error()
308
id, err := strconv.ParseUint(r.FormValue("ino"), 10, 64)
310
resp.Msg = err.Error()
314
verSeq, err := m.getRealVerSeq(w, r)
316
resp.Msg = err.Error()
320
verAll, _ := strconv.ParseBool(r.FormValue("verAll"))
322
mp, err := m.metadataManager.GetPartition(pid)
324
resp.Code = http.StatusNotFound
325
resp.Msg = err.Error()
335
err = mp.InodeGet(req, p)
337
resp.Code = http.StatusInternalServerError
338
resp.Msg = err.Error()
341
resp.Code = http.StatusSeeOther
342
resp.Msg = p.GetResultMsg()
344
resp.Data = json.RawMessage(p.Data)
349
func (m *MetaNode) getRaftStatusHandler(w http.ResponseWriter, r *http.Request) {
354
resp := NewAPIResponse(http.StatusOK, http.StatusText(http.StatusOK))
356
data, _ := resp.Marshal()
357
if _, err := w.Write(data); err != nil {
358
log.LogErrorf("[getRaftStatusHandler] response %s", err)
362
raftID, err := strconv.ParseUint(r.FormValue(paramRaftID), 10, 64)
364
err = fmt.Errorf("parse param %v fail: %v", paramRaftID, err)
365
resp.Msg = err.Error()
366
resp.Code = http.StatusBadRequest
370
raftStatus := m.raftStore.RaftStatus(raftID)
371
resp.Data = raftStatus
374
func (m *MetaNode) getEbsExtentsByInodeHandler(w http.ResponseWriter,
377
resp := NewAPIResponse(http.StatusBadRequest, "")
379
data, _ := resp.Marshal()
380
if _, err := w.Write(data); err != nil {
381
log.LogErrorf("[getEbsExtentsByInodeHandler] response %s", err)
384
pid, err := strconv.ParseUint(r.FormValue("pid"), 10, 64)
386
resp.Msg = err.Error()
389
id, err := strconv.ParseUint(r.FormValue("ino"), 10, 64)
391
resp.Msg = err.Error()
394
mp, err := m.metadataManager.GetPartition(pid)
396
resp.Code = http.StatusNotFound
397
resp.Msg = err.Error()
400
req := &proto.GetExtentsRequest{
405
if err = mp.ObjExtentsList(req, p); err != nil {
406
resp.Code = http.StatusInternalServerError
407
resp.Msg = err.Error()
410
resp.Code = http.StatusSeeOther
411
resp.Msg = p.GetResultMsg()
413
resp.Data = json.RawMessage(p.Data)
418
func (m *MetaNode) getExtentsByInodeHandler(w http.ResponseWriter,
421
resp := NewAPIResponse(http.StatusBadRequest, "")
423
data, _ := resp.Marshal()
424
if _, err := w.Write(data); err != nil {
425
log.LogErrorf("[getExtentsByInodeHandler] response %s", err)
428
pid, err := strconv.ParseUint(r.FormValue("pid"), 10, 64)
430
resp.Msg = err.Error()
433
id, err := strconv.ParseUint(r.FormValue("ino"), 10, 64)
435
resp.Msg = err.Error()
439
verSeq, err := m.getRealVerSeq(w, r)
441
resp.Msg = err.Error()
444
verAll, _ := strconv.ParseBool(r.FormValue("verAll"))
445
mp, err := m.metadataManager.GetPartition(pid)
447
resp.Code = http.StatusNotFound
448
resp.Msg = err.Error()
452
req := &proto.GetExtentsRequest{
455
VerSeq: uint64(verSeq),
459
if err = mp.ExtentsList(req, p); err != nil {
460
resp.Code = http.StatusInternalServerError
461
resp.Msg = err.Error()
464
resp.Code = http.StatusSeeOther
465
resp.Msg = p.GetResultMsg()
467
resp.Data = json.RawMessage(p.Data)
472
func (m *MetaNode) getDentryHandler(w http.ResponseWriter, r *http.Request) {
474
name := r.FormValue("name")
475
resp := NewAPIResponse(http.StatusBadRequest, "")
477
data, _ := resp.Marshal()
478
if _, err := w.Write(data); err != nil {
479
log.LogErrorf("[getDentryHandler] response %s", err)
487
if pid, err = strconv.ParseUint(r.FormValue("pid"), 10, 64); err == nil {
488
pIno, err = strconv.ParseUint(r.FormValue("parentIno"), 10, 64)
491
resp.Msg = err.Error()
495
verSeq, err := m.getRealVerSeq(w, r)
497
resp.Msg = err.Error()
500
verAll, _ := strconv.ParseBool(r.FormValue("verAll"))
502
mp, err := m.metadataManager.GetPartition(pid)
504
resp.Code = http.StatusNotFound
505
resp.Msg = err.Error()
516
if err = mp.Lookup(req, p); err != nil {
517
resp.Code = http.StatusSeeOther
518
resp.Msg = err.Error()
522
resp.Code = http.StatusSeeOther
523
resp.Msg = p.GetResultMsg()
525
resp.Data = json.RawMessage(p.Data)
530
func (m *MetaNode) getTxHandler(w http.ResponseWriter, r *http.Request) {
532
resp := NewAPIResponse(http.StatusBadRequest, "")
534
data, _ := resp.Marshal()
535
if _, err := w.Write(data); err != nil {
536
log.LogErrorf("[getTxHandler] response %s", err)
544
if pid, err = strconv.ParseUint(r.FormValue("pid"), 10, 64); err == nil {
545
if txId = r.FormValue("txId"); txId == "" {
546
err = fmt.Errorf("no txId")
550
resp.Msg = err.Error()
554
mp, err := m.metadataManager.GetPartition(pid)
556
resp.Code = http.StatusNotFound
557
resp.Msg = err.Error()
560
req := &proto.TxGetInfoRequest{
565
if err = mp.TxGetInfo(req, p); err != nil {
566
resp.Code = http.StatusSeeOther
567
resp.Msg = err.Error()
571
resp.Code = http.StatusSeeOther
572
resp.Msg = p.GetResultMsg()
574
resp.Data = json.RawMessage(p.Data)
579
func (m *MetaNode) getRealVerSeq(w http.ResponseWriter, r *http.Request) (verSeq uint64, err error) {
580
if r.FormValue("verSeq") != "" {
582
if ver, err = strconv.ParseInt(r.FormValue("verSeq"), 10, 64); err != nil {
587
verSeq = math.MaxUint64
593
func (m *MetaNode) getAllDentriesHandler(w http.ResponseWriter, r *http.Request) {
595
resp := NewAPIResponse(http.StatusSeeOther, "")
599
data, _ := resp.Marshal()
600
if _, err := w.Write(data); err != nil {
601
log.LogErrorf("[getAllDentriesHandler] response %s", err)
605
pid, err := strconv.ParseUint(r.FormValue("pid"), 10, 64)
607
resp.Code = http.StatusBadRequest
608
resp.Msg = err.Error()
611
mp, err := m.metadataManager.GetPartition(pid)
613
resp.Code = http.StatusNotFound
614
resp.Msg = err.Error()
618
verSeq, err := m.getRealVerSeq(w, r)
620
resp.Msg = err.Error()
624
buff := bytes.NewBufferString(`{"code": 200, "msg": "OK", "data":[`)
625
if _, err := w.Write(buff.Bytes()); err != nil {
631
delimiter = []byte{',', '\n'}
635
mp.GetDentryTree().Ascend(func(i BtreeItem) bool {
636
den, _ := i.(*Dentry).getDentryFromVerList(verSeq, false)
637
if den == nil || den.isDeleted() {
642
if _, err = w.Write(delimiter); err != nil {
648
val, err = json.Marshal(den)
650
w.WriteHeader(http.StatusInternalServerError)
651
w.Write([]byte(err.Error()))
654
if _, err = w.Write(val); err != nil {
660
buff.WriteString(`]}`)
661
if _, err = w.Write(buff.Bytes()); err != nil {
662
log.LogErrorf("[getAllDentriesHandler] response %s", err)
667
func (m *MetaNode) getAllTxHandler(w http.ResponseWriter, r *http.Request) {
669
resp := NewAPIResponse(http.StatusOK, "")
673
data, _ := resp.Marshal()
674
if _, err := w.Write(data); err != nil {
675
log.LogErrorf("[getAllTxHandler] response %s", err)
679
pid, err := strconv.ParseUint(r.FormValue("pid"), 10, 64)
681
resp.Code = http.StatusBadRequest
682
resp.Msg = err.Error()
685
mp, err := m.metadataManager.GetPartition(pid)
687
resp.Code = http.StatusNotFound
688
resp.Msg = err.Error()
691
buff := bytes.NewBufferString(`{"code": 200, "msg": "OK", "data":[`)
692
if _, err := w.Write(buff.Bytes()); err != nil {
698
delimiter = []byte{',', '\n'}
702
f := func(i BtreeItem) bool {
704
if _, err = w.Write(delimiter); err != nil {
711
if ino, ok := i.(*TxRollbackInode); ok {
712
_, err = w.Write([]byte(ino.ToString()))
718
if den, ok := i.(*TxRollbackDentry); ok {
719
_, err = w.Write([]byte(den.ToString()))
726
val, err = json.Marshal(i)
728
w.WriteHeader(http.StatusInternalServerError)
729
w.Write([]byte(err.Error()))
732
if _, err = w.Write(val); err != nil {
738
txTree, rbInoTree, rbDenTree := mp.TxGetTree()
744
buff.WriteString(`]}`)
745
if _, err = w.Write(buff.Bytes()); err != nil {
746
log.LogErrorf("[getAllTxHandler] response %s", err)
751
func (m *MetaNode) getDirectoryHandler(w http.ResponseWriter, r *http.Request) {
752
resp := NewAPIResponse(http.StatusBadRequest, "")
754
data, _ := resp.Marshal()
755
if _, err := w.Write(data); err != nil {
756
log.LogErrorf("[getDirectoryHandler] response %s", err)
759
pid, err := strconv.ParseUint(r.FormValue("pid"), 10, 64)
761
resp.Msg = err.Error()
765
pIno, err := strconv.ParseUint(r.FormValue("parentIno"), 10, 64)
767
resp.Msg = err.Error()
771
verSeq, err := m.getRealVerSeq(w, r)
773
resp.Msg = err.Error()
777
mp, err := m.metadataManager.GetPartition(pid)
779
resp.Code = http.StatusNotFound
780
resp.Msg = err.Error()
788
if err = mp.ReadDir(&req, p); err != nil {
789
resp.Code = http.StatusInternalServerError
790
resp.Msg = err.Error()
793
resp.Code = http.StatusSeeOther
794
resp.Msg = p.GetResultMsg()
796
resp.Data = json.RawMessage(p.Data)
801
func (m *MetaNode) genClusterVersionFileHandler(w http.ResponseWriter, r *http.Request) {
803
resp := NewAPIResponse(http.StatusOK, "Generate cluster version file success")
805
data, _ := resp.Marshal()
806
if _, err := w.Write(data); err != nil {
807
log.LogErrorf("[genClusterVersionFileHandler] response %s", err)
810
paths := make([]string, 0)
811
paths = append(paths, m.metadataDir, m.raftDir)
812
for _, p := range paths {
813
if _, err := os.Stat(path.Join(p, config.ClusterVersionFile)); err == nil || os.IsExist(err) {
814
resp.Code = http.StatusCreated
815
resp.Msg = "Cluster version file already exists in " + p
819
for _, p := range paths {
820
if err := config.CheckOrStoreClusterUuid(p, m.clusterUuid, true); err != nil {
821
resp.Code = http.StatusInternalServerError
822
resp.Msg = "Failed to create cluster version file in " + p
829
func (m *MetaNode) getInodeSnapshotHandler(w http.ResponseWriter, r *http.Request) {
830
m.getSnapshotHandler(w, r, inodeFile)
833
func (m *MetaNode) getDentrySnapshotHandler(w http.ResponseWriter, r *http.Request) {
834
m.getSnapshotHandler(w, r, dentryFile)
837
func (m *MetaNode) getSnapshotHandler(w http.ResponseWriter, r *http.Request, file string) {
841
msg := fmt.Sprintf("[getInodeSnapshotHandler] err(%v)", err)
842
log.LogErrorf("%s", msg)
843
if _, e := w.Write([]byte(msg)); e != nil {
844
log.LogErrorf("[getInodeSnapshotHandler] failed to write response: err(%v) msg(%v)", e, msg)
848
if err = r.ParseForm(); err != nil {
851
id, err := strconv.ParseUint(r.FormValue("pid"), 10, 64)
855
mp, err := m.metadataManager.GetPartition(id)
860
filename := path.Join(mp.GetBaseConfig().RootDir, snapshotDir, file)
861
if _, err = os.Stat(filename); err != nil {
862
err = errors.NewErrorf("[getInodeSnapshotHandler] Stat: %s", err.Error())
865
fp, err := os.OpenFile(filename, os.O_RDONLY, 0o644)
867
err = errors.NewErrorf("[getInodeSnapshotHandler] OpenFile: %s", err.Error())
872
_, err = io.Copy(w, fp)
874
err = errors.NewErrorf("[getInodeSnapshotHandler] copy: %s", err.Error())