1
// Copyright 2018 The CubeFS Authors.
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
7
// http://www.apache.org/licenses/LICENSE-2.0
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
26
"github.com/cubefs/cubefs/depends/tiglabs/raft"
27
"github.com/cubefs/cubefs/proto"
28
"github.com/cubefs/cubefs/storage"
29
"github.com/cubefs/cubefs/util/config"
30
"github.com/cubefs/cubefs/util/log"
33
var AutoRepairStatus = true
35
func (s *DataNode) getDiskAPI(w http.ResponseWriter, r *http.Request) {
36
disks := make([]interface{}, 0)
37
for _, diskItem := range s.space.GetDisks() {
39
Path string `json:"path"`
40
Total uint64 `json:"total"`
41
Used uint64 `json:"used"`
42
Available uint64 `json:"available"`
43
Unallocated uint64 `json:"unallocated"`
44
Allocated uint64 `json:"allocated"`
45
Status int `json:"status"`
46
RestSize uint64 `json:"restSize"`
47
DiskRdoSize uint64 `json:"diskRdoSize"`
48
Partitions int `json:"partitions"`
49
Decommission bool `json:"decommission"`
52
Total: diskItem.Total,
54
Available: diskItem.Available,
55
Unallocated: diskItem.Unallocated,
56
Allocated: diskItem.Allocated,
57
Status: diskItem.Status,
58
RestSize: diskItem.ReservedSpace,
59
DiskRdoSize: diskItem.DiskRdonlySpace,
60
Partitions: diskItem.PartitionCount(),
61
Decommission: diskItem.GetDecommissionStatus(),
63
disks = append(disks, disk)
65
diskReport := &struct {
66
Disks []interface{} `json:"disks"`
67
Zone string `json:"zone"`
72
s.buildSuccessResp(w, diskReport)
75
func (s *DataNode) getStatAPI(w http.ResponseWriter, r *http.Request) {
76
response := &proto.DataNodeHeartbeatResponse{}
77
s.buildHeartBeatResponse(response)
79
s.buildSuccessResp(w, response)
82
func (s *DataNode) setAutoRepairStatus(w http.ResponseWriter, r *http.Request) {
84
paramAutoRepair = "autoRepair"
86
if err := r.ParseForm(); err != nil {
87
err = fmt.Errorf("parse form fail: %v", err)
88
s.buildFailureResp(w, http.StatusBadRequest, err.Error())
91
autoRepair, err := strconv.ParseBool(r.FormValue(paramAutoRepair))
93
err = fmt.Errorf("parse param %v fail: %v", paramAutoRepair, err)
94
s.buildFailureResp(w, http.StatusBadRequest, err.Error())
97
AutoRepairStatus = autoRepair
98
s.buildSuccessResp(w, autoRepair)
101
func (s *DataNode) getRaftStatus(w http.ResponseWriter, r *http.Request) {
103
paramRaftID = "raftID"
105
if err := r.ParseForm(); err != nil {
106
err = fmt.Errorf("parse form fail: %v", err)
107
s.buildFailureResp(w, http.StatusBadRequest, err.Error())
110
raftID, err := strconv.ParseUint(r.FormValue(paramRaftID), 10, 64)
112
err = fmt.Errorf("parse param %v fail: %v", paramRaftID, err)
113
s.buildFailureResp(w, http.StatusBadRequest, err.Error())
116
raftStatus := s.raftStore.RaftStatus(raftID)
117
s.buildSuccessResp(w, raftStatus)
120
func (s *DataNode) getPartitionsAPI(w http.ResponseWriter, r *http.Request) {
121
partitions := make([]interface{}, 0)
122
s.space.RangePartitions(func(dp *DataPartition) bool {
123
partition := &struct {
124
ID uint64 `json:"id"`
125
Size int `json:"size"`
126
Used int `json:"used"`
127
Status int `json:"status"`
128
Path string `json:"path"`
129
Replicas []string `json:"replicas"`
136
Replicas: dp.Replicas(),
138
partitions = append(partitions, partition)
142
Partitions []interface{} `json:"partitions"`
143
PartitionCount int `json:"partitionCount"`
145
Partitions: partitions,
146
PartitionCount: len(partitions),
148
s.buildSuccessResp(w, result)
151
func (s *DataNode) getPartitionAPI(w http.ResponseWriter, r *http.Request) {
153
paramPartitionID = "id"
157
files []*storage.ExtentInfo
159
tinyDeleteRecordSize int64
162
if err = r.ParseForm(); err != nil {
163
err = fmt.Errorf("parse form fail: %v", err)
164
s.buildFailureResp(w, http.StatusBadRequest, err.Error())
167
if partitionID, err = strconv.ParseUint(r.FormValue(paramPartitionID), 10, 64); err != nil {
168
err = fmt.Errorf("parse param %v fail: %v", paramPartitionID, err)
169
s.buildFailureResp(w, http.StatusBadRequest, err.Error())
172
partition := s.space.Partition(partitionID)
173
if partition == nil {
174
s.buildFailureResp(w, http.StatusNotFound, "partition not exist")
177
if files, tinyDeleteRecordSize, err = partition.ExtentStore().GetAllWatermarks(nil); err != nil {
178
err = fmt.Errorf("get watermark fail: %v", err)
179
s.buildFailureResp(w, http.StatusInternalServerError, err.Error())
183
if partition.IsDataPartitionLoading() {
184
raftSt = &raft.Status{Stopped: true}
186
raftSt = partition.raftPartition.Status()
190
VolName string `json:"volName"`
191
ID uint64 `json:"id"`
192
Size int `json:"size"`
193
Used int `json:"used"`
194
Status int `json:"status"`
195
Path string `json:"path"`
196
Files []*storage.ExtentInfo `json:"extents"`
197
FileCount int `json:"fileCount"`
198
Replicas []string `json:"replicas"`
199
TinyDeleteRecordSize int64 `json:"tinyDeleteRecordSize"`
200
RaftStatus *raft.Status `json:"raftStatus"`
202
VolName: partition.volumeID,
203
ID: partition.partitionID,
204
Size: partition.Size(),
205
Used: partition.Used(),
206
Status: partition.Status(),
207
Path: partition.Path(),
209
FileCount: len(files),
210
Replicas: partition.Replicas(),
211
TinyDeleteRecordSize: tinyDeleteRecordSize,
215
if partition.isNormalType() {
216
result.RaftStatus = partition.raftPartition.Status()
219
s.buildSuccessResp(w, result)
222
func (s *DataNode) getExtentAPI(w http.ResponseWriter, r *http.Request) {
227
extentInfo *storage.ExtentInfo
229
if err = r.ParseForm(); err != nil {
230
s.buildFailureResp(w, http.StatusBadRequest, err.Error())
233
if partitionID, err = strconv.ParseUint(r.FormValue("partitionID"), 10, 64); err != nil {
234
s.buildFailureResp(w, http.StatusBadRequest, err.Error())
237
if extentID, err = strconv.Atoi(r.FormValue("extentID")); err != nil {
238
s.buildFailureResp(w, http.StatusBadRequest, err.Error())
241
partition := s.space.Partition(partitionID)
242
if partition == nil {
243
s.buildFailureResp(w, http.StatusNotFound, "partition not exist")
246
if extentInfo, err = partition.ExtentStore().Watermark(uint64(extentID)); err != nil {
247
s.buildFailureResp(w, 500, err.Error())
251
s.buildSuccessResp(w, extentInfo)
254
func (s *DataNode) getBlockCrcAPI(w http.ResponseWriter, r *http.Request) {
259
blocks []*storage.BlockCrc
261
if err = r.ParseForm(); err != nil {
262
s.buildFailureResp(w, http.StatusBadRequest, err.Error())
265
if partitionID, err = strconv.ParseUint(r.FormValue("partitionID"), 10, 64); err != nil {
266
s.buildFailureResp(w, http.StatusBadRequest, err.Error())
269
if extentID, err = strconv.Atoi(r.FormValue("extentID")); err != nil {
270
s.buildFailureResp(w, http.StatusBadRequest, err.Error())
273
partition := s.space.Partition(partitionID)
274
if partition == nil {
275
s.buildFailureResp(w, http.StatusNotFound, "partition not exist")
278
if blocks, err = partition.ExtentStore().ScanBlocks(uint64(extentID)); err != nil {
279
s.buildFailureResp(w, 500, err.Error())
283
s.buildSuccessResp(w, blocks)
286
func (s *DataNode) getTinyDeleted(w http.ResponseWriter, r *http.Request) {
290
extentInfo []storage.ExtentDeleted
292
if err = r.ParseForm(); err != nil {
293
s.buildFailureResp(w, http.StatusBadRequest, err.Error())
296
if partitionID, err = strconv.ParseUint(r.FormValue("id"), 10, 64); err != nil {
297
s.buildFailureResp(w, http.StatusBadRequest, err.Error())
300
partition := s.space.Partition(partitionID)
301
if partition == nil {
302
s.buildFailureResp(w, http.StatusNotFound, "partition not exist")
305
if extentInfo, err = partition.ExtentStore().GetHasDeleteTinyRecords(); err != nil {
306
s.buildFailureResp(w, 500, err.Error())
310
s.buildSuccessResp(w, extentInfo)
313
func (s *DataNode) getNormalDeleted(w http.ResponseWriter, r *http.Request) {
317
extentInfo []storage.ExtentDeleted
319
if err = r.ParseForm(); err != nil {
320
s.buildFailureResp(w, http.StatusBadRequest, err.Error())
323
if partitionID, err = strconv.ParseUint(r.FormValue("id"), 10, 64); err != nil {
324
s.buildFailureResp(w, http.StatusBadRequest, err.Error())
327
partition := s.space.Partition(partitionID)
328
if partition == nil {
329
s.buildFailureResp(w, http.StatusNotFound, "partition not exist")
332
if extentInfo, err = partition.ExtentStore().GetHasDeleteExtent(); err != nil {
333
s.buildFailureResp(w, 500, err.Error())
337
s.buildSuccessResp(w, extentInfo)
340
func (s *DataNode) setQosEnable() func(http.ResponseWriter, *http.Request) {
341
return func(w http.ResponseWriter, r *http.Request) {
346
if err = r.ParseForm(); err != nil {
347
s.buildFailureResp(w, http.StatusBadRequest, err.Error())
350
if enable, err = strconv.ParseBool(r.FormValue("enable")); err != nil {
351
s.buildFailureResp(w, http.StatusBadRequest, err.Error())
354
s.diskQosEnable = enable
355
s.buildSuccessResp(w, "success")
359
func (s *DataNode) setDiskQos(w http.ResponseWriter, r *http.Request) {
360
if err := r.ParseForm(); err != nil {
361
s.buildFailureResp(w, http.StatusBadRequest, err.Error())
364
parser := func(key string) (val int64, err error, has bool) {
365
valStr := r.FormValue(key)
370
val, err = strconv.ParseInt(valStr, 10, 64)
375
for key, pVal := range map[string]*int{
376
ConfigDiskReadIocc: &s.diskReadIocc,
377
ConfigDiskReadIops: &s.diskReadIops,
378
ConfigDiskReadFlow: &s.diskReadFlow,
379
ConfigDiskWriteIocc: &s.diskWriteIocc,
380
ConfigDiskWriteIops: &s.diskWriteIops,
381
ConfigDiskWriteFlow: &s.diskWriteFlow,
383
val, err, has := parser(key)
385
s.buildFailureResp(w, http.StatusBadRequest, err.Error())
397
s.buildSuccessResp(w, "success")
400
func (s *DataNode) getDiskQos(w http.ResponseWriter, r *http.Request) {
401
disks := make([]interface{}, 0)
402
for _, diskItem := range s.space.GetDisks() {
404
Path string `json:"path"`
405
Read LimiterStatus `json:"read"`
406
Write LimiterStatus `json:"write"`
409
Read: diskItem.limitRead.Status(),
410
Write: diskItem.limitWrite.Status(),
412
disks = append(disks, disk)
414
diskStatus := &struct {
415
Disks []interface{} `json:"disks"`
416
Zone string `json:"zone"`
421
s.buildSuccessResp(w, diskStatus)
424
func (s *DataNode) getSmuxPoolStat() func(http.ResponseWriter, *http.Request) {
425
return func(w http.ResponseWriter, r *http.Request) {
426
if !s.enableSmuxConnPool {
427
s.buildFailureResp(w, 500, "smux pool not supported")
430
if s.smuxConnPool == nil {
431
s.buildFailureResp(w, 500, "smux pool now is nil")
434
stat := s.smuxConnPool.GetStat()
435
s.buildSuccessResp(w, stat)
439
func (s *DataNode) setMetricsDegrade(w http.ResponseWriter, r *http.Request) {
440
if err := r.ParseForm(); err != nil {
441
w.Write([]byte(err.Error()))
445
if level := r.FormValue("level"); level != "" {
446
val, err := strconv.Atoi(level)
448
w.Write([]byte("Set metrics degrade level failed\n"))
450
atomic.StoreInt64(&s.metricsDegrade, int64(val))
451
w.Write([]byte(fmt.Sprintf("Set metrics degrade level to %v successfully\n", val)))
456
func (s *DataNode) getMetricsDegrade(w http.ResponseWriter, r *http.Request) {
457
w.Write([]byte(fmt.Sprintf("%v\n", atomic.LoadInt64(&s.metricsDegrade))))
460
func (s *DataNode) genClusterVersionFile(w http.ResponseWriter, r *http.Request) {
461
paths := make([]string, 0)
462
s.space.RangePartitions(func(partition *DataPartition) bool {
463
paths = append(paths, partition.disk.Path)
466
paths = append(paths, s.raftDir)
468
for _, p := range paths {
469
if _, err := os.Stat(path.Join(p, config.ClusterVersionFile)); err == nil || os.IsExist(err) {
470
s.buildFailureResp(w, http.StatusCreated, "cluster version file already exists in "+p)
474
for _, p := range paths {
475
if err := config.CheckOrStoreClusterUuid(p, s.clusterUuid, true); err != nil {
476
s.buildFailureResp(w, http.StatusInternalServerError, "Failed to create cluster version file in "+p)
480
s.buildSuccessResp(w, "Generate cluster version file success")
483
func (s *DataNode) buildSuccessResp(w http.ResponseWriter, data interface{}) {
484
s.buildJSONResp(w, http.StatusOK, data, "")
487
func (s *DataNode) buildFailureResp(w http.ResponseWriter, code int, msg string) {
488
s.buildJSONResp(w, code, nil, msg)
491
// Create response for the API request.
492
func (s *DataNode) buildJSONResp(w http.ResponseWriter, code int, data interface{}, msg string) {
498
w.Header().Set("Content-Type", "application/json")
499
body := proto.HTTPReply{Code: int32(code), Msg: msg, Data: data}
500
if jsonBody, err = json.Marshal(body); err != nil {
506
func (s *DataNode) setDiskBadAPI(w http.ResponseWriter, r *http.Request) {
508
paramDiskPath = "diskPath"
516
if err = r.ParseForm(); err != nil {
517
err = fmt.Errorf("parse form fail: %v", err)
518
log.LogErrorf("[setDiskBadAPI] %v", err.Error())
519
s.buildFailureResp(w, http.StatusBadRequest, err.Error())
523
if diskPath = r.FormValue(paramDiskPath); diskPath == "" {
524
err = fmt.Errorf("param(%v) is empty", paramDiskPath)
525
log.LogErrorf("[setDiskBadAPI] %v", err.Error())
526
s.buildFailureResp(w, http.StatusBadRequest, err.Error())
530
if disk, err = s.space.GetDisk(diskPath); err != nil {
531
err = fmt.Errorf("not exit such dissk, path: %v", diskPath)
532
log.LogErrorf("[setDiskBadAPI] %v", err.Error())
533
s.buildFailureResp(w, http.StatusBadRequest, err.Error())
537
if disk.Status == proto.Unavailable {
538
msg := fmt.Sprintf("disk(%v) status was already unavailable, nothing to do", disk.Path)
539
log.LogInfof("[setDiskBadAPI] %v", msg)
540
s.buildSuccessResp(w, msg)
544
log.LogWarnf("[setDiskBadAPI] set bad disk, path: %v", disk.Path)
547
s.buildSuccessResp(w, "OK")