31
"golang.org/x/time/rate"
33
"github.com/cubefs/cubefs/proto"
34
"github.com/cubefs/cubefs/util/exporter"
35
"github.com/cubefs/cubefs/util/loadutil"
36
"github.com/cubefs/cubefs/util/log"
37
"github.com/shirou/gopsutil/disk"
42
RegexpDataPartitionDir, _ = regexp.Compile(`^datapartition_(\d)+_(\d)+$`)
43
RegexpCachePartitionDir, _ = regexp.Compile(`^cachepartition_(\d)+_(\d)+$`)
44
RegexpPreLoadPartitionDir, _ = regexp.Compile(`^preloadpartition_(\d)+_(\d)+$`)
45
RegexpExpiredDataPartitionDir, _ = regexp.Compile(`^expired_datapartition_(\d)+_(\d)+$`)
49
ExpiredPartitionPrefix = "expired_"
50
ExpiredPartitionExistTime = time.Hour * time.Duration(24*7)
54
DecommissionDiskMark = "decommissionDiskMark"
73
DiskRdonlySpace uint64
76
partitionMap map[uint64]*DataPartition
77
syncTinyDeleteRecordFromLeaderOnEveryDisk chan bool
81
limitFactor map[uint32]*rate.Limiter
86
diskPartition *disk.PartitionStat
87
DiskErrPartitionSet map[uint64]struct{}
92
SyncTinyDeleteRecordFromLeaderOnEveryDisk = 5
95
// PartitionVisitor is a callback invoked once per DataPartition, e.g. when
// partitions are restored from disk (see RestorePartition).
type PartitionVisitor func(dp *DataPartition)
97
func NewDisk(path string, reservedSpace, diskRdonlySpace uint64, maxErrCnt int, space *SpaceManager) (d *Disk, err error) {
100
d.ReservedSpace = reservedSpace
101
d.DiskRdonlySpace = diskRdonlySpace
102
d.MaxErrCnt = maxErrCnt
103
d.RejectWrite = false
105
d.dataNode = space.dataNode
106
d.partitionMap = make(map[uint64]*DataPartition)
107
d.syncTinyDeleteRecordFromLeaderOnEveryDisk = make(chan bool, SyncTinyDeleteRecordFromLeaderOnEveryDisk)
108
err = d.computeUsage()
112
err = d.updateSpaceInfo()
117
d.diskPartition, err = loadutil.GetMatchParation(d.Path)
120
log.LogErrorf("get partition info error, path is %v error message %v", d.Path, err.Error())
123
d.startScheduleToUpdateSpaceInfo()
125
d.limitFactor = make(map[uint32]*rate.Limiter, 0)
126
d.limitFactor[proto.FlowReadType] = rate.NewLimiter(rate.Limit(proto.QosDefaultDiskMaxFLowLimit), proto.QosDefaultBurst)
127
d.limitFactor[proto.FlowWriteType] = rate.NewLimiter(rate.Limit(proto.QosDefaultDiskMaxFLowLimit), proto.QosDefaultBurst)
128
d.limitFactor[proto.IopsReadType] = rate.NewLimiter(rate.Limit(proto.QosDefaultDiskMaxIoLimit), defaultIOLimitBurst)
129
d.limitFactor[proto.IopsWriteType] = rate.NewLimiter(rate.Limit(proto.QosDefaultDiskMaxIoLimit), defaultIOLimitBurst)
130
d.limitRead = newIOLimiter(space.dataNode.diskReadFlow, space.dataNode.diskReadIocc)
131
d.limitWrite = newIOLimiter(space.dataNode.diskWriteFlow, space.dataNode.diskWriteIocc)
133
d.DiskErrPartitionSet = make(map[uint64]struct{}, 0)
135
err = d.initDecommissionStatus()
137
log.LogErrorf("action[NewDisk]: failed to load disk decommission status")
144
func (d *Disk) MarkDecommissionStatus(decommission bool) {
145
probePath := path.Join(d.Path, DecommissionDiskMark)
149
log.LogErrorf("action[MarkDecommissionStatus]: %v", err)
154
file, err := os.Create(probePath)
159
err = os.Remove(probePath)
160
if os.IsNotExist(err) {
164
d.decommission = decommission
167
func (d *Disk) GetDecommissionStatus() bool {
168
return d.decommission
171
func (d *Disk) initDecommissionStatus() error {
172
probePath := path.Join(d.Path, DecommissionDiskMark)
173
_, err := os.Stat(probePath)
175
d.decommission = true
178
if os.IsNotExist(err) {
184
func (d *Disk) GetDiskPartition() *disk.PartitionStat {
185
return d.diskPartition
188
func (d *Disk) updateQosLimiter() {
189
if d.dataNode.diskReadFlow > 0 {
190
d.limitFactor[proto.FlowReadType].SetLimit(rate.Limit(d.dataNode.diskReadFlow))
192
if d.dataNode.diskWriteFlow > 0 {
193
d.limitFactor[proto.FlowWriteType].SetLimit(rate.Limit(d.dataNode.diskWriteFlow))
195
if d.dataNode.diskReadIops > 0 {
196
d.limitFactor[proto.IopsReadType].SetLimit(rate.Limit(d.dataNode.diskReadIops))
198
if d.dataNode.diskWriteIops > 0 {
199
d.limitFactor[proto.IopsWriteType].SetLimit(rate.Limit(d.dataNode.diskWriteIops))
201
for i := proto.IopsReadType; i < proto.FlowWriteType; i++ {
202
log.LogInfof("action[updateQosLimiter] type %v limit %v", proto.QosTypeString(i), d.limitFactor[i].Limit())
204
log.LogInfof("action[updateQosLimiter] read(iocc:%d iops:%d flow:%d) write(iocc:%d iops:%d flow:%d)",
205
d.dataNode.diskReadIocc, d.dataNode.diskReadIops, d.dataNode.diskReadFlow,
206
d.dataNode.diskWriteIocc, d.dataNode.diskWriteIops, d.dataNode.diskWriteFlow)
207
d.limitRead.ResetIO(d.dataNode.diskReadIocc)
208
d.limitRead.ResetFlow(d.dataNode.diskReadFlow)
209
d.limitWrite.ResetIO(d.dataNode.diskWriteIocc)
210
d.limitWrite.ResetFlow(d.dataNode.diskWriteFlow)
213
func (d *Disk) allocCheckLimit(factorType uint32, used uint32) error {
214
if !(d.dataNode.diskQosEnableFromMaster && d.dataNode.diskQosEnable) {
218
ctx := context.Background()
219
d.limitFactor[factorType].WaitN(ctx, int(used))
224
func (d *Disk) PartitionCount() int {
227
return len(d.partitionMap)
230
func (d *Disk) CanWrite() bool {
231
if d.Status == proto.ReadWrite || !d.RejectWrite {
237
if d.Total+d.DiskRdonlySpace > d.Used+d.ReservedSpace {
245
func (d *Disk) computeUsage() (err error) {
248
fs := syscall.Statfs_t{}
249
err = syscall.Statfs(d.Path, &fs)
251
log.LogErrorf("computeUsage. err %v", err)
255
repairSize := uint64(d.repairAllocSize())
258
total := int64(fs.Blocks*uint64(fs.Bsize) - d.DiskRdonlySpace)
262
d.Total = uint64(total)
265
available := int64(fs.Bavail*uint64(fs.Bsize) - d.DiskRdonlySpace - repairSize)
269
d.Available = uint64(available)
272
free := int64(fs.Bfree*uint64(fs.Bsize) - d.DiskRdonlySpace - repairSize)
274
used := int64(total - free)
278
d.Used = uint64(used)
280
allocatedSize := int64(0)
281
for _, dp := range d.partitionMap {
282
allocatedSize += int64(dp.Size())
285
log.LogDebugf("computeUsage. fs info [%v,%v,%v,%v] total %v available %v DiskRdonlySpace %v ReservedSpace %v allocatedSize %v",
286
fs.Blocks, fs.Bsize, fs.Bavail, fs.Bfree, d.Total, d.Available, d.DiskRdonlySpace, d.ReservedSpace, allocatedSize)
288
atomic.StoreUint64(&d.Allocated, uint64(allocatedSize))
290
unallocated := total - allocatedSize
294
if d.Available <= 0 {
297
d.RejectWrite = false
299
d.Unallocated = uint64(unallocated)
301
log.LogDebugf("action[computeUsage] disk(%v) all(%v) available(%v) used(%v)", d.Path, d.Total, d.Available, d.Used)
306
func (d *Disk) repairAllocSize() int {
308
for _, dp := range d.partitionMap {
309
if dp.DataPartitionCreateType == proto.NormalCreateDataPartition || dp.leaderSize <= dp.used {
313
allocSize += dp.leaderSize - dp.used
319
func (d *Disk) incReadErrCnt() {
320
atomic.AddUint64(&d.ReadErrCnt, 1)
323
func (d *Disk) getReadErrCnt() uint64 {
324
return atomic.LoadUint64(&d.ReadErrCnt)
327
func (d *Disk) incWriteErrCnt() {
328
atomic.AddUint64(&d.WriteErrCnt, 1)
331
func (d *Disk) getWriteErrCnt() uint64 {
332
return atomic.LoadUint64(&d.WriteErrCnt)
335
func (d *Disk) getTotalErrCnt() uint64 {
336
return d.getReadErrCnt() + d.getWriteErrCnt()
339
func (d *Disk) startScheduleToUpdateSpaceInfo() {
341
updateSpaceInfoTicker := time.NewTicker(5 * time.Second)
342
checkStatusTicker := time.NewTicker(time.Minute * 2)
344
updateSpaceInfoTicker.Stop()
345
checkStatusTicker.Stop()
349
case <-updateSpaceInfoTicker.C:
352
case <-checkStatusTicker.C:
359
func (d *Disk) doBackendTask() {
361
partitions := make([]*DataPartition, 0)
363
for _, dp := range d.partitionMap {
364
partitions = append(partitions, dp)
367
for _, dp := range partitions {
368
dp.extentStore.BackendTask()
370
time.Sleep(time.Minute)
375
DiskStatusFile = ".diskStatus"
378
func (d *Disk) checkDiskStatus() {
379
if d.Status == proto.Unavailable {
380
log.LogInfof("[checkDiskStatus] disk status is unavailable, no need to check, disk path(%v)", d.Path)
384
path := path.Join(d.Path, DiskStatusFile)
385
fp, err := os.OpenFile(path, os.O_CREATE|os.O_TRUNC|os.O_RDWR, 0o755)
387
d.CheckDiskError(err, ReadFlag)
391
data := []byte(DiskStatusFile)
392
_, err = fp.WriteAt(data, 0)
394
d.CheckDiskError(err, WriteFlag)
397
if err = fp.Sync(); err != nil {
398
d.CheckDiskError(err, WriteFlag)
401
if _, err = fp.ReadAt(data, 0); err != nil {
402
d.CheckDiskError(err, ReadFlag)
407
// DiskErrNotAssociatedWithPartition is the sentinel partition ID passed to
// triggerDiskError/AddDiskErrPartition when a disk error cannot be attributed
// to a specific data partition.
const DiskErrNotAssociatedWithPartition uint64 = 0
409
func (d *Disk) CheckDiskError(err error, rwFlag uint8) {
413
log.LogWarnf("CheckDiskError disk err: %v, disk:%v", err.Error(), d.Path)
415
if !IsDiskErr(err.Error()) {
419
d.triggerDiskError(rwFlag, DiskErrNotAssociatedWithPartition)
422
func (d *Disk) doDiskError() {
423
d.Status = proto.Unavailable
427
func (d *Disk) triggerDiskError(rwFlag uint8, dpId uint64) {
428
mesg := fmt.Sprintf("disk path %v error on %v, dpId %v", d.Path, LocalIP, dpId)
429
exporter.Warning(mesg)
432
if rwFlag == WriteFlag {
434
} else if rwFlag == ReadFlag {
441
d.AddDiskErrPartition(dpId)
443
diskErrCnt := d.getTotalErrCnt()
444
diskErrPartitionCnt := d.GetDiskErrPartitionCount()
445
if diskErrPartitionCnt >= d.dataNode.diskUnavailablePartitionErrorCount {
446
msg := fmt.Sprintf("set disk unavailable for too many disk error, "+
447
"disk path(%v), ip(%v), diskErrCnt(%v), diskErrPartitionCnt(%v) threshold(%v)",
448
d.Path, LocalIP, diskErrCnt, diskErrPartitionCnt, d.dataNode.diskUnavailablePartitionErrorCount)
449
exporter.Warning(msg)
455
func (d *Disk) updateSpaceInfo() (err error) {
456
var statsInfo syscall.Statfs_t
457
if err = syscall.Statfs(d.Path, &statsInfo); err != nil {
461
if d.Status == proto.Unavailable {
462
mesg := fmt.Sprintf("disk path %v error on %v", d.Path, LocalIP)
464
exporter.Warning(mesg)
466
} else if d.Available <= 0 {
467
d.Status = proto.ReadOnly
469
d.Status = proto.ReadWrite
472
log.LogDebugf("action[updateSpaceInfo] disk(%v) total(%v) available(%v) remain(%v) "+
473
"restSize(%v) preRestSize (%v) maxErrs(%v) readErrs(%v) writeErrs(%v) status(%v)", d.Path,
474
d.Total, d.Available, d.Unallocated, d.ReservedSpace, d.DiskRdonlySpace, d.MaxErrCnt, d.ReadErrCnt, d.WriteErrCnt, d.Status)
479
func (d *Disk) AttachDataPartition(dp *DataPartition) {
481
d.partitionMap[dp.partitionID] = dp
488
func (d *Disk) DetachDataPartition(dp *DataPartition) {
490
delete(d.partitionMap, dp.partitionID)
491
delete(d.DiskErrPartitionSet, dp.partitionID)
498
func (d *Disk) GetDataPartition(partitionID uint64) (partition *DataPartition) {
501
return d.partitionMap[partitionID]
504
func (d *Disk) GetDataPartitionCount() int {
507
return len(d.partitionMap)
510
func (d *Disk) ForceExitRaftStore() {
511
partitionList := d.DataPartitionList()
512
for _, partitionID := range partitionList {
513
partition := d.GetDataPartition(partitionID)
514
partition.partitionStatus = proto.Unavailable
520
func (d *Disk) DataPartitionList() (partitionIDs []uint64) {
523
partitionIDs = make([]uint64, 0, len(d.partitionMap))
524
for _, dp := range d.partitionMap {
525
partitionIDs = append(partitionIDs, dp.partitionID)
530
func unmarshalPartitionName(name string) (partitionID uint64, partitionSize int, err error) {
531
arr := strings.Split(name, "_")
533
err = fmt.Errorf("error DataPartition name(%v)", name)
536
if partitionID, err = strconv.ParseUint(arr[1], 10, 64); err != nil {
539
if partitionSize, err = strconv.Atoi(arr[2]); err != nil {
545
func (d *Disk) isPartitionDir(filename string) (isPartitionDir bool) {
546
isPartitionDir = RegexpDataPartitionDir.MatchString(filename) ||
547
RegexpCachePartitionDir.MatchString(filename) ||
548
RegexpPreLoadPartitionDir.MatchString(filename)
552
func (d *Disk) isExpiredPartitionDir(filename string) (isExpiredPartitionDir bool) {
553
isExpiredPartitionDir = RegexpExpiredDataPartitionDir.MatchString(filename)
558
func (d *Disk) RestorePartition(visitor PartitionVisitor) (err error) {
559
convert := func(node *proto.DataNodeInfo) *DataNodeInfo {
560
result := &DataNodeInfo{}
561
result.Addr = node.Addr
562
result.PersistenceDataPartitions = node.PersistenceDataPartitions
565
var dataNode *proto.DataNodeInfo
566
for i := 0; i < 3; i++ {
567
dataNode, err = MasterClient.NodeAPI().GetDataNode(d.space.dataNode.localServerAddr)
569
log.LogErrorf("action[RestorePartition]: getDataNode error %v", err)
574
dinfo := convert(dataNode)
575
if len(dinfo.PersistenceDataPartitions) == 0 {
576
log.LogWarnf("action[RestorePartition]: length of PersistenceDataPartitions is 0, ExpiredPartition check " +
585
fileInfoList, err := os.ReadDir(d.Path)
587
log.LogErrorf("action[RestorePartition] read dir(%v) err(%v).", d.Path, err)
593
toDeleteExpiredPartitionNames = make([]string, 0)
595
for _, fileInfo := range fileInfoList {
596
filename := fileInfo.Name()
597
if !d.isPartitionDir(filename) {
598
if d.isExpiredPartitionDir(filename) {
599
name := path.Join(d.Path, filename)
600
toDeleteExpiredPartitionNames = append(toDeleteExpiredPartitionNames, name)
601
log.LogInfof("action[RestorePartition] find expired partition on path(%s)", name)
606
if partitionID, partitionSize, err = unmarshalPartitionName(filename); err != nil {
607
log.LogErrorf("action[RestorePartition] unmarshal partitionName(%v) from disk(%v) err(%v) ",
608
filename, d.Path, err.Error())
611
log.LogDebugf("acton[RestorePartition] disk(%v) path(%v) PartitionID(%v) partitionSize(%v).",
612
d.Path, fileInfo.Name(), partitionID, partitionSize)
614
if isExpiredPartition(partitionID, dinfo.PersistenceDataPartitions) {
615
log.LogErrorf("action[RestorePartition]: find expired partition[%s], rename it and you can delete it "+
616
"manually", filename)
617
oldName := path.Join(d.Path, filename)
618
newName := path.Join(d.Path, ExpiredPartitionPrefix+filename)
619
os.Rename(oldName, newName)
620
toDeleteExpiredPartitionNames = append(toDeleteExpiredPartitionNames, newName)
626
go func(partitionID uint64, filename string) {
632
if dp, err = LoadDataPartition(path.Join(d.Path, filename), d); err != nil {
633
mesg := fmt.Sprintf("action[RestorePartition] new partition(%v) err(%v) ",
634
partitionID, err.Error())
636
exporter.Warning(mesg)
643
}(partitionID, filename)
646
if len(toDeleteExpiredPartitionNames) > 0 {
647
log.LogInfof("action[RestorePartition] expiredPartitions %v, disk %v", toDeleteExpiredPartitionNames, d.Path)
649
notDeletedExpiredPartitionNames := d.deleteExpiredPartitions(toDeleteExpiredPartitionNames)
651
if len(notDeletedExpiredPartitionNames) > 0 {
652
go func(toDeleteExpiredPartitions []string) {
653
ticker := time.NewTicker(ExpiredPartitionExistTime)
654
log.LogInfof("action[RestorePartition] delete expiredPartitions automatically start, toDeleteExpiredPartitions %v", toDeleteExpiredPartitions)
657
d.deleteExpiredPartitions(toDeleteExpiredPartitionNames)
659
log.LogInfof("action[RestorePartition] delete expiredPartitions automatically finish")
660
}(notDeletedExpiredPartitionNames)
667
func (d *Disk) deleteExpiredPartitions(toDeleteExpiredPartitionNames []string) (notDeletedExpiredPartitionNames []string) {
668
notDeletedExpiredPartitionNames = make([]string, 0)
669
for _, partitionName := range toDeleteExpiredPartitionNames {
670
dirName, fileName := path.Split(partitionName)
671
if !d.isExpiredPartitionDir(fileName) {
672
log.LogInfof("action[deleteExpiredPartitions] partition %v on %v is not expiredPartition", fileName, dirName)
675
dirInfo, err := os.Stat(partitionName)
677
log.LogErrorf("action[deleteExpiredPartitions] stat expiredPartition %v fail, err(%v)", partitionName, err)
680
dirStat := dirInfo.Sys().(*syscall.Stat_t)
681
nowTime := time.Now().Unix()
682
expiredTime := dirStat.Ctim.Sec
683
if nowTime-expiredTime >= int64(ExpiredPartitionExistTime.Seconds()) {
684
err := os.RemoveAll(partitionName)
686
log.LogErrorf("action[deleteExpiredPartitions] delete expiredPartition %v automatically fail, err(%v)", partitionName, err)
689
log.LogInfof("action[deleteExpiredPartitions] delete expiredPartition %v automatically", partitionName)
691
notDeletedExpiredPartitionNames = append(notDeletedExpiredPartitionNames, partitionName)
697
func (d *Disk) AddSize(size uint64) {
698
atomic.AddUint64(&d.Allocated, size)
701
func (d *Disk) updateDisk(allocSize uint64) {
705
if d.Available < allocSize {
706
d.Status = proto.ReadOnly
710
d.Available = d.Available - allocSize
713
func (d *Disk) getSelectWeight() float64 {
714
return float64(atomic.LoadUint64(&d.Allocated)) / float64(d.Total)
717
func (d *Disk) AddDiskErrPartition(dpId uint64) {
718
if _, ok := d.DiskErrPartitionSet[dpId]; !ok {
719
d.DiskErrPartitionSet[dpId] = struct{}{}
723
func (d *Disk) GetDiskErrPartitionList() (diskErrPartitionList []uint64) {
724
diskErrPartitionList = make([]uint64, 0)
725
for k := range d.DiskErrPartitionSet {
726
diskErrPartitionList = append(diskErrPartitionList, k)
728
return diskErrPartitionList
731
func (d *Disk) GetDiskErrPartitionCount() uint64 {
732
return uint64(len(d.DiskErrPartitionSet))
737
func isExpiredPartition(id uint64, partitions []uint64) bool {
738
if len(partitions) == 0 {
742
for _, existId := range partitions {