cubefs

Форк
0
/
disk.go 
748 строк · 21.0 Кб
1
// Copyright 2018 The CubeFS Authors.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
14

15
package datanode
16

17
import (
18
	"context"
19
	"fmt"
20
	syslog "log"
21
	"os"
22
	"path"
23
	"regexp"
24
	"strconv"
25
	"strings"
26
	"sync"
27
	"sync/atomic"
28
	"syscall"
29
	"time"
30

31
	"golang.org/x/time/rate"
32

33
	"github.com/cubefs/cubefs/proto"
34
	"github.com/cubefs/cubefs/util/exporter"
35
	"github.com/cubefs/cubefs/util/loadutil"
36
	"github.com/cubefs/cubefs/util/log"
37
	"github.com/shirou/gopsutil/disk"
38
)
39

40
// Directory-name patterns for the partition folders found under a disk path.
// They are compiled with MustCompile so that an invalid pattern fails loudly at
// package initialisation instead of leaving a nil *regexp.Regexp behind.
var (
	// RegexpDataPartitionDir validates the directory name of a data partition
	// (datapartition_<digits>_<digits>).
	RegexpDataPartitionDir = regexp.MustCompile(`^datapartition_(\d)+_(\d)+$`)
	// RegexpCachePartitionDir matches names of the form cachepartition_<digits>_<digits>.
	RegexpCachePartitionDir = regexp.MustCompile(`^cachepartition_(\d)+_(\d)+$`)
	// RegexpPreLoadPartitionDir matches names of the form preloadpartition_<digits>_<digits>.
	RegexpPreLoadPartitionDir = regexp.MustCompile(`^preloadpartition_(\d)+_(\d)+$`)
	// RegexpExpiredDataPartitionDir matches data partition directories that carry
	// the "expired_" prefix.
	RegexpExpiredDataPartitionDir = regexp.MustCompile(`^expired_datapartition_(\d)+_(\d)+$`)
)
47

48
const (
	// ExpiredPartitionPrefix is prepended to the directory name of a partition
	// that RestorePartition classifies as expired.
	ExpiredPartitionPrefix = "expired_"
	// ExpiredPartitionExistTime is the minimum age (seven days) an expired
	// partition directory must reach before deleteExpiredPartitions removes it.
	ExpiredPartitionExistTime = 7 * 24 * time.Hour
)
52

53
// DecommissionDiskMark is the name of the marker file created in the disk root
// while the disk is flagged as decommissioned.
const DecommissionDiskMark = "decommissionDiskMark"
56

57
// Disk represents the structure of the disk
58
type Disk struct {
59
	sync.RWMutex
60
	Path        string
61
	ReadErrCnt  uint64 // number of read errors
62
	WriteErrCnt uint64 // number of write errors
63

64
	Total       uint64
65
	Used        uint64
66
	Available   uint64
67
	Unallocated uint64
68
	Allocated   uint64
69

70
	MaxErrCnt       int // maximum number of errors
71
	Status          int // disk status such as READONLY
72
	ReservedSpace   uint64
73
	DiskRdonlySpace uint64
74

75
	RejectWrite                               bool
76
	partitionMap                              map[uint64]*DataPartition
77
	syncTinyDeleteRecordFromLeaderOnEveryDisk chan bool
78
	space                                     *SpaceManager
79
	dataNode                                  *DataNode
80

81
	limitFactor map[uint32]*rate.Limiter
82
	limitRead   *ioLimiter
83
	limitWrite  *ioLimiter
84

85
	// diskPartition info
86
	diskPartition       *disk.PartitionStat
87
	DiskErrPartitionSet map[uint64]struct{}
88
	decommission        bool
89
}
90

91
// SyncTinyDeleteRecordFromLeaderOnEveryDisk is the buffer size of the
// Disk.syncTinyDeleteRecordFromLeaderOnEveryDisk channel created in NewDisk.
const SyncTinyDeleteRecordFromLeaderOnEveryDisk = 5
94

95
// PartitionVisitor is a callback that receives a data partition.
// RestorePartition invokes it once for every partition it loads successfully.
type PartitionVisitor func(dp *DataPartition)
96

