// Copyright 2018 The CubeFS Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.

package datanode

import (
	"fmt"
	"math"
	"os"
	"sync"
	"time"

	"github.com/cubefs/cubefs/proto"
	"github.com/cubefs/cubefs/raftstore"
	"github.com/cubefs/cubefs/util/atomicutil"
	"github.com/cubefs/cubefs/util/loadutil"
	"github.com/cubefs/cubefs/util/log"
	"github.com/shirou/gopsutil/disk"
)

// SpaceManager manages the disk space.
type SpaceManager struct {
	clusterID            string
	disks                map[string]*Disk
	partitions           map[uint64]*DataPartition
	raftStore            raftstore.RaftStore
	nodeID               uint64
	diskMutex            sync.RWMutex
	partitionMutex       sync.RWMutex
	stats                *Stats
	stopC                chan bool
	selectedIndex        int // TODO what is selected index
	diskList             []string
	dataNode             *DataNode
	createPartitionMutex sync.RWMutex
	diskUtils            map[string]*atomicutil.Float64
	samplerDone          chan struct{}
}

const diskSampleDuration = 1 * time.Second

// NewSpaceManager creates a new space manager.
func NewSpaceManager(dataNode *DataNode) *SpaceManager {
	space := &SpaceManager{}
	space.disks = make(map[string]*Disk)
	space.diskList = make([]string, 0)
	space.partitions = make(map[uint64]*DataPartition)
	space.stats = NewStats(dataNode.zoneName)
	space.stopC = make(chan bool)
	space.dataNode = dataNode
	space.diskUtils = make(map[string]*atomicutil.Float64)
	go space.statUpdateScheduler()

	return space
}
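
// startSpaceManagerExample is an illustrative sketch, not part of the
// original file: it shows one plausible wiring order for a SpaceManager
// during node startup (create, identify, attach raft, load disks, start
// sampling). The node ID, cluster ID, disk path, and the size/threshold
// values are invented for the example.
func startSpaceManagerExample(dataNode *DataNode, raftStore raftstore.RaftStore) (*SpaceManager, error) {
	space := NewSpaceManager(dataNode)
	space.SetNodeID(1)                 // hypothetical node ID
	space.SetClusterID("cluster-demo") // hypothetical cluster ID
	space.SetRaftStore(raftStore)
	// Reserve 5 GiB, keep 10 GiB read-only headroom, tolerate 20 disk errors
	// (illustrative values only).
	if err := space.LoadDisk("/data/disk0", 5<<30, 10<<30, 20); err != nil {
		return nil, err
	}
	space.StartDiskSample()
	return space, nil
}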

// Stop closes all data partitions in parallel and terminates background tasks.
func (manager *SpaceManager) Stop() {
	defer func() {
		recover() // samplerDone is nil if disk sampling was never started
	}()
	close(manager.stopC)
	close(manager.samplerDone)
	// Parallel stop data partitions.
	const maxParallelism = 128
	parallelism := int(math.Min(float64(maxParallelism), float64(len(manager.partitions))))
	wg := sync.WaitGroup{}
	partitionC := make(chan *DataPartition, parallelism)
	wg.Add(1)

	// Stop the raft instance of every partition first.
	for _, partition := range manager.partitions {
		partition.stopRaft()
	}

	// One producer goroutine feeds all partitions into the channel.
	go func(c chan<- *DataPartition) {
		defer wg.Done()
		for _, partition := range manager.partitions {
			c <- partition
		}
		close(c)
	}(partitionC)

	// A bounded pool of workers drains the channel and stops each partition;
	// a nil receive means the channel was closed and drained.
	for i := 0; i < parallelism; i++ {
		wg.Add(1)
		go func(c <-chan *DataPartition) {
			defer wg.Done()
			var partition *DataPartition
			for {
				if partition = <-c; partition == nil {
					return
				}
				partition.Stop()
			}
		}(partitionC)
	}
	wg.Wait()
}

// GetAllDiskPartitions returns the gopsutil partition stat of every managed disk.
func (manager *SpaceManager) GetAllDiskPartitions() []*disk.PartitionStat {
	manager.diskMutex.RLock()
	defer manager.diskMutex.RUnlock()
	partitions := make([]*disk.PartitionStat, 0, len(manager.disks))
	for _, disk := range manager.disks {
		partition := disk.GetDiskPartition()
		if partition != nil {
			partitions = append(partitions, partition)
		}
	}
	return partitions
}

// FillIoUtils stores the sampled I/O utilization of each device.
func (manager *SpaceManager) FillIoUtils(samples map[string]loadutil.DiskIoSample) {
	manager.diskMutex.RLock()
	defer manager.diskMutex.RUnlock()
	for _, sample := range samples {
		util := manager.diskUtils[sample.GetPartition().Device]
		if util != nil {
			util.Store(sample.GetIoUtilPercent())
		}
	}
}

// StartDiskSample launches a background goroutine that periodically samples
// disk I/O utilization and publishes the results via FillIoUtils.
func (manager *SpaceManager) StartDiskSample() {
	manager.samplerDone = make(chan struct{})
	go func() {
		for {
			select {
			case <-manager.samplerDone:
				return
			default:
				partitions := manager.GetAllDiskPartitions()
				samples, err := loadutil.GetDisksIoSample(partitions, diskSampleDuration)
				if err != nil {
					log.LogErrorf("failed to sample disk %v\n", err.Error())
					return
				}
				manager.FillIoUtils(samples)
			}
		}
	}()
}

// GetDiskUtils returns a snapshot of the current I/O utilization per device.
func (manager *SpaceManager) GetDiskUtils() map[string]float64 {
	utils := make(map[string]float64)
	manager.diskMutex.RLock()
	defer manager.diskMutex.RUnlock()
	for device, used := range manager.diskUtils {
		utils[device] = used.Load()
	}
	return utils
}
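
// logDiskUtils is an illustrative consumer (not in the original file) of the
// sampler above: StartDiskSample refreshes diskUtils roughly once per
// diskSampleDuration, and GetDiskUtils snapshots that map for reporting.
func logDiskUtils(manager *SpaceManager) {
	for device, util := range manager.GetDiskUtils() {
		log.LogInfof("device(%v) ioUtil(%.2f%%)", device, util)
	}
}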

func (manager *SpaceManager) SetNodeID(nodeID uint64) {
	manager.nodeID = nodeID
}

func (manager *SpaceManager) GetNodeID() (nodeID uint64) {
	return manager.nodeID
}

func (manager *SpaceManager) SetClusterID(clusterID string) {
	manager.clusterID = clusterID
}

func (manager *SpaceManager) GetClusterID() (clusterID string) {
	return manager.clusterID
}

func (manager *SpaceManager) SetRaftStore(raftStore raftstore.RaftStore) {
	manager.raftStore = raftStore
}

func (manager *SpaceManager) GetRaftStore() (raftStore raftstore.RaftStore) {
	return manager.raftStore
}

// RangePartitions applies the given function to every data partition and
// stops iterating as soon as the function returns false.
func (manager *SpaceManager) RangePartitions(f func(partition *DataPartition) bool) {
	if f == nil {
		return
	}
	manager.partitionMutex.RLock()
	partitions := make([]*DataPartition, 0)
	for _, dp := range manager.partitions {
		partitions = append(partitions, dp)
	}
	manager.partitionMutex.RUnlock()

	for _, partition := range partitions {
		if !f(partition) {
			break
		}
	}
}
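
// countWritablePartitions sketches a typical RangePartitions callback (not in
// the original file): visit every partition, tally those reporting ReadWrite
// status, and return true to keep iterating.
func countWritablePartitions(manager *SpaceManager) (n int) {
	manager.RangePartitions(func(partition *DataPartition) bool {
		if partition.Status() == proto.ReadWrite {
			n++
		}
		return true // returning false would stop the iteration early
	})
	return
}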

func (manager *SpaceManager) GetDisks() (disks []*Disk) {
	manager.diskMutex.RLock()
	defer manager.diskMutex.RUnlock()
	disks = make([]*Disk, 0)
	for _, disk := range manager.disks {
		disks = append(disks, disk)
	}
	return
}

// Stats returns the statistics of the space manager.
func (manager *SpaceManager) Stats() *Stats {
	return manager.stats
}

// LoadDisk loads the disk at the given path and restores its partitions.
func (manager *SpaceManager) LoadDisk(path string, reservedSpace, diskRdonlySpace uint64, maxErrCnt int) (err error) {
	var (
		disk    *Disk
		visitor PartitionVisitor
	)

	if diskRdonlySpace < reservedSpace {
		diskRdonlySpace = reservedSpace
	}

	log.LogDebugf("action[LoadDisk] load disk from path(%v).", path)
	visitor = func(dp *DataPartition) {
		manager.partitionMutex.Lock()
		defer manager.partitionMutex.Unlock()
		if _, has := manager.partitions[dp.partitionID]; !has {
			manager.partitions[dp.partitionID] = dp
			log.LogDebugf("action[LoadDisk] put partition(%v) to manager.", dp.partitionID)
		}
	}

	if _, err = manager.GetDisk(path); err != nil {
		disk, err = NewDisk(path, reservedSpace, diskRdonlySpace, maxErrCnt, manager)
		if err != nil {
			log.LogErrorf("NewDisk fail err:[%v]", err)
			return err
		}
		err = disk.RestorePartition(visitor)
		if err != nil {
			log.LogErrorf("RestorePartition fail err:[%v]", err)
			return err
		}
		manager.putDisk(disk)
		go disk.doBackendTask()
	}
	return
}

// GetDisk returns the disk registered at the given path.
func (manager *SpaceManager) GetDisk(path string) (d *Disk, err error) {
	manager.diskMutex.RLock()
	defer manager.diskMutex.RUnlock()
	disk, has := manager.disks[path]
	if has && disk != nil {
		d = disk
		return
	}
	err = fmt.Errorf("disk(%v) not exist", path)
	return
}

func (manager *SpaceManager) putDisk(d *Disk) {
	manager.diskMutex.Lock()
	manager.disks[d.Path] = d
	manager.diskList = append(manager.diskList, d.Path)
	if d.GetDiskPartition() != nil {
		manager.diskUtils[d.GetDiskPartition().Device] = &atomicutil.Float64{}
		manager.diskUtils[d.GetDiskPartition().Device].Store(0)
	}
	manager.diskMutex.Unlock()
}

func (manager *SpaceManager) updateMetrics() {
	manager.diskMutex.RLock()
	var (
		total, used, available                                 uint64
		totalPartitionSize, remainingCapacityToCreatePartition uint64
		maxCapacityToCreatePartition, partitionCnt             uint64
	)
	maxCapacityToCreatePartition = 0
	for _, d := range manager.disks {
		if d.Status == proto.Unavailable {
			log.LogInfof("disk is broken, not stat disk usage, diskpath %s", d.Path)
			continue
		}
		total += d.Total
		used += d.Used
		available += d.Available
		totalPartitionSize += d.Allocated
		remainingCapacityToCreatePartition += d.Unallocated
		partitionCnt += uint64(d.PartitionCount())
		if maxCapacityToCreatePartition < d.Unallocated {
			maxCapacityToCreatePartition = d.Unallocated
		}
	}
	manager.diskMutex.RUnlock()
	log.LogDebugf("action[updateMetrics] total(%v) used(%v) available(%v) totalPartitionSize(%v) remainingCapacityToCreatePartition(%v) "+
		"partitionCnt(%v) maxCapacityToCreatePartition(%v)", total, used, available, totalPartitionSize,
		remainingCapacityToCreatePartition, partitionCnt, maxCapacityToCreatePartition)
	manager.stats.updateMetrics(total, used, available, totalPartitionSize,
		remainingCapacityToCreatePartition, maxCapacityToCreatePartition, partitionCnt)
}

// minPartitionCnt picks the writable disk with the lowest selection weight,
// skipping decommissioned disks.
func (manager *SpaceManager) minPartitionCnt(decommissionedDisks []string) (d *Disk) {
	manager.diskMutex.Lock()
	defer manager.diskMutex.Unlock()
	var (
		minWeight     float64
		minWeightDisk *Disk
	)
	decommissionedDiskMap := make(map[string]struct{})
	for _, disk := range decommissionedDisks {
		decommissionedDiskMap[disk] = struct{}{}
	}
	minWeight = math.MaxFloat64
	for _, disk := range manager.disks {
		if _, ok := decommissionedDiskMap[disk.Path]; ok {
			log.LogInfof("action[minPartitionCnt] exclude decommissioned disk[%v]", disk.Path)
			continue
		}
		if disk.Status != proto.ReadWrite {
			continue
		}
		diskWeight := disk.getSelectWeight()
		if diskWeight < minWeight {
			minWeight = diskWeight
			minWeightDisk = disk
		}
	}
	if minWeightDisk == nil {
		return
	}
	if minWeightDisk.Status != proto.ReadWrite {
		return
	}
	d = minWeightDisk
	return
}

func (manager *SpaceManager) statUpdateScheduler() {
	go func() {
		ticker := time.NewTicker(10 * time.Second)
		for {
			select {
			case <-ticker.C:
				manager.updateMetrics()
			case <-manager.stopC:
				ticker.Stop()
				return
			}
		}
	}()
}

// Partition returns the data partition for the given partition ID.
func (manager *SpaceManager) Partition(partitionID uint64) (dp *DataPartition) {
	manager.partitionMutex.RLock()
	defer manager.partitionMutex.RUnlock()
	dp = manager.partitions[partitionID]
	return
}

// AttachPartition adds a data partition to the partition map.
func (manager *SpaceManager) AttachPartition(dp *DataPartition) {
	manager.partitionMutex.Lock()
	defer manager.partitionMutex.Unlock()
	manager.partitions[dp.partitionID] = dp
}

// DetachDataPartition removes a data partition from the partition map.
func (manager *SpaceManager) DetachDataPartition(partitionID uint64) {
	manager.partitionMutex.Lock()
	defer manager.partitionMutex.Unlock()
	delete(manager.partitions, partitionID)
}

// CreatePartition creates a new data partition on the disk with the lowest
// selection weight, or returns the existing one if the request matches it.
func (manager *SpaceManager) CreatePartition(request *proto.CreateDataPartitionRequest) (dp *DataPartition, err error) {
	manager.partitionMutex.Lock()
	defer manager.partitionMutex.Unlock()
	dpCfg := &dataPartitionCfg{
		PartitionID:   request.PartitionId,
		VolName:       request.VolumeId,
		Peers:         request.Members,
		Hosts:         request.Hosts,
		RaftStore:     manager.raftStore,
		NodeID:        manager.nodeID,
		ClusterID:     manager.clusterID,
		PartitionSize: request.PartitionSize,
		PartitionType: int(request.PartitionTyp),
		ReplicaNum:    request.ReplicaNum,
		VerSeq:        request.VerSeq,
		CreateType:    request.CreateType,
	}

	log.LogInfof("action[CreatePartition] dp %v dpCfg.Peers %v request.Members %v",
		dpCfg.PartitionID, dpCfg.Peers, request.Members)
	dp = manager.partitions[dpCfg.PartitionID]
	if dp != nil {
		if err = dp.IsEquareCreateDataPartitionRequst(request); err != nil {
			return nil, err
		}
		return
	}
	disk := manager.minPartitionCnt(request.DecommissionedDisks)
	if disk == nil {
		return nil, ErrNoSpaceToCreatePartition
	}
	if dp, err = CreateDataPartition(dpCfg, disk, request); err != nil {
		return
	}
	manager.partitions[dp.partitionID] = dp
	return
}
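
// tryCreatePartition is an illustrative wrapper (not in the original file)
// showing how a caller might distinguish CreatePartition's no-space failure
// from other errors; the wrapped message is an assumption for the example.
func tryCreatePartition(manager *SpaceManager, req *proto.CreateDataPartitionRequest) (*DataPartition, error) {
	dp, err := manager.CreatePartition(req)
	if err == ErrNoSpaceToCreatePartition {
		// Every eligible disk was full, read-only, or decommissioned.
		return nil, fmt.Errorf("no writable disk can hold partition(%v): %v", req.PartitionId, err)
	}
	return dp, err
}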

// DeletePartition deletes a partition based on the partition id.
func (manager *SpaceManager) DeletePartition(dpID uint64) {
	manager.partitionMutex.Lock()

	dp := manager.partitions[dpID]
	if dp == nil {
		manager.partitionMutex.Unlock()
		return
	}

	delete(manager.partitions, dpID)
	manager.partitionMutex.Unlock()
	dp.Stop()
	dp.Disk().DetachDataPartition(dp)
	os.RemoveAll(dp.Path())
}

func (s *DataNode) buildHeartBeatResponse(response *proto.DataNodeHeartbeatResponse) {
	response.Status = proto.TaskSucceeds
	stat := s.space.Stats()
	stat.Lock()
	response.Used = stat.Used
	response.Total = stat.Total
	response.Available = stat.Available
	response.CreatedPartitionCnt = uint32(stat.CreatedPartitionCnt)
	response.TotalPartitionSize = stat.TotalPartitionSize
	response.MaxCapacity = stat.MaxCapacityToCreatePartition
	response.RemainingCapacity = stat.RemainingCapacityToCreatePartition
	response.BadDisks = make([]string, 0)
	response.BadDiskStats = make([]proto.BadDiskStat, 0)
	response.StartTime = s.startTime
	stat.Unlock()

	response.ZoneName = s.zoneName
	response.PartitionReports = make([]*proto.DataPartitionReport, 0)
	space := s.space
	space.RangePartitions(func(partition *DataPartition) bool {
		leaderAddr, isLeader := partition.IsRaftLeader()
		vr := &proto.DataPartitionReport{
			VolName:                    partition.volumeID,
			PartitionID:                uint64(partition.partitionID),
			PartitionStatus:            partition.Status(),
			Total:                      uint64(partition.Size()),
			Used:                       uint64(partition.Used()),
			DiskPath:                   partition.Disk().Path,
			IsLeader:                   isLeader,
			ExtentCount:                partition.GetExtentCount(),
			DecommissionRepairProgress: partition.decommissionRepairProgress,
		}
		log.LogDebugf("action[Heartbeats] dpid(%v), status(%v) total(%v) used(%v) leader(%v) isLeader(%v).", vr.PartitionID, vr.PartitionStatus, vr.Total, vr.Used, leaderAddr, vr.IsLeader)
		response.PartitionReports = append(response.PartitionReports, vr)
		return true
	})

	disks := space.GetDisks()
	for _, d := range disks {
		if d.Status == proto.Unavailable {
			response.BadDisks = append(response.BadDisks, d.Path)

			bds := proto.BadDiskStat{
				DiskPath:             d.Path,
				TotalPartitionCnt:    d.PartitionCount(),
				DiskErrPartitionList: d.GetDiskErrPartitionList(),
			}
			response.BadDiskStats = append(response.BadDiskStats, bds)
		}
	}
}

// getPartitionIds collects the IDs of all partitions. It does not take
// partitionMutex itself, so callers are responsible for synchronization.
func (manager *SpaceManager) getPartitionIds() []uint64 {
	res := make([]uint64, 0)
	for id := range manager.partitions {
		res = append(res, id)
	}
	return res
}