cubefs

Форк
0
/
server_handler.go 
548 строк · 15.7 Кб
1
// Copyright 2018 The CubeFS Authors.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
14

15
package datanode
16

17
import (
18
	"encoding/json"
19
	"fmt"
20
	"net/http"
21
	"os"
22
	"path"
23
	"strconv"
24
	"sync/atomic"
25

26
	"github.com/cubefs/cubefs/depends/tiglabs/raft"
27
	"github.com/cubefs/cubefs/proto"
28
	"github.com/cubefs/cubefs/storage"
29
	"github.com/cubefs/cubefs/util/config"
30
	"github.com/cubefs/cubefs/util/log"
31
)
32

33
var AutoRepairStatus = true
34

35
func (s *DataNode) getDiskAPI(w http.ResponseWriter, r *http.Request) {
36
	disks := make([]interface{}, 0)
37
	for _, diskItem := range s.space.GetDisks() {
38
		disk := &struct {
39
			Path         string `json:"path"`
40
			Total        uint64 `json:"total"`
41
			Used         uint64 `json:"used"`
42
			Available    uint64 `json:"available"`
43
			Unallocated  uint64 `json:"unallocated"`
44
			Allocated    uint64 `json:"allocated"`
45
			Status       int    `json:"status"`
46
			RestSize     uint64 `json:"restSize"`
47
			DiskRdoSize  uint64 `json:"diskRdoSize"`
48
			Partitions   int    `json:"partitions"`
49
			Decommission bool   `json:"decommission"`
50
		}{
51
			Path:         diskItem.Path,
52
			Total:        diskItem.Total,
53
			Used:         diskItem.Used,
54
			Available:    diskItem.Available,
55
			Unallocated:  diskItem.Unallocated,
56
			Allocated:    diskItem.Allocated,
57
			Status:       diskItem.Status,
58
			RestSize:     diskItem.ReservedSpace,
59
			DiskRdoSize:  diskItem.DiskRdonlySpace,
60
			Partitions:   diskItem.PartitionCount(),
61
			Decommission: diskItem.GetDecommissionStatus(),
62
		}
63
		disks = append(disks, disk)
64
	}
65
	diskReport := &struct {
66
		Disks []interface{} `json:"disks"`
67
		Zone  string        `json:"zone"`
68
	}{
69
		Disks: disks,
70
		Zone:  s.zoneName,
71
	}
72
	s.buildSuccessResp(w, diskReport)
73
}
74

75
func (s *DataNode) getStatAPI(w http.ResponseWriter, r *http.Request) {
76
	response := &proto.DataNodeHeartbeatResponse{}
77
	s.buildHeartBeatResponse(response)
78

79
	s.buildSuccessResp(w, response)
80
}
81

82
func (s *DataNode) setAutoRepairStatus(w http.ResponseWriter, r *http.Request) {
83
	const (
84
		paramAutoRepair = "autoRepair"
85
	)
86
	if err := r.ParseForm(); err != nil {
87
		err = fmt.Errorf("parse form fail: %v", err)
88
		s.buildFailureResp(w, http.StatusBadRequest, err.Error())
89
		return
90
	}
91
	autoRepair, err := strconv.ParseBool(r.FormValue(paramAutoRepair))
92
	if err != nil {
93
		err = fmt.Errorf("parse param %v fail: %v", paramAutoRepair, err)
94
		s.buildFailureResp(w, http.StatusBadRequest, err.Error())
95
		return
96
	}
97
	AutoRepairStatus = autoRepair
98
	s.buildSuccessResp(w, autoRepair)
99
}
100

101
func (s *DataNode) getRaftStatus(w http.ResponseWriter, r *http.Request) {
102
	const (
103
		paramRaftID = "raftID"
104
	)
105
	if err := r.ParseForm(); err != nil {
106
		err = fmt.Errorf("parse form fail: %v", err)
107
		s.buildFailureResp(w, http.StatusBadRequest, err.Error())
108
		return
109
	}
110
	raftID, err := strconv.ParseUint(r.FormValue(paramRaftID), 10, 64)
111
	if err != nil {
112
		err = fmt.Errorf("parse param %v fail: %v", paramRaftID, err)
113
		s.buildFailureResp(w, http.StatusBadRequest, err.Error())
114
		return
115
	}
116
	raftStatus := s.raftStore.RaftStatus(raftID)
117
	s.buildSuccessResp(w, raftStatus)
118
}
119