97
// NewDisk builds the Disk rooted at path and starts its periodic space/status
// refresh.
//
// reservedSpace, diskRdonlySpace and maxErrCnt are stored on the disk as-is;
// space supplies the owning SpaceManager and, through it, the DataNode whose
// QoS settings seed the limiters.
//
// An error is returned (with a nil disk) only when the initial usage
// computation or space-info update fails. Failures to read the disk partition
// info or the decommission marker are logged and otherwise ignored.
func NewDisk(path string, reservedSpace, diskRdonlySpace uint64, maxErrCnt int, space *SpaceManager) (d *Disk, err error) {
	d = new(Disk)
	d.Path = path
	d.ReservedSpace = reservedSpace
	d.DiskRdonlySpace = diskRdonlySpace
	d.MaxErrCnt = maxErrCnt
	d.RejectWrite = false
	d.space = space
	d.dataNode = space.dataNode
	d.partitionMap = make(map[uint64]*DataPartition)
	d.syncTinyDeleteRecordFromLeaderOnEveryDisk = make(chan bool, SyncTinyDeleteRecordFromLeaderOnEveryDisk)
	if err = d.computeUsage(); err != nil {
		return nil, err
	}
	if err = d.updateSpaceInfo(); err != nil {
		return nil, err
	}

	// get disk partition info
	d.diskPartition, err = loadutil.GetMatchParation(d.Path)
	if err != nil {
		// log but let execution continue
		log.LogErrorf("get partition info error, path is %v error message %v", d.Path, err.Error())
	}

	d.limitFactor = make(map[uint32]*rate.Limiter)
	d.limitFactor[proto.FlowReadType] = rate.NewLimiter(rate.Limit(proto.QosDefaultDiskMaxFLowLimit), proto.QosDefaultBurst)
	d.limitFactor[proto.FlowWriteType] = rate.NewLimiter(rate.Limit(proto.QosDefaultDiskMaxFLowLimit), proto.QosDefaultBurst)
	d.limitFactor[proto.IopsReadType] = rate.NewLimiter(rate.Limit(proto.QosDefaultDiskMaxIoLimit), defaultIOLimitBurst)
	d.limitFactor[proto.IopsWriteType] = rate.NewLimiter(rate.Limit(proto.QosDefaultDiskMaxIoLimit), defaultIOLimitBurst)
	d.limitRead = newIOLimiter(space.dataNode.diskReadFlow, space.dataNode.diskReadIocc)
	d.limitWrite = newIOLimiter(space.dataNode.diskWriteFlow, space.dataNode.diskWriteIocc)

	d.DiskErrPartitionSet = make(map[uint64]struct{})

	if err = d.initDecommissionStatus(); err != nil {
		// NOTE: continue execution
		log.LogErrorf("action[NewDisk]: failed to load disk decommission status, disk(%v) err(%v)", d.Path, err)
	}

	// Start the background refresher only once every field it can reach
	// (notably DiskErrPartitionSet, written on disk errors) is initialised.
	d.startScheduleToUpdateSpaceInfo()

	return d, nil
}
143

144
// MarkDecommissionStatus persists the decommission flag as a marker file in the
// disk root and records it in memory.
//
// When decommission is true the marker file is created; otherwise it is removed
// (a missing file is not an error). Any filesystem error is logged; the
// in-memory flag is updated regardless.
func (d *Disk) MarkDecommissionStatus(decommission bool) {
	probePath := path.Join(d.Path, DecommissionDiskMark)
	var err error
	defer func() {
		if err != nil {
			log.LogErrorf("action[MarkDecommissionStatus]: %v", err)
		}
	}()
	if decommission {
		// Assign to the outer err (no :=) so the deferred logger sees failures.
		var file *os.File
		if file, err = os.Create(probePath); err == nil {
			err = file.Close()
		}
	} else if err = os.Remove(probePath); os.IsNotExist(err) {
		err = nil
	}
	d.decommission = decommission
}
166

167
// GetDecommissionStatus reports the in-memory decommission flag of the disk.
func (d *Disk) GetDecommissionStatus() bool {
	decommissioned := d.decommission
	return decommissioned
}
170

171
// initDecommissionStatus loads the decommission flag from disk: the flag is set
// when the marker file exists. A missing marker is not an error; any other stat
// failure is returned to the caller.
func (d *Disk) initDecommissionStatus() error {
	markPath := path.Join(d.Path, DecommissionDiskMark)
	_, statErr := os.Stat(markPath)
	switch {
	case statErr == nil:
		d.decommission = true
		return nil
	case os.IsNotExist(statErr):
		return nil
	default:
		return statErr
	}
}
183

184
// GetDiskPartition returns the partition info looked up for the disk path in
// NewDisk; it is nil when that lookup failed.
func (d *Disk) GetDiskPartition() *disk.PartitionStat {
	stat := d.diskPartition
	return stat
}
187

