space_manager.go
// Copyright 2018 The CubeFS Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.

package datanode

import (
	"fmt"
	"math"
	"os"
	"sync"
	"time"

	"github.com/cubefs/cubefs/proto"
	"github.com/cubefs/cubefs/raftstore"
	"github.com/cubefs/cubefs/util/atomicutil"
	"github.com/cubefs/cubefs/util/loadutil"
	"github.com/cubefs/cubefs/util/log"
	"github.com/shirou/gopsutil/disk"
)

// SpaceManager manages the disk space.
type SpaceManager struct {
	clusterID            string
	disks                map[string]*Disk
	partitions           map[uint64]*DataPartition
	raftStore            raftstore.RaftStore
	nodeID               uint64
	diskMutex            sync.RWMutex
	partitionMutex       sync.RWMutex
	stats                *Stats
	stopC                chan bool
	selectedIndex        int // TODO what is selected index
	diskList             []string
	dataNode             *DataNode
	createPartitionMutex sync.RWMutex
	diskUtils            map[string]*atomicutil.Float64
	samplerDone          chan struct{}
}

const diskSampleDuration = 1 * time.Second

// NewSpaceManager creates a new space manager.
func NewSpaceManager(dataNode *DataNode) *SpaceManager {
	space := &SpaceManager{}
	space.disks = make(map[string]*Disk)
	space.diskList = make([]string, 0)
	space.partitions = make(map[uint64]*DataPartition)
	space.stats = NewStats(dataNode.zoneName)
	space.stopC = make(chan bool)
	space.dataNode = dataNode
	space.diskUtils = make(map[string]*atomicutil.Float64)
	go space.statUpdateScheduler()

	return space
}

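// Stop closes the stop channel and the disk sampler, stops raft for every
// data partition, and then stops the partitions themselves through a bounded
// pool of worker goroutines.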
func (manager *SpaceManager) Stop() {
	defer func() {
		recover()
	}()
	close(manager.stopC)
	// Stop the sampler.
	close(manager.samplerDone)
	// Stop data partitions in parallel.
	const maxParallelism = 128
	parallelism := int(math.Min(float64(maxParallelism), float64(len(manager.partitions))))
	wg := sync.WaitGroup{}
	partitionC := make(chan *DataPartition, parallelism)
	wg.Add(1)

	// Stop raft for all partitions.
	for _, partition := range manager.partitions {
		partition.stopRaft()
	}

	go func(c chan<- *DataPartition) {
		defer wg.Done()
		for _, partition := range manager.partitions {
			c <- partition
		}
		close(c)
	}(partitionC)

	for i := 0; i < parallelism; i++ {
		wg.Add(1)
		go func(c <-chan *DataPartition) {
			defer wg.Done()
			var partition *DataPartition
			for {
				if partition = <-c; partition == nil {
					return
				}
				partition.Stop()
			}
		}(partitionC)
	}
	wg.Wait()
}

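// GetAllDiskPartitions returns the OS-level partition stat of every registered
// disk; disks without a known partition are skipped.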
func (manager *SpaceManager) GetAllDiskPartitions() []*disk.PartitionStat {
	manager.diskMutex.RLock()
	defer manager.diskMutex.RUnlock()
	partitions := make([]*disk.PartitionStat, 0, len(manager.disks))
	for _, disk := range manager.disks {
		partition := disk.GetDiskPartition()
		if partition != nil {
			partitions = append(partitions, partition)
		}
	}
	return partitions
}

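// FillIoUtils stores the sampled IO utilization of each device into the
// corresponding diskUtils entry.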
func (manager *SpaceManager) FillIoUtils(samples map[string]loadutil.DiskIoSample) {
	manager.diskMutex.RLock()
	defer manager.diskMutex.RUnlock()
	for _, sample := range samples {
		util := manager.diskUtils[sample.GetPartition().Device]
		if util != nil {
			util.Store(sample.GetIoUtilPercent())
		}
	}
}

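// StartDiskSample launches a background goroutine that repeatedly samples disk
// IO utilization until the sampler is stopped or sampling fails.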
func (manager *SpaceManager) StartDiskSample() {
	manager.samplerDone = make(chan struct{})
	go func() {
		for {
			select {
			case <-manager.samplerDone:
				return
			default:
				partitions := manager.GetAllDiskPartitions()
				samples, err := loadutil.GetDisksIoSample(partitions, diskSampleDuration)
				if err != nil {
					log.LogErrorf("failed to sample disk %v\n", err.Error())
					return
				}
				manager.FillIoUtils(samples)
			}
		}
	}()
}

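// GetDiskUtils returns a snapshot of the current IO utilization per device.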
func (manager *SpaceManager) GetDiskUtils() map[string]float64 {
	utils := make(map[string]float64)
	manager.diskMutex.RLock()
	defer manager.diskMutex.RUnlock()
	for device, used := range manager.diskUtils {
		utils[device] = used.Load()
	}
	return utils
}

func (manager *SpaceManager) SetNodeID(nodeID uint64) {
	manager.nodeID = nodeID
}

func (manager *SpaceManager) GetNodeID() (nodeID uint64) {
	return manager.nodeID
}

func (manager *SpaceManager) SetClusterID(clusterID string) {
	manager.clusterID = clusterID
}

func (manager *SpaceManager) GetClusterID() (clusterID string) {
	return manager.clusterID
}

func (manager *SpaceManager) SetRaftStore(raftStore raftstore.RaftStore) {
	manager.raftStore = raftStore
}

func (manager *SpaceManager) GetRaftStore() (raftStore raftstore.RaftStore) {
	return manager.raftStore
}

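// RangePartitions iterates over a snapshot of all data partitions and calls f
// on each one until f returns false.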
func (manager *SpaceManager) RangePartitions(f func(partition *DataPartition) bool) {
	if f == nil {
		return
	}
	manager.partitionMutex.RLock()
	partitions := make([]*DataPartition, 0)
	for _, dp := range manager.partitions {
		partitions = append(partitions, dp)
	}
	manager.partitionMutex.RUnlock()

	for _, partition := range partitions {
		if !f(partition) {
			break
		}
	}
}

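// GetDisks returns all disks managed by the space manager.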
func (manager *SpaceManager) GetDisks() (disks []*Disk) {
	manager.diskMutex.RLock()
	defer manager.diskMutex.RUnlock()
	disks = make([]*Disk, 0)
	for _, disk := range manager.disks {
		disks = append(disks, disk)
	}
	return
}

func (manager *SpaceManager) Stats() *Stats {
	return manager.stats
}

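// LoadDisk loads the disk at the given path, restores the data partitions
// stored on it, and registers the disk with the space manager. It is a no-op
// if the disk has already been loaded.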
func (manager *SpaceManager) LoadDisk(path string, reservedSpace, diskRdonlySpace uint64, maxErrCnt int) (err error) {
	var (
		disk    *Disk
		visitor PartitionVisitor
	)

	if diskRdonlySpace < reservedSpace {
		diskRdonlySpace = reservedSpace
	}

	log.LogDebugf("action[LoadDisk] load disk from path(%v).", path)
	visitor = func(dp *DataPartition) {
		manager.partitionMutex.Lock()
		defer manager.partitionMutex.Unlock()
		if _, has := manager.partitions[dp.partitionID]; !has {
			manager.partitions[dp.partitionID] = dp
			log.LogDebugf("action[LoadDisk] put partition(%v) to manager.", dp.partitionID)
		}
	}

	if _, err = manager.GetDisk(path); err != nil {
		disk, err = NewDisk(path, reservedSpace, diskRdonlySpace, maxErrCnt, manager)
		if err != nil {
			log.LogErrorf("NewDisk fail err:[%v]", err)
			return
		}
		err = disk.RestorePartition(visitor)
		if err != nil {
			log.LogErrorf("RestorePartition fail err:[%v]", err)
			return
		}
		manager.putDisk(disk)
		err = nil
		go disk.doBackendTask()
	}
	return
}

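// GetDisk returns the disk registered at the given path, or an error if no
// such disk exists.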
func (manager *SpaceManager) GetDisk(path string) (d *Disk, err error) {
	manager.diskMutex.RLock()
	defer manager.diskMutex.RUnlock()
	disk, has := manager.disks[path]
	if has && disk != nil {
		d = disk
		return
	}
	err = fmt.Errorf("disk(%v) does not exist", path)
	return
}

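// putDisk registers a disk with the manager and, when its device is known,
// initializes its IO-utilization counter.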
func (manager *SpaceManager) putDisk(d *Disk) {
	manager.diskMutex.Lock()
	manager.disks[d.Path] = d
	manager.diskList = append(manager.diskList, d.Path)
	if d.GetDiskPartition() != nil {
		manager.diskUtils[d.GetDiskPartition().Device] = &atomicutil.Float64{}
		manager.diskUtils[d.GetDiskPartition().Device].Store(0)
	}
	manager.diskMutex.Unlock()
}

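// updateMetrics aggregates capacity statistics across all available disks and
// publishes them to the stats object. Unavailable (broken) disks are skipped.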
func (manager *SpaceManager) updateMetrics() {
	manager.diskMutex.RLock()
	var (
		total, used, available                                 uint64
		totalPartitionSize, remainingCapacityToCreatePartition uint64
		maxCapacityToCreatePartition, partitionCnt             uint64
	)
	maxCapacityToCreatePartition = 0
	for _, d := range manager.disks {
		if d.Status == proto.Unavailable {
			log.LogInfof("disk is broken, skip disk usage stat, diskpath %s", d.Path)
			continue
		}

		total += d.Total
		used += d.Used
		available += d.Available
		totalPartitionSize += d.Allocated
		remainingCapacityToCreatePartition += d.Unallocated
		partitionCnt += uint64(d.PartitionCount())
		if maxCapacityToCreatePartition < d.Unallocated {
			maxCapacityToCreatePartition = d.Unallocated
		}
	}
	manager.diskMutex.RUnlock()
	log.LogDebugf("action[updateMetrics] total(%v) used(%v) available(%v) totalPartitionSize(%v)  remainingCapacityToCreatePartition(%v) "+
		"partitionCnt(%v) maxCapacityToCreatePartition(%v) ", total, used, available, totalPartitionSize, remainingCapacityToCreatePartition, partitionCnt, maxCapacityToCreatePartition)
	manager.stats.updateMetrics(total, used, available, totalPartitionSize,
		remainingCapacityToCreatePartition, maxCapacityToCreatePartition, partitionCnt)
}

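// minPartitionCnt picks the writable disk with the lowest selection weight,
// excluding decommissioned disks. It returns nil when no suitable disk exists.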
func (manager *SpaceManager) minPartitionCnt(decommissionedDisks []string) (d *Disk) {
	manager.diskMutex.Lock()
	defer manager.diskMutex.Unlock()
	var (
		minWeight     float64
		minWeightDisk *Disk
	)
	decommissionedDiskMap := make(map[string]struct{})
	for _, disk := range decommissionedDisks {
		decommissionedDiskMap[disk] = struct{}{}
	}
	minWeight = math.MaxFloat64
	for _, disk := range manager.disks {
		if _, ok := decommissionedDiskMap[disk.Path]; ok {
			log.LogInfof("action[minPartitionCnt] exclude decommissioned disk[%v]", disk.Path)
			continue
		}
		if disk.Status != proto.ReadWrite {
			continue
		}
		diskWeight := disk.getSelectWeight()
		if diskWeight < minWeight {
			minWeight = diskWeight
			minWeightDisk = disk
		}
	}
	if minWeightDisk == nil {
		return
	}
	if minWeightDisk.Status != proto.ReadWrite {
		return
	}
	d = minWeightDisk
	return d
}

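// statUpdateScheduler refreshes the space statistics every 10 seconds until
// the space manager is stopped.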
func (manager *SpaceManager) statUpdateScheduler() {
	go func() {
		ticker := time.NewTicker(10 * time.Second)
		for {
			select {
			case <-ticker.C:
				manager.updateMetrics()
			case <-manager.stopC:
				ticker.Stop()
				return
			}
		}
	}()
}

func (manager *SpaceManager) Partition(partitionID uint64) (dp *DataPartition) {
	manager.partitionMutex.RLock()
	defer manager.partitionMutex.RUnlock()
	dp = manager.partitions[partitionID]
	return
}

func (manager *SpaceManager) AttachPartition(dp *DataPartition) {
	manager.partitionMutex.Lock()
	defer manager.partitionMutex.Unlock()
	manager.partitions[dp.partitionID] = dp
}

// DetachDataPartition removes a data partition from the partition map.
func (manager *SpaceManager) DetachDataPartition(partitionID uint64) {
	manager.partitionMutex.Lock()
	defer manager.partitionMutex.Unlock()
	delete(manager.partitions, partitionID)
}

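// CreatePartition creates a data partition from the master's request. If the
// partition already exists, the request is validated against it instead;
// otherwise the partition is placed on the disk chosen by minPartitionCnt.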
func (manager *SpaceManager) CreatePartition(request *proto.CreateDataPartitionRequest) (dp *DataPartition, err error) {
	manager.partitionMutex.Lock()
	defer manager.partitionMutex.Unlock()
	dpCfg := &dataPartitionCfg{
		PartitionID:   request.PartitionId,
		VolName:       request.VolumeId,
		Peers:         request.Members,
		Hosts:         request.Hosts,
		RaftStore:     manager.raftStore,
		NodeID:        manager.nodeID,
		ClusterID:     manager.clusterID,
		PartitionSize: request.PartitionSize,
		PartitionType: int(request.PartitionTyp),
		ReplicaNum:    request.ReplicaNum,
		VerSeq:        request.VerSeq,
		CreateType:    request.CreateType,
		Forbidden:     false,
	}
	log.LogInfof("action[CreatePartition] dp %v dpCfg.Peers %v request.Members %v",
		dpCfg.PartitionID, dpCfg.Peers, request.Members)
	dp = manager.partitions[dpCfg.PartitionID]
	if dp != nil {
		if err = dp.IsEquareCreateDataPartitionRequst(request); err != nil {
			return nil, err
		}
		return
	}
	disk := manager.minPartitionCnt(request.DecommissionedDisks)
	if disk == nil {
		return nil, ErrNoSpaceToCreatePartition
	}
	if dp, err = CreateDataPartition(dpCfg, disk, request); err != nil {
		return
	}
	manager.partitions[dp.partitionID] = dp
	return
}

// DeletePartition deletes a partition based on the partition id.
func (manager *SpaceManager) DeletePartition(dpID uint64) {
	manager.partitionMutex.Lock()

	dp := manager.partitions[dpID]
	if dp == nil {
		manager.partitionMutex.Unlock()
		return
	}

	delete(manager.partitions, dpID)
	manager.partitionMutex.Unlock()
	dp.Stop()
	dp.Disk().DetachDataPartition(dp)
	os.RemoveAll(dp.Path())
}

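// buildHeartBeatResponse fills a heartbeat response with node-level space
// statistics, per-partition reports, and the list of bad disks.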
func (s *DataNode) buildHeartBeatResponse(response *proto.DataNodeHeartbeatResponse) {
	response.Status = proto.TaskSucceeds
	stat := s.space.Stats()
	stat.Lock()
	response.Used = stat.Used
	response.Total = stat.Total
	response.Available = stat.Available
	response.CreatedPartitionCnt = uint32(stat.CreatedPartitionCnt)
	response.TotalPartitionSize = stat.TotalPartitionSize
	response.MaxCapacity = stat.MaxCapacityToCreatePartition
	response.RemainingCapacity = stat.RemainingCapacityToCreatePartition
	response.BadDisks = make([]string, 0)
	response.BadDiskStats = make([]proto.BadDiskStat, 0)
	response.StartTime = s.startTime
	stat.Unlock()

	response.ZoneName = s.zoneName
	response.PartitionReports = make([]*proto.DataPartitionReport, 0)
	space := s.space
	space.RangePartitions(func(partition *DataPartition) bool {
		leaderAddr, isLeader := partition.IsRaftLeader()
		vr := &proto.DataPartitionReport{
			VolName:                    partition.volumeID,
			PartitionID:                uint64(partition.partitionID),
			PartitionStatus:            partition.Status(),
			Total:                      uint64(partition.Size()),
			Used:                       uint64(partition.Used()),
			DiskPath:                   partition.Disk().Path,
			IsLeader:                   isLeader,
			ExtentCount:                partition.GetExtentCount(),
			NeedCompare:                true,
			DecommissionRepairProgress: partition.decommissionRepairProgress,
		}
		log.LogDebugf("action[Heartbeats] dpid(%v), status(%v) total(%v) used(%v) leader(%v) isLeader(%v).", vr.PartitionID, vr.PartitionStatus, vr.Total, vr.Used, leaderAddr, vr.IsLeader)
		response.PartitionReports = append(response.PartitionReports, vr)
		return true
	})

	disks := space.GetDisks()
	for _, d := range disks {
		if d.Status == proto.Unavailable {
			response.BadDisks = append(response.BadDisks, d.Path)

			bds := proto.BadDiskStat{
				DiskPath:             d.Path,
				TotalPartitionCnt:    d.PartitionCount(),
				DiskErrPartitionList: d.GetDiskErrPartitionList(),
			}
			response.BadDiskStats = append(response.BadDiskStats, bds)
		}
	}
}

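// getPartitionIds returns the IDs of all data partitions currently attached.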
func (manager *SpaceManager) getPartitionIds() []uint64 {
	res := make([]uint64, 0)
	for id := range manager.partitions {
		res = append(res, id)
	}
	return res
}