120
func (s *DataNode) getPartitionsAPI(w http.ResponseWriter, r *http.Request) {
121
	partitions := make([]interface{}, 0)
122
	s.space.RangePartitions(func(dp *DataPartition) bool {
123
		partition := &struct {
124
			ID       uint64   `json:"id"`
125
			Size     int      `json:"size"`
126
			Used     int      `json:"used"`
127
			Status   int      `json:"status"`
128
			Path     string   `json:"path"`
129
			Replicas []string `json:"replicas"`
130
		}{
131
			ID:       dp.partitionID,
132
			Size:     dp.Size(),
133
			Used:     dp.Used(),
134
			Status:   dp.Status(),
135
			Path:     dp.Path(),
136
			Replicas: dp.Replicas(),
137
		}
138
		partitions = append(partitions, partition)
139
		return true
140
	})
141
	result := &struct {
142
		Partitions     []interface{} `json:"partitions"`
143
		PartitionCount int           `json:"partitionCount"`
144
	}{
145
		Partitions:     partitions,
146
		PartitionCount: len(partitions),
147
	}
148
	s.buildSuccessResp(w, result)
149
}
150

151
func (s *DataNode) getPartitionAPI(w http.ResponseWriter, r *http.Request) {
152
	const (
153
		paramPartitionID = "id"
154
	)
155
	var (
156
		partitionID          uint64
157
		files                []*storage.ExtentInfo
158
		err                  error
159
		tinyDeleteRecordSize int64
160
		raftSt               *raft.Status
161
	)
162
	if err = r.ParseForm(); err != nil {
163
		err = fmt.Errorf("parse form fail: %v", err)
164
		s.buildFailureResp(w, http.StatusBadRequest, err.Error())
165
		return
166
	}
167
	if partitionID, err = strconv.ParseUint(r.FormValue(paramPartitionID), 10, 64); err != nil {
168
		err = fmt.Errorf("parse param %v fail: %v", paramPartitionID, err)
169
		s.buildFailureResp(w, http.StatusBadRequest, err.Error())
170
		return
171
	}
172
	partition := s.space.Partition(partitionID)
173
	if partition == nil {
174
		s.buildFailureResp(w, http.StatusNotFound, "partition not exist")
175
		return
176
	}
177
	if files, tinyDeleteRecordSize, err = partition.ExtentStore().GetAllWatermarks(nil); err != nil {
178
		err = fmt.Errorf("get watermark fail: %v", err)
179
		s.buildFailureResp(w, http.StatusInternalServerError, err.Error())
180
		return
181
	}
182

183
	if partition.IsDataPartitionLoading() {
184
		raftSt = &raft.Status{Stopped: true}
185
	} else {
186
		raftSt = partition.raftPartition.Status()
187
	}
188

189
	result := &struct {
190
		VolName              string                `json:"volName"`
191
		ID                   uint64                `json:"id"`
192
		Size                 int                   `json:"size"`
193
		Used                 int                   `json:"used"`
194
		Status               int                   `json:"status"`
195
		Path                 string                `json:"path"`
196
		Files                []*storage.ExtentInfo `json:"extents"`
197
		FileCount            int                   `json:"fileCount"`
198
		Replicas             []string              `json:"replicas"`
199
		TinyDeleteRecordSize int64                 `json:"tinyDeleteRecordSize"`
200
		RaftStatus           *raft.Status          `json:"raftStatus"`
201
	}{
202
		VolName:              partition.volumeID,
203
		ID:                   partition.partitionID,
204
		Size:                 partition.Size(),
205
		Used:                 partition.Used(),
206
		Status:               partition.Status(),
207
		Path:                 partition.Path(),
208
		Files:                files,
209
		FileCount:            len(files),
210
		Replicas:             partition.Replicas(),
211
		TinyDeleteRecordSize: tinyDeleteRecordSize,
212
		RaftStatus:           raftSt,
213
	}
214

215
	if partition.isNormalType() {
216
		result.RaftStatus = partition.raftPartition.Status()
217
	}
218

219
	s.buildSuccessResp(w, result)
220
}
221