188
func (d *Disk) updateQosLimiter() {
189
	if d.dataNode.diskReadFlow > 0 {
190
		d.limitFactor[proto.FlowReadType].SetLimit(rate.Limit(d.dataNode.diskReadFlow))
191
	}
192
	if d.dataNode.diskWriteFlow > 0 {
193
		d.limitFactor[proto.FlowWriteType].SetLimit(rate.Limit(d.dataNode.diskWriteFlow))
194
	}
195
	if d.dataNode.diskReadIops > 0 {
196
		d.limitFactor[proto.IopsReadType].SetLimit(rate.Limit(d.dataNode.diskReadIops))
197
	}
198
	if d.dataNode.diskWriteIops > 0 {
199
		d.limitFactor[proto.IopsWriteType].SetLimit(rate.Limit(d.dataNode.diskWriteIops))
200
	}
201
	for i := proto.IopsReadType; i < proto.FlowWriteType; i++ {
202
		log.LogInfof("action[updateQosLimiter] type %v limit %v", proto.QosTypeString(i), d.limitFactor[i].Limit())
203
	}
204
	log.LogInfof("action[updateQosLimiter] read(iocc:%d iops:%d flow:%d) write(iocc:%d iops:%d flow:%d)",
205
		d.dataNode.diskReadIocc, d.dataNode.diskReadIops, d.dataNode.diskReadFlow,
206
		d.dataNode.diskWriteIocc, d.dataNode.diskWriteIops, d.dataNode.diskWriteFlow)
207
	d.limitRead.ResetIO(d.dataNode.diskReadIocc)
208
	d.limitRead.ResetFlow(d.dataNode.diskReadFlow)
209
	d.limitWrite.ResetIO(d.dataNode.diskWriteIocc)
210
	d.limitWrite.ResetFlow(d.dataNode.diskWriteFlow)
211
}
212

213
// allocCheckLimit blocks until the limiter registered for factorType grants
// `used` tokens. It is a no-op unless disk QoS is enabled both by the master
// and locally.
//
// The error returned by WaitN is discarded, so this method always returns nil.
func (d *Disk) allocCheckLimit(factorType uint32, used uint32) error {
	qosEnabled := d.dataNode.diskQosEnableFromMaster && d.dataNode.diskQosEnable
	if qosEnabled {
		_ = d.limitFactor[factorType].WaitN(context.Background(), int(used))
	}
	return nil
}
222

223
// PartitionCount returns the number of partitions in the partition map.
224
func (d *Disk) PartitionCount() int {
225
	d.RLock()
226
	defer d.RUnlock()
227
	return len(d.partitionMap)
228
}
229

230
// CanWrite reports whether write operations are currently allowed on the disk.
//
// Writes are allowed when the disk is read-write or is not rejecting writes.
// Otherwise they are allowed only while
// Total+DiskRdonlySpace > Used+ReservedSpace:
// if ReservedSpace < diskFreeSpace < DiskRdonlySpace, writeOp is ok, disk & dp
// is rdonly, can't create dp again; if ReservedSpace > diskFreeSpace, writeOp
// is also not allowed.
func (d *Disk) CanWrite() bool {
	if d.Status == proto.ReadWrite {
		return true
	}
	if !d.RejectWrite {
		return true
	}
	return d.Total+d.DiskRdonlySpace > d.Used+d.ReservedSpace
}
243

