// Copyright 2018 The CubeFS Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.

package datanode

import (
	"encoding/json"
	"fmt"
	"hash/crc32"
	"math"
	"net"
	"os"
	"path"
	"sort"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	raftProto "github.com/cubefs/cubefs/depends/tiglabs/raft/proto"
	"github.com/cubefs/cubefs/proto"
	"github.com/cubefs/cubefs/raftstore"
	"github.com/cubefs/cubefs/repl"
	"github.com/cubefs/cubefs/storage"
	"github.com/cubefs/cubefs/util"
	"github.com/cubefs/cubefs/util/errors"
	"github.com/cubefs/cubefs/util/log"
)

const (
	DataPartitionPrefix           = "datapartition"
	CachePartitionPrefix          = "cachepartition"
	PreLoadPartitionPrefix        = "preloadpartition"
	DataPartitionMetadataFileName = "META"
	TempMetadataFileName          = ".meta"
	ApplyIndexFile                = "APPLY"
	TempApplyIndexFile            = ".apply"
	TimeLayout                    = "2006-01-02 15:04:05"
)

const (
	RaftStatusStopped = 0
	RaftStatusRunning = 1
)

type DataPartitionMetadata struct {
	VolumeID                string
	PartitionID             uint64
	PartitionSize           int
	PartitionType           int
	CreateTime              string
	Peers                   []proto.Peer
	Hosts                   []string
	DataPartitionCreateType int
	LastTruncateID          uint64
	ReplicaNum              int
	StopRecover             bool
	VerList                 []*proto.VolVersionInfo
	ApplyID                 uint64
}

func (md *DataPartitionMetadata) Validate() (err error) {
	md.VolumeID = strings.TrimSpace(md.VolumeID)
	if len(md.VolumeID) == 0 || md.PartitionID == 0 || md.PartitionSize == 0 {
		err = errors.New("illegal data partition metadata")
		return
	}
	return
}
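
// Illustrative note (not part of the upstream source): the META file persisted
// by PersistMetadata below is simply the JSON encoding of DataPartitionMetadata.
// With hypothetical values it looks roughly like:
//
//	{
//	  "VolumeID": "vol-example",
//	  "PartitionID": 1001,
//	  "PartitionSize": 128849018880,
//	  "PartitionType": 0,
//	  "CreateTime": "2023-01-02 15:04:05",
//	  "Peers": [...],
//	  "Hosts": ["192.168.0.11:17310", "192.168.0.12:17310", "192.168.0.13:17310"],
//	  "DataPartitionCreateType": 0,
//	  "LastTruncateID": 0,
//	  "ReplicaNum": 3,
//	  "StopRecover": false,
//	  "VerList": [],
//	  "ApplyID": 0
//	}
//
// The exact encoding of Peers depends on proto.Peer's JSON tags.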

// MetaMultiSnapshotInfo
type MetaMultiSnapshotInfo struct {
	VerSeq uint64
	Status int8
	Ctime  time.Time
}

type DataPartition struct {
	clusterID       string
	volumeID        string
	partitionID     uint64
	partitionStatus int
	partitionSize   int
	partitionType   int
	replicaNum      int
	replicas        []string // addresses of the replicas
	replicasLock    sync.RWMutex
	disk            *Disk
	dataNode        *DataNode
	isLeader        bool
	isRaftLeader    bool
	path            string
	used            int
	leaderSize      int
	extentStore     *storage.ExtentStore
	raftPartition   raftstore.Partition
	config          *dataPartitionCfg
	appliedID       uint64 // apply id used in Raft
	lastTruncateID  uint64 // truncate id used in Raft
	metaAppliedID   uint64 // apply id while do meta persist
	minAppliedID    uint64
	maxAppliedID    uint64

	stopOnce  sync.Once
	stopRaftC chan uint64
	storeC    chan uint64
	stopC     chan bool

	raftStatus int32

	intervalToUpdateReplicas      int64 // interval to ask the master for updating the replica information
	snapshot                      []*proto.File
	snapshotMutex                 sync.RWMutex
	intervalToUpdatePartitionSize int64
	loadExtentHeaderStatus        int
	DataPartitionCreateType       int
	isLoadingDataPartition        int32
	persistMetaMutex              sync.RWMutex

	// snapshot
	verSeq                     uint64
	verSeqPrepare              uint64
	verSeqCommitStatus         int8
	volVersionInfoList         *proto.VolVersionInfoList
	decommissionRepairProgress float64 // record repair progress for decommission datapartition
	stopRecover                bool
	recoverErrCnt              uint64 // do not reset; if it reaches the max error count, delete this dp

	diskErrCnt uint64 // number of disk io errors while reading or writing
}

func (dp *DataPartition) IsForbidden() bool {
	return dp.config.Forbidden
}

func (dp *DataPartition) SetForbidden(status bool) {
	dp.config.Forbidden = status
}

func CreateDataPartition(dpCfg *dataPartitionCfg, disk *Disk, request *proto.CreateDataPartitionRequest) (dp *DataPartition, err error) {
	if dp, err = newDataPartition(dpCfg, disk, true); err != nil {
		return
	}
	dp.ForceLoadHeader()
	if request.CreateType == proto.NormalCreateDataPartition {
		err = dp.StartRaft(false)
	} else {
		// init leaderSize to partitionSize
		disk.updateDisk(uint64(request.LeaderSize))
		// ensure the heartbeat reports Recovering
		dp.partitionStatus = proto.Recovering
		go dp.StartRaftAfterRepair(false)
	}
	if err != nil {
		return nil, err
	}

	// persist file metadata
	go dp.StartRaftLoggingSchedule()
	dp.DataPartitionCreateType = request.CreateType
	dp.replicaNum = request.ReplicaNum
	err = dp.PersistMetadata()
	disk.AddSize(uint64(dp.Size()))
	return
}

func (dp *DataPartition) IsEquareCreateDataPartitionRequst(request *proto.CreateDataPartitionRequest) (err error) {
	if len(dp.config.Peers) != len(request.Members) {
		return fmt.Errorf("exist partition(%v)  peers len(%v) members len(%v)",
			dp.partitionID, len(dp.config.Peers), len(request.Members))
	}
	for index, host := range dp.config.Hosts {
		requestHost := request.Hosts[index]
		if host != requestHost {
			return fmt.Errorf("exist partition(%v) index(%v) requestHost(%v) persistHost(%v)",
				dp.partitionID, index, requestHost, host)
		}
	}
	for index, peer := range dp.config.Peers {
		requestPeer := request.Members[index]
		if requestPeer.ID != peer.ID || requestPeer.Addr != peer.Addr {
			return fmt.Errorf("exist partition(%v) index(%v) requestPeer(%v) persistPeers(%v)",
				dp.partitionID, index, requestPeer, peer)
		}
	}
	if dp.config.VolName != request.VolumeId {
		return fmt.Errorf("exist partition Partition(%v)  requestVolName(%v) persistVolName(%v)",
			dp.partitionID, request.VolumeId, dp.config.VolName)
	}

	return
}

func (dp *DataPartition) ForceSetDataPartitionToLoadding() {
	atomic.StoreInt32(&dp.isLoadingDataPartition, 1)
}

func (dp *DataPartition) ForceSetDataPartitionToFininshLoad() {
	atomic.StoreInt32(&dp.isLoadingDataPartition, 0)
}

func (dp *DataPartition) IsDataPartitionLoading() bool {
	return atomic.LoadInt32(&dp.isLoadingDataPartition) == 1
}

func (dp *DataPartition) ForceSetRaftRunning() {
	atomic.StoreInt32(&dp.raftStatus, RaftStatusRunning)
}

// LoadDataPartition loads and returns a partition instance based on the specified directory.
// It reads the partition metadata file stored under the specified directory
// and creates the partition instance.
func LoadDataPartition(partitionDir string, disk *Disk) (dp *DataPartition, err error) {
	var metaFileData []byte
	if metaFileData, err = os.ReadFile(path.Join(partitionDir, DataPartitionMetadataFileName)); err != nil {
		return
	}
	meta := &DataPartitionMetadata{}
	if err = json.Unmarshal(metaFileData, meta); err != nil {
		return
	}
	if err = meta.Validate(); err != nil {
		return
	}

	dpCfg := &dataPartitionCfg{
		VolName:       meta.VolumeID,
		PartitionSize: meta.PartitionSize,
		PartitionType: meta.PartitionType,
		PartitionID:   meta.PartitionID,
		ReplicaNum:    meta.ReplicaNum,
		Peers:         meta.Peers,
		Hosts:         meta.Hosts,
		RaftStore:     disk.space.GetRaftStore(),
		NodeID:        disk.space.GetNodeID(),
		ClusterID:     disk.space.GetClusterID(),
	}
	if dp, err = newDataPartition(dpCfg, disk, false); err != nil {
		return
	}
	dp.stopRecover = meta.StopRecover
	dp.metaAppliedID = meta.ApplyID
	dp.computeUsage()
	dp.ForceSetDataPartitionToLoadding()
	disk.space.AttachPartition(dp)
	if err = dp.LoadAppliedID(); err != nil {
		log.LogErrorf("action[loadApplyIndex] %v", err)
		return
	}
	log.LogInfof("Action(LoadDataPartition) PartitionID(%v) meta(%v) stopRecover(%v)", dp.partitionID, meta, meta.StopRecover)
	dp.DataPartitionCreateType = meta.DataPartitionCreateType
	dp.lastTruncateID = meta.LastTruncateID
	if meta.DataPartitionCreateType == proto.NormalCreateDataPartition {
		err = dp.StartRaft(true)
	} else {
		// init leaderSize to partitionSize
		dp.leaderSize = dp.partitionSize
		dp.partitionStatus = proto.Recovering
		go dp.StartRaftAfterRepair(true)
	}
	if err != nil {
		log.LogErrorf("PartitionID(%v) start raft err(%v)..", dp.partitionID, err)
		disk.space.DetachDataPartition(dp.partitionID)
		return
	}

	go dp.StartRaftLoggingSchedule()
	disk.AddSize(uint64(dp.Size()))
	dp.ForceLoadHeader()
	return
}
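
// Usage sketch (illustrative only, not part of the upstream source): at startup
// the datanode's disk layer walks each disk directory and hands every partition
// directory to LoadDataPartition, roughly along these lines (the surrounding
// loop and its names are hypothetical):
//
//	entries, _ := os.ReadDir(disk.Path)
//	for _, entry := range entries {
//		if !strings.HasPrefix(entry.Name(), DataPartitionPrefix) {
//			continue
//		}
//		if dp, err := LoadDataPartition(path.Join(disk.Path, entry.Name()), disk); err != nil {
//			log.LogErrorf("load partition %v failed: %v", entry.Name(), err)
//		} else {
//			_ = dp // already attached to the space manager inside LoadDataPartition
//		}
//	}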

func newDataPartition(dpCfg *dataPartitionCfg, disk *Disk, isCreate bool) (dp *DataPartition, err error) {
	partitionID := dpCfg.PartitionID
	var dataPath string

	if proto.IsNormalDp(dpCfg.PartitionType) {
		dataPath = path.Join(disk.Path, fmt.Sprintf(DataPartitionPrefix+"_%v_%v", partitionID, dpCfg.PartitionSize))
	} else if proto.IsCacheDp(dpCfg.PartitionType) {
		dataPath = path.Join(disk.Path, fmt.Sprintf(CachePartitionPrefix+"_%v_%v", partitionID, dpCfg.PartitionSize))
	} else if proto.IsPreLoadDp(dpCfg.PartitionType) {
		dataPath = path.Join(disk.Path, fmt.Sprintf(PreLoadPartitionPrefix+"_%v_%v", partitionID, dpCfg.PartitionSize))
	} else {
		return nil, fmt.Errorf("newDataPartition fail, dataPartitionCfg(%v)", dpCfg)
	}

	partition := &DataPartition{
		volumeID:                dpCfg.VolName,
		clusterID:               dpCfg.ClusterID,
		partitionID:             partitionID,
		replicaNum:              dpCfg.ReplicaNum,
		disk:                    disk,
		dataNode:                disk.dataNode,
		path:                    dataPath,
		partitionSize:           dpCfg.PartitionSize,
		partitionType:           dpCfg.PartitionType,
		replicas:                make([]string, 0),
		stopC:                   make(chan bool),
		stopRaftC:               make(chan uint64),
		storeC:                  make(chan uint64, 128),
		snapshot:                make([]*proto.File, 0),
		partitionStatus:         proto.ReadWrite,
		config:                  dpCfg,
		raftStatus:              RaftStatusStopped,
		verSeq:                  dpCfg.VerSeq,
		DataPartitionCreateType: dpCfg.CreateType,
		volVersionInfoList:      &proto.VolVersionInfoList{},
	}
	atomic.StoreUint64(&partition.recoverErrCnt, 0)
	log.LogInfof("action[newDataPartition] dp %v replica num %v", partitionID, dpCfg.ReplicaNum)
	partition.replicasInit()
	partition.extentStore, err = storage.NewExtentStore(partition.path, dpCfg.PartitionID, dpCfg.PartitionSize,
		partition.partitionType, isCreate)
	if err != nil {
		log.LogWarnf("action[newDataPartition] dp %v NewExtentStore failed %v", partitionID, err.Error())
		return
	}
	// store the applied ID
	if err = partition.storeAppliedID(partition.appliedID); err != nil {
		log.LogErrorf("action[newDataPartition] dp %v initial Apply [%v] failed: %v",
			partition.partitionID, partition.appliedID, err)
		return
	}
	disk.AttachDataPartition(partition)
	dp = partition
	go partition.statusUpdateScheduler()
	go partition.startEvict()
	if isCreate {
		if err = dp.getVerListFromMaster(); err != nil {
			log.LogErrorf("action[newDataPartition] vol %v dp %v loadFromMaster verList failed err %v", dp.volumeID, dp.partitionID, err)
			return
		}
	}

	log.LogInfof("action[newDataPartition] dp %v replica num %v CreateType %v create success",
		dp.partitionID, dpCfg.ReplicaNum, dp.DataPartitionCreateType)
	return
}
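
// On-disk naming note: as constructed above, a partition directory is named
// "<prefix>_<partitionID>_<partitionSize>" under the disk path. For example
// (hypothetical values), a normal 120 GiB partition 1001 lives in
// <diskPath>/datapartition_1001_128849018880, while cache and preload
// partitions use the cachepartition_ and preloadpartition_ prefixes.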

func (partition *DataPartition) HandleVersionOp(req *proto.MultiVersionOpRequest) (err error) {
	var (
		verData []byte
		pItem   *RaftCmdItem
	)
	if verData, err = json.Marshal(req); err != nil {
		return
	}
	pItem = &RaftCmdItem{
		Op: uint32(proto.OpVersionOp),
		K:  []byte("version"),
		V:  verData,
	}
	data, _ := MarshalRaftCmd(pItem)
	_, err = partition.Submit(data)
	return
}

func (partition *DataPartition) fsmVersionOp(opItem *RaftCmdItem) (err error) {
	req := new(proto.MultiVersionOpRequest)
	if err = json.Unmarshal(opItem.V, req); err != nil {
		log.LogErrorf("action[fsmVersionOp] dp[%v] op item %v", partition.partitionID, opItem)
		return
	}
	if len(req.VolVerList) == 0 {
		return
	}
	lastSeq := req.VolVerList[len(req.VolVerList)-1].Ver
	partition.volVersionInfoList.RWLock.Lock()
	if len(partition.volVersionInfoList.VerList) == 0 {
		partition.volVersionInfoList.VerList = make([]*proto.VolVersionInfo, len(req.VolVerList))
		copy(partition.volVersionInfoList.VerList, req.VolVerList)
		partition.verSeq = lastSeq
		log.LogInfof("action[fsmVersionOp] dp %v seq %v updateVerList request ver %v verlist %v dp verlist nil and set",
			partition.partitionID, partition.verSeq, lastSeq, req.VolVerList)
		partition.volVersionInfoList.RWLock.Unlock()
		return
	}

	lastVerInfo := partition.volVersionInfoList.GetLastVolVerInfo()
	log.LogInfof("action[fsmVersionOp] dp %v seq %v lastVerList seq %v req seq %v op %v",
		partition.partitionID, partition.verSeq, lastVerInfo.Ver, lastSeq, req.Op)

	if lastVerInfo.Ver >= lastSeq {
		if lastVerInfo.Ver == lastSeq {
			if req.Op == proto.CreateVersionCommit {
				lastVerInfo.Status = proto.VersionNormal
			}
		}
		partition.volVersionInfoList.RWLock.Unlock()
		return
	}

	var status uint8 = proto.VersionPrepare
	if req.Op == proto.CreateVersionCommit {
		status = proto.VersionNormal
	}
	partition.volVersionInfoList.VerList = append(partition.volVersionInfoList.VerList, &proto.VolVersionInfo{
		Status: status,
		Ver:    lastSeq,
	})

	partition.verSeq = lastSeq

	err = partition.PersistMetadata()
	log.LogInfof("action[fsmVersionOp] dp %v seq %v updateVerList request add new seq %v verlist (%v) err (%v)",
		partition.partitionID, partition.verSeq, lastSeq, partition.volVersionInfoList, err)

	partition.volVersionInfoList.RWLock.Unlock()
	return
}
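
// Version-op flow, as implemented above: HandleVersionOp runs on the replica
// that receives the MultiVersionOpRequest, wraps it into a RaftCmdItem with
// Op=proto.OpVersionOp and submits it through raft; fsmVersionOp is then
// applied on every replica, either adopting the request's version list when
// the local list is empty or appending the newest version (VersionPrepare, or
// VersionNormal on a CreateVersionCommit), bumping verSeq and persisting the
// metadata.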

func (dp *DataPartition) getVerListFromMaster() (err error) {
	var verList *proto.VolVersionInfoList
	verList, err = MasterClient.AdminAPI().GetVerList(dp.volumeID)
	if err != nil {
		log.LogErrorf("action[onStart] GetVerList err[%v]", err)
		return
	}

	for _, info := range verList.VerList {
		if info.Status != proto.VersionNormal {
			continue
		}
		dp.volVersionInfoList.VerList = append(dp.volVersionInfoList.VerList, info)
	}

	log.LogDebugf("action[onStart] dp %v verList %v", dp.partitionID, dp.volVersionInfoList.VerList)
	dp.verSeq = dp.volVersionInfoList.GetLastVer()
	return
}

func (dp *DataPartition) replicasInit() {
	replicas := make([]string, 0)
	if dp.config.Hosts == nil {
		return
	}
	replicas = append(replicas, dp.config.Hosts...)
	dp.replicasLock.Lock()
	dp.replicas = replicas
	dp.replicasLock.Unlock()
	if dp.config.Hosts != nil && len(dp.config.Hosts) >= 1 {
		leaderAddr := strings.Split(dp.config.Hosts[0], ":")
		if len(leaderAddr) == 2 && strings.TrimSpace(leaderAddr[0]) == LocalIP {
			dp.isLeader = true
		}
	}
}

func (dp *DataPartition) GetExtentCount() int {
	return dp.extentStore.GetExtentCount()
}

func (dp *DataPartition) Path() string {
	return dp.path
}

// IsRaftLeader tells if the given address belongs to the raft leader.
func (dp *DataPartition) IsRaftLeader() (addr string, ok bool) {
	if dp.raftStopped() {
		return
	}
	leaderID, _ := dp.raftPartition.LeaderTerm()
	if leaderID == 0 {
		return
	}
	ok = leaderID == dp.config.NodeID
	for _, peer := range dp.config.Peers {
		if leaderID == peer.ID {
			addr = peer.Addr
			return
		}
	}
	return
}

func (dp *DataPartition) Replicas() []string {
	dp.replicasLock.RLock()
	defer dp.replicasLock.RUnlock()
	return dp.replicas
}

func (dp *DataPartition) getReplicaCopy() []string {
	dp.replicasLock.RLock()
	defer dp.replicasLock.RUnlock()

	tmpCopy := make([]string, len(dp.replicas))
	copy(tmpCopy, dp.replicas)

	return tmpCopy
}

func (dp *DataPartition) getReplicaAddr(index int) string {
	dp.replicasLock.RLock()
	defer dp.replicasLock.RUnlock()
	return dp.replicas[index]
}

func (dp *DataPartition) getReplicaLen() int {
	dp.replicasLock.RLock()
	defer dp.replicasLock.RUnlock()
	return len(dp.replicas)
}

func (dp *DataPartition) IsExistReplica(addr string) bool {
	dp.replicasLock.RLock()
	defer dp.replicasLock.RUnlock()
	for _, host := range dp.replicas {
		if host == addr {
			return true
		}
	}
	return false
}

func (dp *DataPartition) ReloadSnapshot() {
	files, err := dp.extentStore.SnapShot()
	if err != nil {
		log.LogErrorf("ReloadSnapshot err %v", err)
		return
	}

	dp.snapshotMutex.Lock()
	for _, f := range dp.snapshot {
		storage.PutSnapShotFileToPool(f)
	}
	dp.snapshot = files
	dp.snapshotMutex.Unlock()
}

// SnapShot returns the snapshot of the data partition.
func (dp *DataPartition) SnapShot() (files []*proto.File) {
	dp.snapshotMutex.RLock()
	defer dp.snapshotMutex.RUnlock()

	return dp.snapshot
}

// Stop closes the store and the raft store.
func (dp *DataPartition) Stop() {
	dp.stopOnce.Do(func() {
		if dp.stopC != nil {
			close(dp.stopC)
		}
		// Close the store and raftstore.
		dp.stopRaft()
		dp.extentStore.Close()
		err := dp.storeAppliedID(atomic.LoadUint64(&dp.appliedID))
		if err != nil {
			log.LogErrorf("action[Stop]: failed to store applied index")
		}
	})
}

// Disk returns the disk instance.
func (dp *DataPartition) Disk() *Disk {
	return dp.disk
}

// func (dp *DataPartition) IsRejectWrite() bool {
// 	return dp.Disk().RejectWrite
// }

// Status returns the partition status.
func (dp *DataPartition) Status() int {
	return dp.partitionStatus
}

// Size returns the partition size.
func (dp *DataPartition) Size() int {
	return dp.partitionSize
}

// Used returns the used space.
func (dp *DataPartition) Used() int {
	return dp.used
}

// Available returns the available space.
func (dp *DataPartition) Available() int {
	return dp.partitionSize - dp.used
}

func (dp *DataPartition) ForceLoadHeader() {
	dp.loadExtentHeaderStatus = FinishLoadDataPartitionExtentHeader
}

// PersistMetadata persists the file metadata on the disk.
func (dp *DataPartition) PersistMetadata() (err error) {
	dp.persistMetaMutex.Lock()
	defer dp.persistMetaMutex.Unlock()

	var (
		metadataFile *os.File
		metaData     []byte
	)
	fileName := path.Join(dp.Path(), TempMetadataFileName)
	if metadataFile, err = os.OpenFile(fileName, os.O_CREATE|os.O_RDWR, 0o666); err != nil {
		return
	}
	defer func() {
		metadataFile.Sync()
		metadataFile.Close()
		os.Remove(fileName)
	}()

	md := &DataPartitionMetadata{
		VolumeID:                dp.config.VolName,
		PartitionID:             dp.config.PartitionID,
		ReplicaNum:              dp.config.ReplicaNum,
		PartitionSize:           dp.config.PartitionSize,
		PartitionType:           dp.config.PartitionType,
		Peers:                   dp.config.Peers,
		Hosts:                   dp.config.Hosts,
		DataPartitionCreateType: dp.DataPartitionCreateType,
		CreateTime:              time.Now().Format(TimeLayout),
		LastTruncateID:          dp.lastTruncateID,
		StopRecover:             dp.stopRecover,
		VerList:                 dp.volVersionInfoList.VerList,
		ApplyID:                 dp.appliedID,
	}

	if metaData, err = json.Marshal(md); err != nil {
		return
	}
	if _, err = metadataFile.Write(metaData); err != nil {
		return
	}
	dp.metaAppliedID = dp.appliedID
	log.LogInfof("PersistMetadata DataPartition(%v) data(%v)", dp.partitionID, string(metaData))
	err = os.Rename(fileName, path.Join(dp.Path(), DataPartitionMetadataFileName))
	return
}
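
// Durability note: PersistMetadata writes the whole JSON document to the
// temporary ".meta" file and then renames it over "META", so a reader of the
// metadata file only ever sees a complete document; the deferred
// Sync/Close/Remove then cleans up the temporary file handle.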

func (dp *DataPartition) statusUpdateScheduler() {
	ticker := time.NewTicker(time.Minute)
	snapshotTicker := time.NewTicker(time.Minute * 5)
	var index int
	for {
		select {
		case <-ticker.C:
			dp.statusUpdate()
			// only repair tiny extents
			if !dp.isNormalType() {
				dp.LaunchRepair(proto.TinyExtentType)
				continue
			}

			index++
			if index >= math.MaxUint32 {
				index = 0
			}

			if index%2 == 0 {
				dp.LaunchRepair(proto.TinyExtentType)
			} else {
				dp.LaunchRepair(proto.NormalExtentType)
			}
		case <-snapshotTicker.C:
			dp.ReloadSnapshot()
		case <-dp.stopC:
			ticker.Stop()
			snapshotTicker.Stop()
			return
		}
	}
}

func (dp *DataPartition) statusUpdate() {
	status := proto.ReadWrite
	dp.computeUsage()

	if dp.used >= dp.partitionSize {
		status = proto.ReadOnly
	}
	if dp.isNormalType() && dp.extentStore.GetExtentCount() >= storage.MaxExtentCount {
		status = proto.ReadOnly
	}
	if dp.isNormalType() && dp.raftStatus == RaftStatusStopped {
		// dp is still recovering
		if dp.DataPartitionCreateType == proto.DecommissionedCreateDataPartition {
			status = proto.Recovering
		} else {
			status = proto.Unavailable
		}
	}
	if dp.getDiskErrCnt() > 0 {
		dp.partitionStatus = proto.Unavailable
	}

	log.LogInfof("action[statusUpdate] dp %v raft status %v dp.status %v, status %v, disk status %v",
		dp.partitionID, dp.raftStatus, dp.Status(), status, float64(dp.disk.Status))
	// dp.partitionStatus = int(math.Min(float64(status), float64(dp.disk.Status)))
	dp.partitionStatus = status
}
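
// Status summary, as computed above: each check starts from ReadWrite, drops to
// ReadOnly once the partition is full or holds storage.MaxExtentCount extents,
// and reports Recovering (decommission repair) or Unavailable when raft is
// stopped on a normal partition; the disk-error branch additionally flips
// partitionStatus to Unavailable when any disk I/O error has been counted.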

func (dp *DataPartition) computeUsage() {
	if time.Now().Unix()-dp.intervalToUpdatePartitionSize < IntervalToUpdatePartitionSize {
		return
	}
	dp.used = int(dp.ExtentStore().GetStoreUsedSize())
	dp.intervalToUpdatePartitionSize = time.Now().Unix()
}

func (dp *DataPartition) ExtentStore() *storage.ExtentStore {
	return dp.extentStore
}

func (dp *DataPartition) checkIsDiskError(err error, rwFlag uint8) {
	if err == nil {
		return
	}
	log.LogWarnf("checkIsDiskError: disk path %v, error: %v, partition:%v, rwFlag:%v",
		dp.Path(), err.Error(), dp.partitionID, rwFlag)
	if !IsDiskErr(err.Error()) {
		return
	}

	dp.stopRaft()
	dp.incDiskErrCnt()
	dp.disk.triggerDiskError(rwFlag, dp.partitionID)

	// must run after changing disk.status
	dp.statusUpdate()
	return
}

func newRaftApplyError(err error) error {
	return errors.NewErrorf("[Custom Error]: unhandled raft apply error, err(%s)", err)
}

func isRaftApplyError(errMsg string) bool {
	return strings.Contains(errMsg, "[Custom Error]: unhandled raft apply error")
}

// String returns the string format of the data partition information.
func (dp *DataPartition) String() (m string) {
	return fmt.Sprintf(DataPartitionPrefix+"_%v_%v", dp.partitionID, dp.partitionSize)
}

// LaunchRepair launches the repair of extents.
func (dp *DataPartition) LaunchRepair(extentType uint8) {
	if dp.partitionStatus == proto.Unavailable {
		return
	}
	if err := dp.updateReplicas(false); err != nil {
		log.LogErrorf("action[LaunchRepair] partition(%v) err(%v).", dp.partitionID, err)
		return
	}
	if !dp.isLeader {
		return
	}
	if dp.extentStore.BrokenTinyExtentCnt() == 0 {
		dp.extentStore.MoveAllToBrokenTinyExtentC(MinTinyExtentsToRepair)
	}
	dp.repair(extentType)
}

func (dp *DataPartition) updateReplicas(isForce bool) (err error) {
	if !isForce && time.Now().Unix()-dp.intervalToUpdateReplicas <= IntervalToUpdateReplica {
		return
	}
	dp.isLeader = false
	isLeader, replicas, err := dp.fetchReplicasFromMaster()
	if err != nil {
		return
	}
	dp.replicasLock.Lock()
	defer dp.replicasLock.Unlock()
	if !dp.compareReplicas(dp.replicas, replicas) {
		log.LogInfof("action[updateReplicas] partition(%v) replicas changed from (%v) to (%v).",
			dp.partitionID, dp.replicas, replicas)
	}
	dp.isLeader = isLeader
	dp.replicas = replicas
	dp.intervalToUpdateReplicas = time.Now().Unix()
	log.LogInfof(fmt.Sprintf("ActionUpdateReplicationHosts partition(%v), force(%v)", dp.partitionID, isForce))

	return
}

// Compare the fetched replica with the local one.
func (dp *DataPartition) compareReplicas(v1, v2 []string) (equals bool) {
	if len(v1) == len(v2) {
		for i := 0; i < len(v1); i++ {
			if v1[i] != v2[i] {
				return false
			}
		}
		return true
	}
	return false
}

// Fetch the replica information from the master.
func (dp *DataPartition) fetchReplicasFromMaster() (isLeader bool, replicas []string, err error) {
	var partition *proto.DataPartitionInfo
	retry := 0
	for {
		if partition, err = MasterClient.AdminAPI().GetDataPartition(dp.volumeID, dp.partitionID); err != nil {
			retry++
			if retry > 5 {
				isLeader = false
				return
			}
		} else {
			break
		}
		time.Sleep(10 * time.Second)
	}

	replicas = append(replicas, partition.Hosts...)
	if partition.Hosts != nil && len(partition.Hosts) >= 1 {
		leaderAddr := strings.Split(partition.Hosts[0], ":")
		if len(leaderAddr) == 2 && strings.TrimSpace(leaderAddr[0]) == LocalIP {
			isLeader = true
		}
	}
	return
}

func (dp *DataPartition) Load() (response *proto.LoadDataPartitionResponse) {
	response = &proto.LoadDataPartitionResponse{}
	response.PartitionId = uint64(dp.partitionID)
	response.PartitionStatus = dp.partitionStatus
	response.Used = uint64(dp.Used())
	var err error

	if dp.loadExtentHeaderStatus != FinishLoadDataPartitionExtentHeader {
		response.PartitionSnapshot = make([]*proto.File, 0)
	} else {
		response.PartitionSnapshot = dp.SnapShot()
	}
	if err != nil {
		response.Status = proto.TaskFailed
		response.Result = err.Error()
		return
	}
	return
}

// DoExtentStoreRepair performs the repairs of the extent store.
// 1. when the extent size is smaller than the max size on the record, start to repair the missing part.
// 2. if the extent does not even exist, create the extent first, and then repair.
func (dp *DataPartition) DoExtentStoreRepair(repairTask *DataPartitionRepairTask) {
	if dp.stopRecover && dp.isDecommissionRecovering() {
		log.LogWarnf("DoExtentStoreRepair %v receive stop signal", dp.partitionID)
		return
	}
	store := dp.extentStore
	log.LogDebugf("DoExtentStoreRepair.dp %v len extents %v", dp.partitionID, len(repairTask.ExtentsToBeCreated))
	for _, extentInfo := range repairTask.ExtentsToBeCreated {
		log.LogDebugf("DoExtentStoreRepair.dp %v len extentInfo %v", dp.partitionID, extentInfo)
		if storage.IsTinyExtent(extentInfo.FileID) {
			continue
		}
		if store.HasExtent(uint64(extentInfo.FileID)) {
			continue
		}
		if !AutoRepairStatus {
			log.LogWarnf("AutoRepairStatus is false, so cannot create extent(%v)", extentInfo.String())
			continue
		}

		dp.disk.allocCheckLimit(proto.IopsWriteType, 1)

		err := store.Create(uint64(extentInfo.FileID))
		if err != nil {
			continue
		}
	}

	var (
		wg           *sync.WaitGroup
		recoverIndex int
	)
	wg = new(sync.WaitGroup)
	for _, extentInfo := range repairTask.ExtentsToBeRepaired {
		if dp.stopRecover && dp.isDecommissionRecovering() {
			log.LogWarnf("DoExtentStoreRepair %v receive stop signal", dp.partitionID)
			return
		}
		if !store.HasExtent(uint64(extentInfo.FileID)) {
			continue
		}
		wg.Add(1)

		// repair the extents
		go dp.doStreamExtentFixRepair(wg, extentInfo)
		recoverIndex++

		if recoverIndex%NumOfFilesToRecoverInParallel == 0 {
			wg.Wait()
		}
	}
	wg.Wait()
	dp.doStreamFixTinyDeleteRecord(repairTask)
}

func (dp *DataPartition) pushSyncDeleteRecordFromLeaderMesg() bool {
	select {
	case dp.Disk().syncTinyDeleteRecordFromLeaderOnEveryDisk <- true:
		return true
	default:
		return false
	}
}

func (dp *DataPartition) consumeTinyDeleteRecordFromLeaderMesg() {
	select {
	case <-dp.Disk().syncTinyDeleteRecordFromLeaderOnEveryDisk:
		return
	default:
		return
	}
}

func (dp *DataPartition) doStreamFixTinyDeleteRecord(repairTask *DataPartitionRepairTask) {
	var (
		localTinyDeleteFileSize int64
		err                     error
		conn                    net.Conn
	)
	if !dp.pushSyncDeleteRecordFromLeaderMesg() {
		return
	}

	defer func() {
		dp.consumeTinyDeleteRecordFromLeaderMesg()
	}()
	if localTinyDeleteFileSize, err = dp.extentStore.LoadTinyDeleteFileOffset(); err != nil {
		return
	}

	log.LogInfof(ActionSyncTinyDeleteRecord+" start PartitionID(%v) localTinyDeleteFileSize(%v) leaderTinyDeleteFileSize(%v) leaderAddr(%v)",
		dp.partitionID, localTinyDeleteFileSize, repairTask.LeaderTinyDeleteRecordFileSize, repairTask.LeaderAddr)

	if localTinyDeleteFileSize >= repairTask.LeaderTinyDeleteRecordFileSize {
		return
	}

	if repairTask.LeaderTinyDeleteRecordFileSize-localTinyDeleteFileSize < MinTinyExtentDeleteRecordSyncSize {
		return
	}

	defer func() {
		log.LogInfof(ActionSyncTinyDeleteRecord+" end PartitionID(%v) localTinyDeleteFileSize(%v) leaderTinyDeleteFileSize(%v) leaderAddr(%v) err(%v)",
			dp.partitionID, localTinyDeleteFileSize, repairTask.LeaderTinyDeleteRecordFileSize, repairTask.LeaderAddr, err)
	}()

	p := repl.NewPacketToReadTinyDeleteRecord(dp.partitionID, localTinyDeleteFileSize)
	if conn, err = dp.getRepairConn(repairTask.LeaderAddr); err != nil {
		return
	}
	defer func() {
		dp.putRepairConn(conn, err != nil)
	}()
	if err = p.WriteToConn(conn); err != nil {
		return
	}
	store := dp.extentStore
	start := time.Now().Unix()
	for localTinyDeleteFileSize < repairTask.LeaderTinyDeleteRecordFileSize {
		if dp.stopRecover && dp.isDecommissionRecovering() {
			log.LogWarnf("doStreamFixTinyDeleteRecord %v receive stop signal", dp.partitionID)
			return
		}
		if localTinyDeleteFileSize >= repairTask.LeaderTinyDeleteRecordFileSize {
			return
		}
		if err = p.ReadFromConnWithVer(conn, proto.ReadDeadlineTime); err != nil {
			return
		}
		if p.IsErrPacket() {
			logContent := fmt.Sprintf("action[doStreamFixTinyDeleteRecord] %v.",
				p.LogMessage(p.GetOpMsg(), conn.RemoteAddr().String(), start, fmt.Errorf(string(p.Data[:p.Size]))))
			err = fmt.Errorf(logContent)
			return
		}
		if p.CRC != crc32.ChecksumIEEE(p.Data[:p.Size]) {
			err = fmt.Errorf("crc not match")
			return
		}
		if p.Size%storage.DeleteTinyRecordSize != 0 {
			err = fmt.Errorf("invalid size")
			return
		}
		var index int
		for (index+1)*storage.DeleteTinyRecordSize <= int(p.Size) {
			record := p.Data[index*storage.DeleteTinyRecordSize : (index+1)*storage.DeleteTinyRecordSize]
			extentID, offset, size := storage.UnMarshalTinyExtent(record)
			localTinyDeleteFileSize += storage.DeleteTinyRecordSize
			index++
			if !storage.IsTinyExtent(extentID) {
				continue
			}
			DeleteLimiterWait()
			dp.disk.allocCheckLimit(proto.IopsWriteType, 1)
			// log.LogInfof("doStreamFixTinyDeleteRecord Delete PartitionID(%v)_Extent(%v)_Offset(%v)_Size(%v)", dp.partitionID, extentID, offset, size)
			store.MarkDelete(extentID, int64(offset), int64(size))
		}
	}
}
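
// Protocol note for the loop above: the leader's reply is consumed in
// fixed-size records of storage.DeleteTinyRecordSize bytes, each decoding to
// (extentID, offset, size) via storage.UnMarshalTinyExtent. The packet CRC and
// the size being a multiple of the record size are verified first, and every
// record that refers to a tiny extent is replayed locally with MarkDelete until
// the local tiny-delete offset catches up with the leader's file size.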

// ChangeRaftMember is a wrapper function of changing the raft member.
func (dp *DataPartition) ChangeRaftMember(changeType raftProto.ConfChangeType, peer raftProto.Peer, context []byte) (resp interface{}, err error) {
	resp, err = dp.raftPartition.ChangeMember(changeType, peer, context)
	return
}

func (dp *DataPartition) canRemoveSelf() (canRemove bool, err error) {
	var partition *proto.DataPartitionInfo
	retry := 0
	for {
		if partition, err = MasterClient.AdminAPI().GetDataPartition(dp.volumeID, dp.partitionID); err != nil {
			log.LogErrorf("action[canRemoveSelf] err[%v]", err)
			retry++
			if retry > 60 {
				return
			}
		} else {
			break
		}
		time.Sleep(10 * time.Second)
	}

	canRemove = false
	var existInPeers bool
	for _, peer := range partition.Peers {
		if dp.config.NodeID == peer.ID {
			existInPeers = true
		}
	}
	if !existInPeers {
		canRemove = true
		return
	}
	if dp.config.NodeID == partition.OfflinePeerID {
		canRemove = true
		return
	}
	return
}

func (dp *DataPartition) getRepairConn(target string) (net.Conn, error) {
	return dp.dataNode.getRepairConnFunc(target)
}

func (dp *DataPartition) putRepairConn(conn net.Conn, forceClose bool) {
	log.LogDebugf("action[putRepairConn], forceClose: %v", forceClose)
	dp.dataNode.putRepairConnFunc(conn, forceClose)
}

func (dp *DataPartition) isNormalType() bool {
	return proto.IsNormalDp(dp.partitionType)
}

type SimpleVolView struct {
	vv             *proto.SimpleVolView
	lastUpdateTime time.Time
}

type VolMap struct {
	sync.Mutex
	volMap map[string]*SimpleVolView
}

var volViews = VolMap{
	Mutex:  sync.Mutex{},
	volMap: make(map[string]*SimpleVolView),
}

func (vo *VolMap) getSimpleVolView(VolumeID string) (vv *proto.SimpleVolView, err error) {
	vo.Lock()
	if volView, ok := vo.volMap[VolumeID]; ok && time.Since(volView.lastUpdateTime) < 5*time.Minute {
		vo.Unlock()
		return volView.vv, nil
	}
	vo.Unlock()

	volView := &SimpleVolView{
		vv:             nil,
		lastUpdateTime: time.Time{},
	}

	if vv, err = MasterClient.AdminAPI().GetVolumeSimpleInfo(VolumeID); err != nil {
		log.LogErrorf("action[GetVolumeSimpleInfo] cannot get vol(%v) from master(%v) err(%v).",
			VolumeID, MasterClient.Leader(), err)
		return nil, err
	}

	log.LogDebugf("get volume info, vol(%s), vol(%v)", vv.Name, volView)

	volView.vv = vv
	volView.lastUpdateTime = time.Now()

	vo.Lock()
	vo.volMap[VolumeID] = volView
	vo.Unlock()

	return
}

func (dp *DataPartition) doExtentTtl(ttl int) {
	if ttl <= 0 {
		log.LogWarn("[doTTL] ttl is 0, set default 30", ttl)
		ttl = 30
	}

	extents := dp.extentStore.DumpExtents()

	for _, ext := range extents {
		if storage.IsTinyExtent(ext.FileID) {
			continue
		}

		if time.Now().Unix()-ext.AccessTime > int64(ttl)*util.OneDaySec() {
			log.LogDebugf("action[doExtentTtl] ttl delete dp(%v) extent(%v).", dp.partitionID, ext)
			dp.extentStore.MarkDelete(ext.FileID, 0, 0)
		}
	}
}

func (dp *DataPartition) doExtentEvict(vv *proto.SimpleVolView) {
	var (
		needDieOut      bool
		freeSpace       int
		freeExtentCount int
	)

	needDieOut = false
	if vv.CacheHighWater < vv.CacheLowWater || vv.CacheLowWater < 0 || vv.CacheHighWater > 100 {
		log.LogErrorf("action[doExtentEvict] invalid policy dp(%v), CacheHighWater(%v) CacheLowWater(%v).",
			dp.partitionID, vv.CacheHighWater, vv.CacheLowWater)
		return
	}

	// if dp usage is larger than the space high water, do die out.
	freeSpace = 0
	if dp.Used()*100/dp.Size() > vv.CacheHighWater {
		needDieOut = true
		freeSpace = dp.Used() - dp.Size()*vv.CacheLowWater/100
	} else if dp.partitionStatus == proto.ReadOnly {
		needDieOut = true
		freeSpace = dp.Used() * (vv.CacheHighWater - vv.CacheLowWater) / 100
	}

	// if dp extent count is larger than the upper count, do die out.
	freeExtentCount = 0
	extInfos := dp.extentStore.DumpExtents()
	maxExtentCount := dp.Size() / util.DefaultTinySizeLimit
	if len(extInfos) > maxExtentCount {
		needDieOut = true
		freeExtentCount = len(extInfos) - vv.CacheLowWater*maxExtentCount/100
	}

	log.LogDebugf("action[doExtentEvict], vol %v, LRU(%v, %v), dp %v, usage %v, status(%d), extents %v, freeSpace %v, freeExtentCount %v, needDieOut %v",
		vv.Name, vv.CacheLowWater, vv.CacheHighWater, dp.partitionID, dp.Used()*100/dp.Size(), dp.partitionStatus, len(extInfos),
		freeSpace, freeExtentCount, needDieOut)

	if !needDieOut {
		return
	}

	sort.Sort(extInfos)

	for _, ext := range extInfos {
		if storage.IsTinyExtent(ext.FileID) {
			continue
		}

		freeSpace -= int(ext.Size)
		freeExtentCount--
		dp.extentStore.MarkDelete(ext.FileID, 0, 0)
		log.LogDebugf("action[doExtentEvict] die out. vol %v, dp(%v), extent(%v).", vv.Name, dp.partitionID, *ext)

		if freeSpace <= 0 && freeExtentCount <= 0 {
			log.LogDebugf("[doExtentEvict] die out done, vol(%s), dp (%d)", vv.Name, dp.partitionID)
			break
		}
	}
}
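
// Worked example for doExtentEvict (hypothetical numbers): a 120 GiB cache
// partition with CacheHighWater=80 and CacheLowWater=60 that is 90% full
// (108 GiB used) exceeds the high-water mark, so
// freeSpace = 108 GiB - 120 GiB*60/100 = 36 GiB has to be released; extents
// are then marked deleted in sorted order until both freeSpace and
// freeExtentCount drop to zero or below.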

func (dp *DataPartition) startEvict() {
	// only a cache dp runs eviction.
	if !proto.IsCacheDp(dp.partitionType) {
		return
	}

	log.LogDebugf("[startEvict] start do dp(%d) evict op", dp.partitionID)

	vv, err := volViews.getSimpleVolView(dp.volumeID)
	if err != nil {
		err := fmt.Errorf("[startEvict] get vol [%s] info error, err %s", dp.volumeID, err.Error())
		log.LogError(err)
		panic(err)
	}

	lruInterval := getWithDefault(vv.CacheLruInterval, 5)
	cacheTtl := getWithDefault(vv.CacheTtl, 30)

	lruTimer := time.NewTicker(time.Duration(lruInterval) * time.Minute)
	ttlTimer := time.NewTicker(time.Duration(util.OneDaySec()) * time.Second)
	defer func() {
		lruTimer.Stop()
		ttlTimer.Stop()
	}()

	for {
		// check volume type and dp type.
		if proto.IsHot(vv.VolType) || !proto.IsCacheDp(dp.partitionType) {
			log.LogErrorf("action[startEvict] cannot startEvict, vol(%v), dp(%v).", vv.Name, dp.partitionID)
			return
		}

		select {
		case <-lruTimer.C:
			log.LogDebugf("start [doExtentEvict] vol(%s), dp(%d).", vv.Name, dp.partitionID)
			evictStart := time.Now()
			dp.doExtentEvict(vv)
			log.LogDebugf("action[doExtentEvict] vol(%v), dp(%v), cost (%v).", vv.Name, dp.partitionID, time.Since(evictStart))

		case <-ttlTimer.C:
			log.LogDebugf("start [doExtentTtl] vol(%s), dp(%d).", vv.Name, dp.partitionID)
			ttlStart := time.Now()
			dp.doExtentTtl(cacheTtl)
			log.LogDebugf("action[doExtentTtl] vol(%v), dp(%v), cost (%v).", vv.Name, dp.partitionID, time.Since(ttlStart))

		case <-dp.stopC:
			log.LogWarn("task[doExtentTtl] stopped", dp.volumeID, dp.partitionID)
			return
		}

		// loop update vol info
		newVV, err := volViews.getSimpleVolView(dp.volumeID)
		if err != nil {
			err := fmt.Errorf("[startEvict] get vol [%s] info error, err %s", dp.volumeID, err.Error())
			log.LogError(err)
			continue
		}

		vv = newVV
		if lruInterval != vv.CacheLruInterval || cacheTtl != vv.CacheTtl {
			lruInterval = getWithDefault(vv.CacheLruInterval, 5)
			cacheTtl = getWithDefault(vv.CacheTtl, 30)

			lruTimer = time.NewTicker(time.Duration(lruInterval) * time.Minute)
			log.LogInfof("[startEvict] update vol config, dp(%d) %v ", dp.partitionID, *vv)
		}
	}
}

func getWithDefault(base, def int) int {
	if base <= 0 {
		return def
	}

	return base
}

func (dp *DataPartition) StopDecommissionRecover(stop bool) {
	// only work for decommission repair
	if !dp.isDecommissionRecovering() {
		log.LogWarnf("[StopDecommissionRecover]  dp(%d) is not in recovering status: type %d status %d",
			dp.partitionID, dp.partitionType, dp.Status())
		return
	}
	// for check timeout
	dp.stopRecover = stop
	dp.PersistMetadata()
}

func (dp *DataPartition) isDecommissionRecovering() bool {
	// decommission recover failed or success will set to normal
	return dp.DataPartitionCreateType == proto.DecommissionedCreateDataPartition
}

func (dp *DataPartition) handleDecommissionRecoverFailed() {
	if !dp.isDecommissionRecovering() {
		return
	}
	// prevent status changing from Unavailable to Recovering again in statusUpdate()
	dp.partitionType = proto.NormalCreateDataPartition
	dp.partitionStatus = proto.Unavailable
	log.LogWarnf("[handleDecommissionRecoverFailed]  dp(%d) recover failed reach max limit", dp.partitionID)
	dp.PersistMetadata()
	dp.StopDecommissionRecover(true)
}

func (dp *DataPartition) incDiskErrCnt() {
	diskErrCnt := atomic.AddUint64(&dp.diskErrCnt, 1)
	log.LogWarnf("[incDiskErrCnt]: dp(%v) disk err count:%v", dp.partitionID, diskErrCnt)
}

func (dp *DataPartition) getDiskErrCnt() uint64 {
	return atomic.LoadUint64(&dp.diskErrCnt)
}