222
func (s *DataNode) getExtentAPI(w http.ResponseWriter, r *http.Request) {
223
	var (
224
		partitionID uint64
225
		extentID    int
226
		err         error
227
		extentInfo  *storage.ExtentInfo
228
	)
229
	if err = r.ParseForm(); err != nil {
230
		s.buildFailureResp(w, http.StatusBadRequest, err.Error())
231
		return
232
	}
233
	if partitionID, err = strconv.ParseUint(r.FormValue("partitionID"), 10, 64); err != nil {
234
		s.buildFailureResp(w, http.StatusBadRequest, err.Error())
235
		return
236
	}
237
	if extentID, err = strconv.Atoi(r.FormValue("extentID")); err != nil {
238
		s.buildFailureResp(w, http.StatusBadRequest, err.Error())
239
		return
240
	}
241
	partition := s.space.Partition(partitionID)
242
	if partition == nil {
243
		s.buildFailureResp(w, http.StatusNotFound, "partition not exist")
244
		return
245
	}
246
	if extentInfo, err = partition.ExtentStore().Watermark(uint64(extentID)); err != nil {
247
		s.buildFailureResp(w, 500, err.Error())
248
		return
249
	}
250

251
	s.buildSuccessResp(w, extentInfo)
252
}
253

254
func (s *DataNode) getBlockCrcAPI(w http.ResponseWriter, r *http.Request) {
255
	var (
256
		partitionID uint64
257
		extentID    int
258
		err         error
259
		blocks      []*storage.BlockCrc
260
	)
261
	if err = r.ParseForm(); err != nil {
262
		s.buildFailureResp(w, http.StatusBadRequest, err.Error())
263
		return
264
	}
265
	if partitionID, err = strconv.ParseUint(r.FormValue("partitionID"), 10, 64); err != nil {
266
		s.buildFailureResp(w, http.StatusBadRequest, err.Error())
267
		return
268
	}
269
	if extentID, err = strconv.Atoi(r.FormValue("extentID")); err != nil {
270
		s.buildFailureResp(w, http.StatusBadRequest, err.Error())
271
		return
272
	}
273
	partition := s.space.Partition(partitionID)
274
	if partition == nil {
275
		s.buildFailureResp(w, http.StatusNotFound, "partition not exist")
276
		return
277
	}
278
	if blocks, err = partition.ExtentStore().ScanBlocks(uint64(extentID)); err != nil {
279
		s.buildFailureResp(w, 500, err.Error())
280
		return
281
	}
282

283
	s.buildSuccessResp(w, blocks)
284
}
285

286
func (s *DataNode) getTinyDeleted(w http.ResponseWriter, r *http.Request) {
287
	var (
288
		partitionID uint64
289
		err         error
290
		extentInfo  []storage.ExtentDeleted
291
	)
292
	if err = r.ParseForm(); err != nil {
293
		s.buildFailureResp(w, http.StatusBadRequest, err.Error())
294
		return
295
	}
296
	if partitionID, err = strconv.ParseUint(r.FormValue("id"), 10, 64); err != nil {
297
		s.buildFailureResp(w, http.StatusBadRequest, err.Error())
298
		return
299
	}
300
	partition := s.space.Partition(partitionID)
301
	if partition == nil {
302
		s.buildFailureResp(w, http.StatusNotFound, "partition not exist")
303
		return
304
	}
305
	if extentInfo, err = partition.ExtentStore().GetHasDeleteTinyRecords(); err != nil {
306
		s.buildFailureResp(w, 500, err.Error())
307
		return
308
	}
309

310
	s.buildSuccessResp(w, extentInfo)
311
}
312

313
func (s *DataNode) getNormalDeleted(w http.ResponseWriter, r *http.Request) {
314
	var (
315
		partitionID uint64
316
		err         error
317
		extentInfo  []storage.ExtentDeleted
318
	)
319
	if err = r.ParseForm(); err != nil {
320
		s.buildFailureResp(w, http.StatusBadRequest, err.Error())
321
		return
322
	}
323
	if partitionID, err = strconv.ParseUint(r.FormValue("id"), 10, 64); err != nil {
324
		s.buildFailureResp(w, http.StatusBadRequest, err.Error())
325
		return
326
	}
327
	partition := s.space.Partition(partitionID)
328
	if partition == nil {
329
		s.buildFailureResp(w, http.StatusNotFound, "partition not exist")
330
		return
331
	}
332
	if extentInfo, err = partition.ExtentStore().GetHasDeleteExtent(); err != nil {
333
		s.buildFailureResp(w, 500, err.Error())
334
		return
335
	}
336

337
	s.buildSuccessResp(w, extentInfo)
338
}
339