244
// Compute the disk usage
245
func (d *Disk) computeUsage() (err error) {
246
	d.RLock()
247
	defer d.RUnlock()
248
	fs := syscall.Statfs_t{}
249
	err = syscall.Statfs(d.Path, &fs)
250
	if err != nil {
251
		log.LogErrorf("computeUsage. err %v", err)
252
		return
253
	}
254

255
	repairSize := uint64(d.repairAllocSize())
256

257
	//  total := math.Max(0, int64(fs.Blocks*uint64(fs.Bsize) - d.PreReserveSpace))
258
	total := int64(fs.Blocks*uint64(fs.Bsize) - d.DiskRdonlySpace)
259
	if total < 0 {
260
		total = 0
261
	}
262
	d.Total = uint64(total)
263

264
	//  available := math.Max(0, int64(fs.Bavail*uint64(fs.Bsize) - d.PreReserveSpace))
265
	available := int64(fs.Bavail*uint64(fs.Bsize) - d.DiskRdonlySpace - repairSize)
266
	if available < 0 {
267
		available = 0
268
	}
269
	d.Available = uint64(available)
270

271
	//  used := math.Max(0, int64(total - available))
272
	free := int64(fs.Bfree*uint64(fs.Bsize) - d.DiskRdonlySpace - repairSize)
273

274
	used := int64(total - free)
275
	if used < 0 {
276
		used = 0
277
	}
278
	d.Used = uint64(used)
279

280
	allocatedSize := int64(0)
281
	for _, dp := range d.partitionMap {
282
		allocatedSize += int64(dp.Size())
283
	}
284

285
	log.LogDebugf("computeUsage. fs info [%v,%v,%v,%v] total %v available %v DiskRdonlySpace %v ReservedSpace %v allocatedSize %v",
286
		fs.Blocks, fs.Bsize, fs.Bavail, fs.Bfree, d.Total, d.Available, d.DiskRdonlySpace, d.ReservedSpace, allocatedSize)
287

288
	atomic.StoreUint64(&d.Allocated, uint64(allocatedSize))
289
	//  unallocated = math.Max(0, total - allocatedSize)
290
	unallocated := total - allocatedSize
291
	if unallocated < 0 {
292
		unallocated = 0
293
	}
294
	if d.Available <= 0 {
295
		d.RejectWrite = true
296
	} else {
297
		d.RejectWrite = false
298
	}
299
	d.Unallocated = uint64(unallocated)
300

301
	log.LogDebugf("action[computeUsage] disk(%v) all(%v) available(%v) used(%v)", d.Path, d.Total, d.Available, d.Used)
302

303
	return
304
}
305

306
// repairAllocSize sums, over all attached partitions that were not created as
// normal partitions, the amount by which the leader's size still exceeds the
// locally used size. Callers are expected to hold the disk lock, since the
// partition map is read without locking here.
func (d *Disk) repairAllocSize() int {
	pending := 0
	for _, dp := range d.partitionMap {
		underRepair := dp.DataPartitionCreateType != proto.NormalCreateDataPartition
		if underRepair && dp.leaderSize > dp.used {
			pending += dp.leaderSize - dp.used
		}
	}
	return pending
}
318

319
// incReadErrCnt atomically adds one to the read error counter.
func (d *Disk) incReadErrCnt() {
	counter := &d.ReadErrCnt
	atomic.AddUint64(counter, 1)
}
322

323
// getReadErrCnt atomically loads the read error counter.
func (d *Disk) getReadErrCnt() uint64 {
	counter := &d.ReadErrCnt
	return atomic.LoadUint64(counter)
}
326

327
// incWriteErrCnt atomically adds one to the write error counter.
func (d *Disk) incWriteErrCnt() {
	counter := &d.WriteErrCnt
	atomic.AddUint64(counter, 1)
}
330

331
// getWriteErrCnt atomically loads the write error counter.
func (d *Disk) getWriteErrCnt() uint64 {
	counter := &d.WriteErrCnt
	return atomic.LoadUint64(counter)
}
334

335
// getTotalErrCnt returns the sum of the read and write error counters.
func (d *Disk) getTotalErrCnt() uint64 {
	readErrs := d.getReadErrCnt()
	writeErrs := d.getWriteErrCnt()
	return readErrs + writeErrs
}
338

339
func (d *Disk) startScheduleToUpdateSpaceInfo() {
340
	go func() {
341
		updateSpaceInfoTicker := time.NewTicker(5 * time.Second)
342
		checkStatusTicker := time.NewTicker(time.Minute * 2)
343
		defer func() {
344
			updateSpaceInfoTicker.Stop()
345
			checkStatusTicker.Stop()
346
		}()
347
		for {
348
			select {
349
			case <-updateSpaceInfoTicker.C:
350
				d.computeUsage()
351
				d.updateSpaceInfo()
352
			case <-checkStatusTicker.C:
353
				d.checkDiskStatus()
354
			}
355
		}
356
	}()
357
}
358

359
// doBackendTask loops forever: once a minute it snapshots the attached
// partitions under the read lock and then, outside the lock, runs the backend
// task of each partition's extent store.
func (d *Disk) doBackendTask() {
	for {
		d.RLock()
		snapshot := make([]*DataPartition, 0, len(d.partitionMap))
		for _, dp := range d.partitionMap {
			snapshot = append(snapshot, dp)
		}
		d.RUnlock()

		for i := range snapshot {
			snapshot[i].extentStore.BackendTask()
		}
		time.Sleep(time.Minute)
	}
}
373

374
// DiskStatusFile is the name of the probe file that checkDiskStatus writes to
// and reads back from in the disk root.
const DiskStatusFile = ".diskStatus"
377

378
// checkDiskStatus probes the disk by creating/truncating the status file,
// writing its own name into it, syncing, and reading the bytes back. The first
// failing step is reported through CheckDiskError (open and read failures as
// read errors, write and sync failures as write errors). Disks already marked
// unavailable are skipped.
func (d *Disk) checkDiskStatus() {
	if d.Status == proto.Unavailable {
		log.LogInfof("[checkDiskStatus] disk status is unavailable, no need to check, disk path(%v)", d.Path)
		return
	}

	probeFile := path.Join(d.Path, DiskStatusFile)
	fp, err := os.OpenFile(probeFile, os.O_CREATE|os.O_TRUNC|os.O_RDWR, 0o755)
	if err != nil {
		d.CheckDiskError(err, ReadFlag)
		return
	}
	defer fp.Close()

	payload := []byte(DiskStatusFile)
	if _, err = fp.WriteAt(payload, 0); err != nil {
		d.CheckDiskError(err, WriteFlag)
		return
	}
	if err = fp.Sync(); err != nil {
		d.CheckDiskError(err, WriteFlag)
		return
	}
	if _, err = fp.ReadAt(payload, 0); err != nil {
		d.CheckDiskError(err, ReadFlag)
	}
}
406

407
// DiskErrNotAssociatedWithPartition is the partition ID that CheckDiskError
// passes to triggerDiskError when a disk error is not tied to a data partition.
const DiskErrNotAssociatedWithPartition uint64 = 0 // use 0 for disk error without any data partition
408

409
// CheckDiskError logs a non-nil err and, when IsDiskErr classifies its message
// as a disk error, records it against the disk (without an associated
// partition) using rwFlag to choose the error counter. A nil err is ignored.
func (d *Disk) CheckDiskError(err error, rwFlag uint8) {
	if err == nil {
		return
	}
	errText := err.Error()
	log.LogWarnf("CheckDiskError disk err: %v, disk:%v", errText, d.Path)

	if IsDiskErr(err.Error()) {
		d.triggerDiskError(rwFlag, DiskErrNotAssociatedWithPartition)
	}
}
421

422
// doDiskError marks the disk as unavailable. Forcing the partitions out of the
// raft store is currently disabled (see the commented-out call).
func (d *Disk) doDiskError() {
	// d.ForceExitRaftStore()
	d.Status = proto.Unavailable
}
426

427
func (d *Disk) triggerDiskError(rwFlag uint8, dpId uint64) {
428
	mesg := fmt.Sprintf("disk path %v error on %v, dpId %v", d.Path, LocalIP, dpId)
429
	exporter.Warning(mesg)
430
	log.LogWarnf(mesg)
431

432
	if rwFlag == WriteFlag {
433
		d.incWriteErrCnt()
434
	} else if rwFlag == ReadFlag {
435
		d.incReadErrCnt()
436
	} else {
437
		d.incWriteErrCnt()
438
		d.incReadErrCnt()
439
	}
440

441
	d.AddDiskErrPartition(dpId)
442

443
	diskErrCnt := d.getTotalErrCnt()
444
	diskErrPartitionCnt := d.GetDiskErrPartitionCount()
445
	if diskErrPartitionCnt >= d.dataNode.diskUnavailablePartitionErrorCount {
446
		msg := fmt.Sprintf("set disk unavailable for too many disk error, "+
447
			"disk path(%v), ip(%v), diskErrCnt(%v), diskErrPartitionCnt(%v) threshold(%v)",
448
			d.Path, LocalIP, diskErrCnt, diskErrPartitionCnt, d.dataNode.diskUnavailablePartitionErrorCount)
449
		exporter.Warning(msg)
450
		log.LogWarnf(msg)
451
		d.doDiskError()
452
	}
453
}
454