340
func (s *DataNode) setQosEnable() func(http.ResponseWriter, *http.Request) {
341
	return func(w http.ResponseWriter, r *http.Request) {
342
		var (
343
			err    error
344
			enable bool
345
		)
346
		if err = r.ParseForm(); err != nil {
347
			s.buildFailureResp(w, http.StatusBadRequest, err.Error())
348
			return
349
		}
350
		if enable, err = strconv.ParseBool(r.FormValue("enable")); err != nil {
351
			s.buildFailureResp(w, http.StatusBadRequest, err.Error())
352
			return
353
		}
354
		s.diskQosEnable = enable
355
		s.buildSuccessResp(w, "success")
356
	}
357
}
358

359
func (s *DataNode) setDiskQos(w http.ResponseWriter, r *http.Request) {
360
	if err := r.ParseForm(); err != nil {
361
		s.buildFailureResp(w, http.StatusBadRequest, err.Error())
362
		return
363
	}
364
	parser := func(key string) (val int64, err error, has bool) {
365
		valStr := r.FormValue(key)
366
		if valStr == "" {
367
			return 0, nil, false
368
		}
369
		has = true
370
		val, err = strconv.ParseInt(valStr, 10, 64)
371
		return
372
	}
373

374
	updated := false
375
	for key, pVal := range map[string]*int{
376
		ConfigDiskReadIocc:  &s.diskReadIocc,
377
		ConfigDiskReadIops:  &s.diskReadIops,
378
		ConfigDiskReadFlow:  &s.diskReadFlow,
379
		ConfigDiskWriteIocc: &s.diskWriteIocc,
380
		ConfigDiskWriteIops: &s.diskWriteIops,
381
		ConfigDiskWriteFlow: &s.diskWriteFlow,
382
	} {
383
		val, err, has := parser(key)
384
		if err != nil {
385
			s.buildFailureResp(w, http.StatusBadRequest, err.Error())
386
			return
387
		}
388
		if has {
389
			updated = true
390
			*pVal = int(val)
391
		}
392
	}
393

394
	if updated {
395
		s.updateQosLimit()
396
	}
397
	s.buildSuccessResp(w, "success")
398
}
399

400
func (s *DataNode) getDiskQos(w http.ResponseWriter, r *http.Request) {
401
	disks := make([]interface{}, 0)
402
	for _, diskItem := range s.space.GetDisks() {
403
		disk := &struct {
404
			Path  string        `json:"path"`
405
			Read  LimiterStatus `json:"read"`
406
			Write LimiterStatus `json:"write"`
407
		}{
408
			Path:  diskItem.Path,
409
			Read:  diskItem.limitRead.Status(),
410
			Write: diskItem.limitWrite.Status(),
411
		}
412
		disks = append(disks, disk)
413
	}
414
	diskStatus := &struct {
415
		Disks []interface{} `json:"disks"`
416
		Zone  string        `json:"zone"`
417
	}{
418
		Disks: disks,
419
		Zone:  s.zoneName,
420
	}
421
	s.buildSuccessResp(w, diskStatus)
422
}
423

424
func (s *DataNode) getSmuxPoolStat() func(http.ResponseWriter, *http.Request) {
425
	return func(w http.ResponseWriter, r *http.Request) {
426
		if !s.enableSmuxConnPool {
427
			s.buildFailureResp(w, 500, "smux pool not supported")
428
			return
429
		}
430
		if s.smuxConnPool == nil {
431
			s.buildFailureResp(w, 500, "smux pool now is nil")
432
			return
433
		}
434
		stat := s.smuxConnPool.GetStat()
435
		s.buildSuccessResp(w, stat)
436
	}
437
}
438

439
func (s *DataNode) setMetricsDegrade(w http.ResponseWriter, r *http.Request) {
440
	if err := r.ParseForm(); err != nil {
441
		w.Write([]byte(err.Error()))
442
		return
443
	}
444

445
	if level := r.FormValue("level"); level != "" {
446
		val, err := strconv.Atoi(level)
447
		if err != nil {
448
			w.Write([]byte("Set metrics degrade level failed\n"))
449
		} else {
450
			atomic.StoreInt64(&s.metricsDegrade, int64(val))
451
			w.Write([]byte(fmt.Sprintf("Set metrics degrade level to %v successfully\n", val)))
452
		}
453
	}
454
}
455

456
func (s *DataNode) getMetricsDegrade(w http.ResponseWriter, r *http.Request) {
457
	w.Write([]byte(fmt.Sprintf("%v\n", atomic.LoadInt64(&s.metricsDegrade))))
458
}
459

460
func (s *DataNode) genClusterVersionFile(w http.ResponseWriter, r *http.Request) {
461
	paths := make([]string, 0)
462
	s.space.RangePartitions(func(partition *DataPartition) bool {
463
		paths = append(paths, partition.disk.Path)
464
		return true
465
	})
466
	paths = append(paths, s.raftDir)
467

468
	for _, p := range paths {
469
		if _, err := os.Stat(path.Join(p, config.ClusterVersionFile)); err == nil || os.IsExist(err) {
470
			s.buildFailureResp(w, http.StatusCreated, "cluster version file already exists in "+p)
471
			return
472
		}
473
	}
474
	for _, p := range paths {
475
		if err := config.CheckOrStoreClusterUuid(p, s.clusterUuid, true); err != nil {
476
			s.buildFailureResp(w, http.StatusInternalServerError, "Failed to create cluster version file in "+p)
477
			return
478
		}
479
	}
480
	s.buildSuccessResp(w, "Generate cluster version file success")
481
}
482

483
func (s *DataNode) buildSuccessResp(w http.ResponseWriter, data interface{}) {
484
	s.buildJSONResp(w, http.StatusOK, data, "")
485
}
486

487
func (s *DataNode) buildFailureResp(w http.ResponseWriter, code int, msg string) {
488
	s.buildJSONResp(w, code, nil, msg)
489
}
490

491
// Create response for the API request.
492
func (s *DataNode) buildJSONResp(w http.ResponseWriter, code int, data interface{}, msg string) {
493
	var (
494
		jsonBody []byte
495
		err      error
496
	)
497
	w.WriteHeader(code)
498
	w.Header().Set("Content-Type", "application/json")
499
	body := proto.HTTPReply{Code: int32(code), Msg: msg, Data: data}
500
	if jsonBody, err = json.Marshal(body); err != nil {
501
		return
502
	}
503
	w.Write(jsonBody)
504
}
505

506
func (s *DataNode) setDiskBadAPI(w http.ResponseWriter, r *http.Request) {
507
	const (
508
		paramDiskPath = "diskPath"
509
	)
510
	var (
511
		err      error
512
		diskPath string
513
		disk     *Disk
514
	)
515

516
	if err = r.ParseForm(); err != nil {
517
		err = fmt.Errorf("parse form fail: %v", err)
518
		log.LogErrorf("[setDiskBadAPI] %v", err.Error())
519
		s.buildFailureResp(w, http.StatusBadRequest, err.Error())
520
		return
521
	}
522

523
	if diskPath = r.FormValue(paramDiskPath); diskPath == "" {
524
		err = fmt.Errorf("param(%v) is empty", paramDiskPath)
525
		log.LogErrorf("[setDiskBadAPI] %v", err.Error())
526
		s.buildFailureResp(w, http.StatusBadRequest, err.Error())
527
		return
528
	}
529

530
	if disk, err = s.space.GetDisk(diskPath); err != nil {
531
		err = fmt.Errorf("not exit such dissk, path: %v", diskPath)
532
		log.LogErrorf("[setDiskBadAPI] %v", err.Error())
533
		s.buildFailureResp(w, http.StatusBadRequest, err.Error())
534
		return
535
	}
536

537
	if disk.Status == proto.Unavailable {
538
		msg := fmt.Sprintf("disk(%v) status was already unavailable, nothing to do", disk.Path)
539
		log.LogInfof("[setDiskBadAPI] %v", msg)
540
		s.buildSuccessResp(w, msg)
541
		return
542
	}
543

544
	log.LogWarnf("[setDiskBadAPI] set bad disk, path: %v", disk.Path)
545
	disk.doDiskError()
546

547
	s.buildSuccessResp(w, "OK")
548
}
549

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.