455
func (d *Disk) updateSpaceInfo() (err error) {
456
	var statsInfo syscall.Statfs_t
457
	if err = syscall.Statfs(d.Path, &statsInfo); err != nil {
458
		d.incReadErrCnt()
459
	}
460

461
	if d.Status == proto.Unavailable {
462
		mesg := fmt.Sprintf("disk path %v error on %v", d.Path, LocalIP)
463
		log.LogErrorf(mesg)
464
		exporter.Warning(mesg)
465
		// d.ForceExitRaftStore()
466
	} else if d.Available <= 0 {
467
		d.Status = proto.ReadOnly
468
	} else {
469
		d.Status = proto.ReadWrite
470
	}
471

472
	log.LogDebugf("action[updateSpaceInfo] disk(%v) total(%v) available(%v) remain(%v) "+
473
		"restSize(%v) preRestSize (%v) maxErrs(%v) readErrs(%v) writeErrs(%v) status(%v)", d.Path,
474
		d.Total, d.Available, d.Unallocated, d.ReservedSpace, d.DiskRdonlySpace, d.MaxErrCnt, d.ReadErrCnt, d.WriteErrCnt, d.Status)
475
	return
476
}
477

478
// AttachDataPartition adds a data partition to the partition map.
479
func (d *Disk) AttachDataPartition(dp *DataPartition) {
480
	d.Lock()
481
	d.partitionMap[dp.partitionID] = dp
482
	d.Unlock()
483

484
	d.computeUsage()
485
}
486

487
// DetachDataPartition removes a data partition from the partition map.
488
func (d *Disk) DetachDataPartition(dp *DataPartition) {
489
	d.Lock()
490
	delete(d.partitionMap, dp.partitionID)
491
	delete(d.DiskErrPartitionSet, dp.partitionID)
492
	d.Unlock()
493

494
	d.computeUsage()
495
}
496

497
// GetDataPartition returns the data partition based on the given partition ID.
498
func (d *Disk) GetDataPartition(partitionID uint64) (partition *DataPartition) {
499
	d.RLock()
500
	defer d.RUnlock()
501
	return d.partitionMap[partitionID]
502
}
503

504
// GetDataPartitionCount returns the number of data partitions attached to the
// disk, read under the disk's read lock.
func (d *Disk) GetDataPartitionCount() int {
	d.RLock()
	count := len(d.partitionMap)
	d.RUnlock()
	return count
}
509

510
// ForceExitRaftStore marks every partition on the disk as unavailable and stops
// its raft instance.
func (d *Disk) ForceExitRaftStore() {
	for _, partitionID := range d.DataPartitionList() {
		partition := d.GetDataPartition(partitionID)
		if partition == nil {
			// The partition was detached after the ID list was taken.
			continue
		}
		partition.partitionStatus = proto.Unavailable
		partition.stopRaft()
	}
}
518

519
// DataPartitionList returns a list of the data partitions
520
func (d *Disk) DataPartitionList() (partitionIDs []uint64) {
521
	d.Lock()
522
	defer d.Unlock()
523
	partitionIDs = make([]uint64, 0, len(d.partitionMap))
524
	for _, dp := range d.partitionMap {
525
		partitionIDs = append(partitionIDs, dp.partitionID)
526
	}
527
	return
528
}
529

530
// unmarshalPartitionName parses a partition directory name of the form
// <prefix>_<partitionID>_<partitionSize>.
//
// It returns an error when the name does not consist of exactly three
// underscore-separated fields, or when the ID or size field is not a valid
// number. If only the size field is invalid, the already parsed ID is still
// returned alongside the error.
func unmarshalPartitionName(name string) (partitionID uint64, partitionSize int, err error) {
	fields := strings.Split(name, "_")
	if len(fields) != 3 {
		return 0, 0, fmt.Errorf("error DataPartition name(%v)", name)
	}

	partitionID, err = strconv.ParseUint(fields[1], 10, 64)
	if err != nil {
		return
	}
	partitionSize, err = strconv.Atoi(fields[2])
	return
}
544

545
// isPartitionDir reports whether filename is the directory name of a data,
// cache or preload partition.
func (d *Disk) isPartitionDir(filename string) (isPartitionDir bool) {
	switch {
	case RegexpDataPartitionDir.MatchString(filename):
		return true
	case RegexpCachePartitionDir.MatchString(filename):
		return true
	default:
		return RegexpPreLoadPartitionDir.MatchString(filename)
	}
}
551

552
// isExpiredPartitionDir reports whether filename is the directory name of a
// data partition that carries the expired prefix.
func (d *Disk) isExpiredPartitionDir(filename string) (isExpiredPartitionDir bool) {
	return RegexpExpiredDataPartitionDir.MatchString(filename)
}
556

557
// RestorePartition reads the files stored on the local disk and restores the data partitions.
558
func (d *Disk) RestorePartition(visitor PartitionVisitor) (err error) {
559
	convert := func(node *proto.DataNodeInfo) *DataNodeInfo {
560
		result := &DataNodeInfo{}
561
		result.Addr = node.Addr
562
		result.PersistenceDataPartitions = node.PersistenceDataPartitions
563
		return result
564
	}
565
	var dataNode *proto.DataNodeInfo
566
	for i := 0; i < 3; i++ {
567
		dataNode, err = MasterClient.NodeAPI().GetDataNode(d.space.dataNode.localServerAddr)
568
		if err != nil {
569
			log.LogErrorf("action[RestorePartition]: getDataNode error %v", err)
570
			continue
571
		}
572
		break
573
	}
574
	dinfo := convert(dataNode)
575
	if len(dinfo.PersistenceDataPartitions) == 0 {
576
		log.LogWarnf("action[RestorePartition]: length of PersistenceDataPartitions is 0, ExpiredPartition check " +
577
			"without effect")
578
	}
579

580
	var (
581
		partitionID   uint64
582
		partitionSize int
583
	)
584

585
	fileInfoList, err := os.ReadDir(d.Path)
586
	if err != nil {
587
		log.LogErrorf("action[RestorePartition] read dir(%v) err(%v).", d.Path, err)
588
		return err
589
	}
590

591
	var (
592
		wg                            sync.WaitGroup
593
		toDeleteExpiredPartitionNames = make([]string, 0)
594
	)
595
	for _, fileInfo := range fileInfoList {
596
		filename := fileInfo.Name()
597
		if !d.isPartitionDir(filename) {
598
			if d.isExpiredPartitionDir(filename) {
599
				name := path.Join(d.Path, filename)
600
				toDeleteExpiredPartitionNames = append(toDeleteExpiredPartitionNames, name)
601
				log.LogInfof("action[RestorePartition] find expired partition on path(%s)", name)
602
			}
603
			continue
604
		}
605

606
		if partitionID, partitionSize, err = unmarshalPartitionName(filename); err != nil {
607
			log.LogErrorf("action[RestorePartition] unmarshal partitionName(%v) from disk(%v) err(%v) ",
608
				filename, d.Path, err.Error())
609
			continue
610
		}
611
		log.LogDebugf("acton[RestorePartition] disk(%v) path(%v) PartitionID(%v) partitionSize(%v).",
612
			d.Path, fileInfo.Name(), partitionID, partitionSize)
613

614
		if isExpiredPartition(partitionID, dinfo.PersistenceDataPartitions) {
615
			log.LogErrorf("action[RestorePartition]: find expired partition[%s], rename it and you can delete it "+
616
				"manually", filename)
617
			oldName := path.Join(d.Path, filename)
618
			newName := path.Join(d.Path, ExpiredPartitionPrefix+filename)
619
			os.Rename(oldName, newName)
620
			toDeleteExpiredPartitionNames = append(toDeleteExpiredPartitionNames, newName)
621
			continue
622
		}
623

624
		wg.Add(1)
625

626
		go func(partitionID uint64, filename string) {
627
			var (
628
				dp  *DataPartition
629
				err error
630
			)
631
			defer wg.Done()
632
			if dp, err = LoadDataPartition(path.Join(d.Path, filename), d); err != nil {
633
				mesg := fmt.Sprintf("action[RestorePartition] new partition(%v) err(%v) ",
634
					partitionID, err.Error())
635
				log.LogError(mesg)
636
				exporter.Warning(mesg)
637
				syslog.Println(mesg)
638
				return
639
			}
640
			if visitor != nil {
641
				visitor(dp)
642
			}
643
		}(partitionID, filename)
644
	}
645

646
	if len(toDeleteExpiredPartitionNames) > 0 {
647
		log.LogInfof("action[RestorePartition] expiredPartitions %v, disk %v", toDeleteExpiredPartitionNames, d.Path)
648

649
		notDeletedExpiredPartitionNames := d.deleteExpiredPartitions(toDeleteExpiredPartitionNames)
650

651
		if len(notDeletedExpiredPartitionNames) > 0 {
652
			go func(toDeleteExpiredPartitions []string) {
653
				ticker := time.NewTicker(ExpiredPartitionExistTime)
654
				log.LogInfof("action[RestorePartition] delete expiredPartitions automatically start, toDeleteExpiredPartitions %v", toDeleteExpiredPartitions)
655

656
				<-ticker.C
657
				d.deleteExpiredPartitions(toDeleteExpiredPartitionNames)
658
				ticker.Stop()
659
				log.LogInfof("action[RestorePartition] delete expiredPartitions automatically finish")
660
			}(notDeletedExpiredPartitionNames)
661
		}
662
	}
663
	wg.Wait()
664
	return err
665
}
666

667
// deleteExpiredPartitions removes those of the given expired partition
// directories whose inode change time is at least ExpiredPartitionExistTime in
// the past.
//
// Paths whose base name does not look like an expired partition directory, that
// cannot be stat'ed, or whose removal fails are logged and skipped. The
// returned slice (never nil) lists the directories that are still too young to
// be deleted.
func (d *Disk) deleteExpiredPartitions(toDeleteExpiredPartitionNames []string) (notDeletedExpiredPartitionNames []string) {
	notDeletedExpiredPartitionNames = make([]string, 0)
	minAge := int64(ExpiredPartitionExistTime.Seconds())

	for _, fullPath := range toDeleteExpiredPartitionNames {
		parent, base := path.Split(fullPath)
		if !d.isExpiredPartitionDir(base) {
			log.LogInfof("action[deleteExpiredPartitions] partition %v on %v is not expiredPartition", base, parent)
			continue
		}

		info, statErr := os.Stat(fullPath)
		if statErr != nil {
			log.LogErrorf("action[deleteExpiredPartitions] stat expiredPartition %v fail, err(%v)", fullPath, statErr)
			continue
		}

		changedAt := info.Sys().(*syscall.Stat_t).Ctim.Sec
		if time.Now().Unix()-changedAt < minAge {
			notDeletedExpiredPartitionNames = append(notDeletedExpiredPartitionNames, fullPath)
			continue
		}

		if rmErr := os.RemoveAll(fullPath); rmErr != nil {
			log.LogErrorf("action[deleteExpiredPartitions] delete expiredPartition %v automatically fail, err(%v)", fullPath, rmErr)
			continue
		}
		log.LogInfof("action[deleteExpiredPartitions] delete expiredPartition %v automatically", fullPath)
	}
	return
}
696

697
// AddSize atomically adds size to the disk's allocated byte count.
func (d *Disk) AddSize(size uint64) {
	allocated := &d.Allocated
	atomic.AddUint64(allocated, size)
}
700

701
// updateDisk deducts allocSize from the available space under the disk lock.
// When less than allocSize is available, the disk is switched to read-only and
// its available space is set to zero instead.
func (d *Disk) updateDisk(allocSize uint64) {
	d.Lock()
	defer d.Unlock()

	if allocSize <= d.Available {
		d.Available -= allocSize
		return
	}
	d.Status = proto.ReadOnly
	d.Available = 0
}
712

713
// getSelectWeight returns the ratio of allocated bytes to the disk's total
// size. The result is not finite when Total is zero.
func (d *Disk) getSelectWeight() float64 {
	allocated := float64(atomic.LoadUint64(&d.Allocated))
	return allocated / float64(d.Total)
}
716

717
// AddDiskErrPartition adds dpId to the set of partitions on which a disk error
// was reported; adding an ID that is already present has no effect.
// NOTE(review): the set is modified here without taking the disk lock, while
// DetachDataPartition deletes from it under the lock — confirm callers
// serialise access.
func (d *Disk) AddDiskErrPartition(dpId uint64) {
	d.DiskErrPartitionSet[dpId] = struct{}{}
}
722

723
// GetDiskErrPartitionList returns the IDs of all partitions on which a disk
// error was reported, in no particular order. The result is never nil.
func (d *Disk) GetDiskErrPartitionList() (diskErrPartitionList []uint64) {
	diskErrPartitionList = make([]uint64, 0, len(d.DiskErrPartitionSet))
	for dpID := range d.DiskErrPartitionSet {
		diskErrPartitionList = append(diskErrPartitionList, dpID)
	}
	return
}
730

731
// GetDiskErrPartitionCount returns the number of partitions on which a disk
// error was reported.
func (d *Disk) GetDiskErrPartitionCount() uint64 {
	count := len(d.DiskErrPartitionSet)
	return uint64(count)
}
734

735
// isExpiredPartition return whether one partition is expired
// if one partition does not exist in master, we decided that it is one expired partition
//
// partitions is the list of partition IDs known to the master; an empty or nil
// list therefore makes every id count as expired.
func isExpiredPartition(id uint64, partitions []uint64) bool {
	known := false
	for i := 0; i < len(partitions) && !known; i++ {
		known = partitions[i] == id
	}
	return !known
}
749

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.