// Copyright 2018 The CubeFS Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
	raftProto "github.com/cubefs/cubefs/depends/tiglabs/raft/proto"
	"github.com/cubefs/cubefs/proto"
	"github.com/cubefs/cubefs/raftstore"
	"github.com/cubefs/cubefs/repl"
	"github.com/cubefs/cubefs/storage"
	"github.com/cubefs/cubefs/util"
	"github.com/cubefs/cubefs/util/errors"
	"github.com/cubefs/cubefs/util/log"
	DataPartitionPrefix           = "datapartition"
	CachePartitionPrefix          = "cachepartition"
	PreLoadPartitionPrefix        = "preloadpartition"
	DataPartitionMetadataFileName = "META"
	TempMetadataFileName          = ".meta"
	ApplyIndexFile                = "APPLY"
	TempApplyIndexFile            = ".apply"
	TimeLayout                    = "2006-01-02 15:04:05"
type DataPartitionMetadata struct {
	DataPartitionCreateType int
	VerList                 []*proto.VolVersionInfo

func (md *DataPartitionMetadata) Validate() (err error) {
	md.VolumeID = strings.TrimSpace(md.VolumeID)
	if len(md.VolumeID) == 0 || md.PartitionID == 0 || md.PartitionSize == 0 {
		err = errors.New("illegal data partition metadata")
	}
	return
}
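// A minimal sketch of how this metadata round-trips through the META file;
// the values below are illustrative only, and the struct carries more fields
// than this excerpt shows:
//
//	data, _ := json.Marshal(&DataPartitionMetadata{
//		VolumeID:      "vol-example", // hypothetical volume name
//		PartitionID:   10,
//		PartitionSize: 137438953472, // 128 GiB
//	})
//	// data ≈ {"VolumeID":"vol-example","PartitionID":10,"PartitionSize":137438953472,...}
//	md := &DataPartitionMetadata{}
//	_ = json.Unmarshal(data, md)
//	err := md.Validate() // nil here; a blank VolumeID or a zero ID/size would fail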
// MetaMultiSnapshotInfo
type MetaMultiSnapshotInfo struct {

type DataPartition struct {
	replicas     []string // addresses of the replicas
	replicasLock sync.RWMutex

	extentStore    *storage.ExtentStore
	raftPartition  raftstore.Partition
	config         *dataPartitionCfg
	appliedID      uint64 // apply id used in Raft
	lastTruncateID uint64 // truncate id used in Raft
	metaAppliedID  uint64 // apply id recorded at the last metadata persist

	stopRaftC chan uint64

	intervalToUpdateReplicas      int64 // interval to ask the master for updating the replica information
	snapshot                      []*proto.File
	snapshotMutex                 sync.RWMutex
	intervalToUpdatePartitionSize int64
	loadExtentHeaderStatus        int
	DataPartitionCreateType       int
	isLoadingDataPartition        int32
	persistMetaMutex              sync.RWMutex

	verSeqCommitStatus int8
	volVersionInfoList *proto.VolVersionInfoList

	decommissionRepairProgress float64 // records the repair progress of a decommissioned data partition

	recoverErrCnt uint64 // do not reset; once it reaches the max error count, this dp is deleted

	diskErrCnt uint64 // number of disk io errors while reading or writing
}

func (dp *DataPartition) IsForbidden() bool {
	return dp.config.Forbidden
}

func (dp *DataPartition) SetForbidden(status bool) {
	dp.config.Forbidden = status
}
func CreateDataPartition(dpCfg *dataPartitionCfg, disk *Disk, request *proto.CreateDataPartitionRequest) (dp *DataPartition, err error) {
	if dp, err = newDataPartition(dpCfg, disk, true); err != nil {
		return
	}

	if request.CreateType == proto.NormalCreateDataPartition {
		err = dp.StartRaft(false)
	} else {
		// init leaderSize to partitionSize
		disk.updateDisk(uint64(request.LeaderSize))
		// ensure the heartbeat reports Recovering
		dp.partitionStatus = proto.Recovering
		go dp.StartRaftAfterRepair(false)
	}
	if err != nil {
		return nil, err
	}

	// persist file metadata
	go dp.StartRaftLoggingSchedule()
	dp.DataPartitionCreateType = request.CreateType
	dp.replicaNum = request.ReplicaNum
	err = dp.PersistMetadata()
	disk.AddSize(uint64(dp.Size()))
	return
}
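// Note the two create paths above: a NormalCreateDataPartition starts raft
// immediately, while a decommission-driven create marks the partition
// Recovering and hands raft startup to StartRaftAfterRepair, so raft only
// begins once the replica's data has been repaired.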
func (dp *DataPartition) IsEquareCreateDataPartitionRequst(request *proto.CreateDataPartitionRequest) (err error) {
	if len(dp.config.Peers) != len(request.Members) {
		return fmt.Errorf("exist partition(%v) peers len(%v) members len(%v)",
			dp.partitionID, len(dp.config.Peers), len(request.Members))
	}
	for index, host := range dp.config.Hosts {
		requestHost := request.Hosts[index]
		if host != requestHost {
			return fmt.Errorf("exist partition(%v) index(%v) requestHost(%v) persistHost(%v)",
				dp.partitionID, index, requestHost, host)
		}
	}
	for index, peer := range dp.config.Peers {
		requestPeer := request.Members[index]
		if requestPeer.ID != peer.ID || requestPeer.Addr != peer.Addr {
			return fmt.Errorf("exist partition(%v) index(%v) requestPeer(%v) persistPeers(%v)",
				dp.partitionID, index, requestPeer, peer)
		}
	}
	if dp.config.VolName != request.VolumeId {
		return fmt.Errorf("exist partition Partition(%v) requestVolName(%v) persistVolName(%v)",
			dp.partitionID, request.VolumeId, dp.config.VolName)
	}
	return
}

func (dp *DataPartition) ForceSetDataPartitionToLoadding() {
	atomic.StoreInt32(&dp.isLoadingDataPartition, 1)
}

func (dp *DataPartition) ForceSetDataPartitionToFininshLoad() {
	atomic.StoreInt32(&dp.isLoadingDataPartition, 0)
}

func (dp *DataPartition) IsDataPartitionLoading() bool {
	return atomic.LoadInt32(&dp.isLoadingDataPartition) == 1
}

func (dp *DataPartition) ForceSetRaftRunning() {
	atomic.StoreInt32(&dp.raftStatus, RaftStatusRunning)
}
// LoadDataPartition loads and returns a partition instance based on the specified directory.
// It reads the partition metadata file stored under the specified directory
// and creates the partition instance.
func LoadDataPartition(partitionDir string, disk *Disk) (dp *DataPartition, err error) {
	var metaFileData []byte
	if metaFileData, err = os.ReadFile(path.Join(partitionDir, DataPartitionMetadataFileName)); err != nil {
		return
	}
	meta := &DataPartitionMetadata{}
	if err = json.Unmarshal(metaFileData, meta); err != nil {
		return
	}
	if err = meta.Validate(); err != nil {
		return
	}

	dpCfg := &dataPartitionCfg{
		VolName:       meta.VolumeID,
		PartitionSize: meta.PartitionSize,
		PartitionType: meta.PartitionType,
		PartitionID:   meta.PartitionID,
		ReplicaNum:    meta.ReplicaNum,

		RaftStore: disk.space.GetRaftStore(),
		NodeID:    disk.space.GetNodeID(),
		ClusterID: disk.space.GetClusterID(),
	}
	if dp, err = newDataPartition(dpCfg, disk, false); err != nil {
		return
	}
	dp.stopRecover = meta.StopRecover
	dp.metaAppliedID = meta.ApplyID

	dp.ForceSetDataPartitionToLoadding()
	disk.space.AttachPartition(dp)
	if err = dp.LoadAppliedID(); err != nil {
		log.LogErrorf("action[loadApplyIndex] %v", err)
	}
	log.LogInfof("Action(LoadDataPartition) PartitionID(%v) meta(%v) stopRecover(%v)", dp.partitionID, meta, meta.StopRecover)
	dp.DataPartitionCreateType = meta.DataPartitionCreateType
	dp.lastTruncateID = meta.LastTruncateID
	if meta.DataPartitionCreateType == proto.NormalCreateDataPartition {
		err = dp.StartRaft(true)
	} else {
		// init leaderSize to partitionSize
		dp.leaderSize = dp.partitionSize
		dp.partitionStatus = proto.Recovering
		go dp.StartRaftAfterRepair(true)
	}
	if err != nil {
		log.LogErrorf("PartitionID(%v) start raft err(%v)..", dp.partitionID, err)
		disk.space.DetachDataPartition(dp.partitionID)
		return nil, err
	}

	go dp.StartRaftLoggingSchedule()
	disk.AddSize(uint64(dp.Size()))
	return
}

func newDataPartition(dpCfg *dataPartitionCfg, disk *Disk, isCreate bool) (dp *DataPartition, err error) {
	partitionID := dpCfg.PartitionID
	var dataPath string

	if proto.IsNormalDp(dpCfg.PartitionType) {
		dataPath = path.Join(disk.Path, fmt.Sprintf(DataPartitionPrefix+"_%v_%v", partitionID, dpCfg.PartitionSize))
	} else if proto.IsCacheDp(dpCfg.PartitionType) {
		dataPath = path.Join(disk.Path, fmt.Sprintf(CachePartitionPrefix+"_%v_%v", partitionID, dpCfg.PartitionSize))
	} else if proto.IsPreLoadDp(dpCfg.PartitionType) {
		dataPath = path.Join(disk.Path, fmt.Sprintf(PreLoadPartitionPrefix+"_%v_%v", partitionID, dpCfg.PartitionSize))
	} else {
		return nil, fmt.Errorf("newDataPartition fail, dataPartitionCfg(%v)", dpCfg)
	}
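	// For illustration: with disk.Path = "/data/disk1" (a made-up mount
	// point), partition id 10 and a 128 GiB partition size, the branches
	// above yield directories such as
	//
	//	/data/disk1/datapartition_10_137438953472
	//	/data/disk1/cachepartition_10_137438953472
	//	/data/disk1/preloadpartition_10_137438953472
	//
	// String() further below formats the same prefix_id_size string.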
	partition := &DataPartition{
		volumeID:    dpCfg.VolName,
		clusterID:   dpCfg.ClusterID,
		partitionID: partitionID,
		replicaNum:  dpCfg.ReplicaNum,

		dataNode: disk.dataNode,

		partitionSize:   dpCfg.PartitionSize,
		partitionType:   dpCfg.PartitionType,
		replicas:        make([]string, 0),
		stopC:           make(chan bool),
		stopRaftC:       make(chan uint64),
		storeC:          make(chan uint64, 128),
		snapshot:        make([]*proto.File, 0),
		partitionStatus: proto.ReadWrite,

		raftStatus:              RaftStatusStopped,
		verSeq:                  dpCfg.VerSeq,
		DataPartitionCreateType: dpCfg.CreateType,
		volVersionInfoList:      &proto.VolVersionInfoList{},
	}
	atomic.StoreUint64(&partition.recoverErrCnt, 0)
	log.LogInfof("action[newDataPartition] dp %v replica num %v", partitionID, dpCfg.ReplicaNum)
	partition.replicasInit()
	partition.extentStore, err = storage.NewExtentStore(partition.path, dpCfg.PartitionID, dpCfg.PartitionSize,
		partition.partitionType, isCreate)
	if err != nil {
		log.LogWarnf("action[newDataPartition] dp %v NewExtentStore failed %v", partitionID, err.Error())
		return
	}

	if err = partition.storeAppliedID(partition.appliedID); err != nil {
		log.LogErrorf("action[newDataPartition] dp %v initial Apply [%v] failed: %v",
			partition.partitionID, partition.appliedID, err)
		return
	}
	disk.AttachDataPartition(partition)
	dp = partition

	go partition.statusUpdateScheduler()
	go partition.startEvict()

	if err = dp.getVerListFromMaster(); err != nil {
		log.LogErrorf("action[newDataPartition] vol %v dp %v loadFromMaster verList failed err %v", dp.volumeID, dp.partitionID, err)
		return
	}

	log.LogInfof("action[newDataPartition] dp %v replica num %v CreateType %v create success",
		dp.partitionID, dpCfg.ReplicaNum, dp.DataPartitionCreateType)
	return
}

func (partition *DataPartition) HandleVersionOp(req *proto.MultiVersionOpRequest) (err error) {
	var (
		verData []byte
		pItem   *RaftCmdItem
	)
	if verData, err = json.Marshal(req); err != nil {
		return
	}
	pItem = &RaftCmdItem{
		Op: uint32(proto.OpVersionOp),
		K:  []byte("version"),
		V:  verData,
	}
	data, _ := MarshalRaftCmd(pItem)
	_, err = partition.Submit(data)
	return
}
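// A sketch of the submit path used above, assuming the elided pieces follow
// the visible call shape (MarshalRaftCmd's encoding is not shown in this
// excerpt):
//
//	req := &proto.MultiVersionOpRequest{Op: proto.CreateVersionCommit} // illustrative op
//	verData, _ := json.Marshal(req)
//	item := &RaftCmdItem{Op: uint32(proto.OpVersionOp), K: []byte("version"), V: verData}
//	data, _ := MarshalRaftCmd(item)
//	_, _ = partition.Submit(data) // replicated through raft; fsmVersionOp applies it below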
func (partition *DataPartition) fsmVersionOp(opItem *RaftCmdItem) (err error) {
	req := new(proto.MultiVersionOpRequest)
	if err = json.Unmarshal(opItem.V, req); err != nil {
		log.LogErrorf("action[fsmVersionOp] dp[%v] op item %v", partition.partitionID, opItem)
		return
	}
	if len(req.VolVerList) == 0 {
		return
	}
	lastSeq := req.VolVerList[len(req.VolVerList)-1].Ver
	partition.volVersionInfoList.RWLock.Lock()
	if len(partition.volVersionInfoList.VerList) == 0 {
		partition.volVersionInfoList.VerList = make([]*proto.VolVersionInfo, len(req.VolVerList))
		copy(partition.volVersionInfoList.VerList, req.VolVerList)
		partition.verSeq = lastSeq
		log.LogInfof("action[fsmVersionOp] dp %v seq %v updateVerList request ver %v verlist %v dp verlist nil and set",
			partition.partitionID, partition.verSeq, lastSeq, req.VolVerList)
		partition.volVersionInfoList.RWLock.Unlock()
		return
	}

	lastVerInfo := partition.volVersionInfoList.GetLastVolVerInfo()
	log.LogInfof("action[fsmVersionOp] dp %v seq %v lastVerList seq %v req seq %v op %v",
		partition.partitionID, partition.verSeq, lastVerInfo.Ver, lastSeq, req.Op)

	if lastVerInfo.Ver >= lastSeq {
		if lastVerInfo.Ver == lastSeq {
			if req.Op == proto.CreateVersionCommit {
				lastVerInfo.Status = proto.VersionNormal
			}
		}
		partition.volVersionInfoList.RWLock.Unlock()
		return
	}

	var status uint8 = proto.VersionPrepare
	if req.Op == proto.CreateVersionCommit {
		status = proto.VersionNormal
	}
	partition.volVersionInfoList.VerList = append(partition.volVersionInfoList.VerList, &proto.VolVersionInfo{
		Ver:    lastSeq,
		Status: status,
	})
	partition.verSeq = lastSeq

	err = partition.PersistMetadata()
	log.LogInfof("action[fsmVersionOp] dp %v seq %v updateVerList request add new seq %v verlist (%v) err (%v)",
		partition.partitionID, partition.verSeq, lastSeq, partition.volVersionInfoList, err)

	partition.volVersionInfoList.RWLock.Unlock()
	return
}

func (dp *DataPartition) getVerListFromMaster() (err error) {
	var verList *proto.VolVersionInfoList
	verList, err = MasterClient.AdminAPI().GetVerList(dp.volumeID)
	if err != nil {
		log.LogErrorf("action[getVerListFromMaster] GetVerList err[%v]", err)
		return
	}

	for _, info := range verList.VerList {
		if info.Status != proto.VersionNormal {
			continue
		}
		dp.volVersionInfoList.VerList = append(dp.volVersionInfoList.VerList, info)
	}

	log.LogDebugf("action[getVerListFromMaster] dp %v verList %v", dp.partitionID, dp.volVersionInfoList.VerList)
	dp.verSeq = dp.volVersionInfoList.GetLastVer()
	return
}
func (dp *DataPartition) replicasInit() {
	replicas := make([]string, 0)
	if dp.config.Hosts == nil {
		return
	}
	replicas = append(replicas, dp.config.Hosts...)
	dp.replicasLock.Lock()
	dp.replicas = replicas
	dp.replicasLock.Unlock()
	if dp.config.Hosts != nil && len(dp.config.Hosts) >= 1 {
		leaderAddr := strings.Split(dp.config.Hosts[0], ":")
		if len(leaderAddr) == 2 && strings.TrimSpace(leaderAddr[0]) == LocalIP {
			dp.isLeader = true
		}
	}
}

func (dp *DataPartition) GetExtentCount() int {
	return dp.extentStore.GetExtentCount()
}

func (dp *DataPartition) Path() string {
	return dp.path
}

// IsRaftLeader returns the raft leader's address and whether the local node is the leader.
func (dp *DataPartition) IsRaftLeader() (addr string, ok bool) {
	if dp.raftStopped() {
		return
	}
	leaderID, _ := dp.raftPartition.LeaderTerm()
	if leaderID == 0 {
		return
	}
	ok = leaderID == dp.config.NodeID
	for _, peer := range dp.config.Peers {
		if leaderID == peer.ID {
			addr = peer.Addr
			return
		}
	}
	return
}

func (dp *DataPartition) Replicas() []string {
	dp.replicasLock.RLock()
	defer dp.replicasLock.RUnlock()
	return dp.replicas
}

func (dp *DataPartition) getReplicaCopy() []string {
	dp.replicasLock.RLock()
	defer dp.replicasLock.RUnlock()

	tmpCopy := make([]string, len(dp.replicas))
	copy(tmpCopy, dp.replicas)
	return tmpCopy
}

func (dp *DataPartition) getReplicaAddr(index int) string {
	dp.replicasLock.RLock()
	defer dp.replicasLock.RUnlock()
	return dp.replicas[index]
}

func (dp *DataPartition) getReplicaLen() int {
	dp.replicasLock.RLock()
	defer dp.replicasLock.RUnlock()
	return len(dp.replicas)
}

func (dp *DataPartition) IsExistReplica(addr string) bool {
	dp.replicasLock.RLock()
	defer dp.replicasLock.RUnlock()
	for _, host := range dp.replicas {
		if host == addr {
			return true
		}
	}
	return false
}

func (dp *DataPartition) ReloadSnapshot() {
	files, err := dp.extentStore.SnapShot()
	if err != nil {
		log.LogErrorf("ReloadSnapshot err %v", err)
		return
	}

	dp.snapshotMutex.Lock()
	for _, f := range dp.snapshot {
		storage.PutSnapShotFileToPool(f)
	}
	dp.snapshot = files
	dp.snapshotMutex.Unlock()
}

// SnapShot returns the snapshot of the data partition.
func (dp *DataPartition) SnapShot() (files []*proto.File) {
	dp.snapshotMutex.RLock()
	defer dp.snapshotMutex.RUnlock()
	return dp.snapshot
}

// Stop closes the store and the raft store.
func (dp *DataPartition) Stop() {
	dp.stopOnce.Do(func() {

		// Close the store and raftstore.
		dp.extentStore.Close()
		err := dp.storeAppliedID(atomic.LoadUint64(&dp.appliedID))
		if err != nil {
			log.LogErrorf("action[Stop]: failed to store applied index")
		}
	})
}

// Disk returns the disk instance.
func (dp *DataPartition) Disk() *Disk {
	return dp.disk
}

// func (dp *DataPartition) IsRejectWrite() bool {
// 	return dp.Disk().RejectWrite
// }

// Status returns the partition status.
func (dp *DataPartition) Status() int {
	return dp.partitionStatus
}

// Size returns the partition size.
func (dp *DataPartition) Size() int {
	return dp.partitionSize
}

// Used returns the used space.
func (dp *DataPartition) Used() int {
	return dp.used
}

// Available returns the available space.
func (dp *DataPartition) Available() int {
	return dp.partitionSize - dp.used
}

func (dp *DataPartition) ForceLoadHeader() {
	dp.loadExtentHeaderStatus = FinishLoadDataPartitionExtentHeader
}

// PersistMetadata persists the file metadata to disk.
func (dp *DataPartition) PersistMetadata() (err error) {
	dp.persistMetaMutex.Lock()
	defer dp.persistMetaMutex.Unlock()

	var (
		metadataFile *os.File
		metaData     []byte
	)
	fileName := path.Join(dp.Path(), TempMetadataFileName)
	if metadataFile, err = os.OpenFile(fileName, os.O_CREATE|os.O_RDWR, 0o666); err != nil {
		return
	}

	md := &DataPartitionMetadata{
		VolumeID:                dp.config.VolName,
		PartitionID:             dp.config.PartitionID,
		ReplicaNum:              dp.config.ReplicaNum,
		PartitionSize:           dp.config.PartitionSize,
		PartitionType:           dp.config.PartitionType,
		Peers:                   dp.config.Peers,
		Hosts:                   dp.config.Hosts,
		DataPartitionCreateType: dp.DataPartitionCreateType,
		CreateTime:              time.Now().Format(TimeLayout),
		LastTruncateID:          dp.lastTruncateID,
		StopRecover:             dp.stopRecover,
		VerList:                 dp.volVersionInfoList.VerList,
		ApplyID:                 dp.appliedID,
	}

	if metaData, err = json.Marshal(md); err != nil {
		return
	}
	if _, err = metadataFile.Write(metaData); err != nil {
		return
	}
	dp.metaAppliedID = dp.appliedID
	log.LogInfof("PersistMetadata DataPartition(%v) data(%v)", dp.partitionID, string(metaData))
	err = os.Rename(fileName, path.Join(dp.Path(), DataPartitionMetadataFileName))
	return
}
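// PersistMetadata relies on the write-to-temp-then-rename pattern: the record
// is written to ".meta" and renamed onto "META" only once complete, and since
// a rename within one directory is atomic on POSIX filesystems, readers see
// either the old metadata or the new, never a truncated file. A minimal
// standalone sketch of the pattern, with hypothetical dir/payload values:
//
//	tmp := path.Join(dir, TempMetadataFileName)
//	if err := os.WriteFile(tmp, payload, 0o666); err != nil {
//		return err
//	}
//	return os.Rename(tmp, path.Join(dir, DataPartitionMetadataFileName))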
func (dp *DataPartition) statusUpdateScheduler() {
	ticker := time.NewTicker(time.Minute)
	snapshotTicker := time.NewTicker(time.Minute * 5)

			// only repair tiny extents
			if !dp.isNormalType() {
				dp.LaunchRepair(proto.TinyExtentType)

			if index >= math.MaxUint32 {

				dp.LaunchRepair(proto.TinyExtentType)

				dp.LaunchRepair(proto.NormalExtentType)

		case <-snapshotTicker.C:

			snapshotTicker.Stop()

func (dp *DataPartition) statusUpdate() {
	status := proto.ReadWrite

	if dp.used >= dp.partitionSize {
		status = proto.ReadOnly
	}
	if dp.isNormalType() && dp.extentStore.GetExtentCount() >= storage.MaxExtentCount {
		status = proto.ReadOnly
	}
	if dp.isNormalType() && dp.raftStatus == RaftStatusStopped {
		// dp is still recovering
		if dp.DataPartitionCreateType == proto.DecommissionedCreateDataPartition {
			status = proto.Recovering
		} else {
			status = proto.Unavailable
		}
	}
	if dp.getDiskErrCnt() > 0 {
		dp.partitionStatus = proto.Unavailable
		return
	}

	log.LogInfof("action[statusUpdate] dp %v raft status %v dp.status %v, status %v, disk status %v",
		dp.partitionID, dp.raftStatus, dp.Status(), status, float64(dp.disk.Status))
	// dp.partitionStatus = int(math.Min(float64(status), float64(dp.disk.Status)))
	dp.partitionStatus = status
}

func (dp *DataPartition) computeUsage() {
	if time.Now().Unix()-dp.intervalToUpdatePartitionSize < IntervalToUpdatePartitionSize {
		return
	}
	dp.used = int(dp.ExtentStore().GetStoreUsedSize())
	dp.intervalToUpdatePartitionSize = time.Now().Unix()
}

func (dp *DataPartition) ExtentStore() *storage.ExtentStore {
	return dp.extentStore
}

func (dp *DataPartition) checkIsDiskError(err error, rwFlag uint8) {

	log.LogWarnf("checkIsDiskError: disk path %v, error: %v, partition:%v, rwFlag:%v",
		dp.Path(), err.Error(), dp.partitionID, rwFlag)
	if !IsDiskErr(err.Error()) {
		return
	}

	dp.disk.triggerDiskError(rwFlag, dp.partitionID)

	// must run after the disk status has been changed

func newRaftApplyError(err error) error {
	return errors.NewErrorf("[Custom Error]: unhandled raft apply error, err(%s)", err)
}

func isRaftApplyError(errMsg string) bool {
	return strings.Contains(errMsg, "[Custom Error]: unhandled raft apply error")
}

// String returns the string format of the data partition information.
func (dp *DataPartition) String() (m string) {
	return fmt.Sprintf(DataPartitionPrefix+"_%v_%v", dp.partitionID, dp.partitionSize)
}

// LaunchRepair launches the repair of extents.
func (dp *DataPartition) LaunchRepair(extentType uint8) {
	if dp.partitionStatus == proto.Unavailable {
		return
	}
	if err := dp.updateReplicas(false); err != nil {
		log.LogErrorf("action[LaunchRepair] partition(%v) err(%v).", dp.partitionID, err)
		return
	}

	if dp.extentStore.BrokenTinyExtentCnt() == 0 {
		dp.extentStore.MoveAllToBrokenTinyExtentC(MinTinyExtentsToRepair)
	}
	dp.repair(extentType)
}

func (dp *DataPartition) updateReplicas(isForce bool) (err error) {
	if !isForce && time.Now().Unix()-dp.intervalToUpdateReplicas <= IntervalToUpdateReplica {
		return
	}

	isLeader, replicas, err := dp.fetchReplicasFromMaster()
	if err != nil {
		return
	}
	dp.replicasLock.Lock()
	defer dp.replicasLock.Unlock()
	if !dp.compareReplicas(dp.replicas, replicas) {
		log.LogInfof("action[updateReplicas] partition(%v) replicas changed from (%v) to (%v).",
			dp.partitionID, dp.replicas, replicas)
	}
	dp.isLeader = isLeader
	dp.replicas = replicas
	dp.intervalToUpdateReplicas = time.Now().Unix()
	log.LogInfof(fmt.Sprintf("ActionUpdateReplicationHosts partition(%v), force(%v)", dp.partitionID, isForce))
	return
}

// Compare the fetched replicas with the local ones.
func (dp *DataPartition) compareReplicas(v1, v2 []string) (equals bool) {
	if len(v1) == len(v2) {
		for i := 0; i < len(v1); i++ {
			if v1[i] != v2[i] {
				return false
			}
		}
		return true
	}
	return false
}

// Fetch the replica information from the master.
func (dp *DataPartition) fetchReplicasFromMaster() (isLeader bool, replicas []string, err error) {
	var partition *proto.DataPartitionInfo

	if partition, err = MasterClient.AdminAPI().GetDataPartition(dp.volumeID, dp.partitionID); err != nil {

		time.Sleep(10 * time.Second)

	replicas = append(replicas, partition.Hosts...)
	if partition.Hosts != nil && len(partition.Hosts) >= 1 {
		leaderAddr := strings.Split(partition.Hosts[0], ":")
		if len(leaderAddr) == 2 && strings.TrimSpace(leaderAddr[0]) == LocalIP {
			isLeader = true
		}
	}
	return
}

func (dp *DataPartition) Load() (response *proto.LoadDataPartitionResponse) {
	response = &proto.LoadDataPartitionResponse{}
	response.PartitionId = uint64(dp.partitionID)
	response.PartitionStatus = dp.partitionStatus
	response.Used = uint64(dp.Used())
	var err error

	if dp.loadExtentHeaderStatus != FinishLoadDataPartitionExtentHeader {
		response.PartitionSnapshot = make([]*proto.File, 0)
	} else {
		response.PartitionSnapshot = dp.SnapShot()
	}
	if err != nil {
		response.Status = proto.TaskFailed
		response.Result = err.Error()
	}
	return
}

// DoExtentStoreRepair performs the repairs of the extent store.
// 1. when the extent size is smaller than the max size on the record, start to repair the missing part.
// 2. if the extent does not even exist, create the extent first, and then repair.
func (dp *DataPartition) DoExtentStoreRepair(repairTask *DataPartitionRepairTask) {
	if dp.stopRecover && dp.isDecommissionRecovering() {
		log.LogWarnf("DoExtentStoreRepair %v receive stop signal", dp.partitionID)
		return
	}
	store := dp.extentStore
	log.LogDebugf("DoExtentStoreRepair.dp %v len extents %v", dp.partitionID, len(repairTask.ExtentsToBeCreated))
	for _, extentInfo := range repairTask.ExtentsToBeCreated {
		log.LogDebugf("DoExtentStoreRepair.dp %v len extentInfo %v", dp.partitionID, extentInfo)
		if storage.IsTinyExtent(extentInfo.FileID) {
			continue
		}
		if store.HasExtent(uint64(extentInfo.FileID)) {
			continue
		}
		if !AutoRepairStatus {
			log.LogWarnf("AutoRepairStatus is False,so cannot Create extent(%v)", extentInfo.String())
			continue
		}

		dp.disk.allocCheckLimit(proto.IopsWriteType, 1)

		err := store.Create(uint64(extentInfo.FileID))

	wg = new(sync.WaitGroup)
	for _, extentInfo := range repairTask.ExtentsToBeRepaired {
		if dp.stopRecover && dp.isDecommissionRecovering() {
			log.LogWarnf("DoExtentStoreRepair %v receive stop signal", dp.partitionID)

		if !store.HasExtent(uint64(extentInfo.FileID)) {
			continue
		}
		wg.Add(1)

		// repair the extents
		go dp.doStreamExtentFixRepair(wg, extentInfo)
		recoverIndex++

		if recoverIndex%NumOfFilesToRecoverInParallel == 0 {
			wg.Wait()
		}
	}
	wg.Wait()
	dp.doStreamFixTinyDeleteRecord(repairTask)
}

func (dp *DataPartition) pushSyncDeleteRecordFromLeaderMesg() bool {
	select {
	case dp.Disk().syncTinyDeleteRecordFromLeaderOnEveryDisk <- true:
		return true
	default:
		return false
	}
}

func (dp *DataPartition) consumeTinyDeleteRecordFromLeaderMesg() {
	select {
	case <-dp.Disk().syncTinyDeleteRecordFromLeaderOnEveryDisk:
		return
	}
}

func (dp *DataPartition) doStreamFixTinyDeleteRecord(repairTask *DataPartitionRepairTask) {
	var (
		localTinyDeleteFileSize int64
		err                     error
		conn                    net.Conn
	)

	if !dp.pushSyncDeleteRecordFromLeaderMesg() {
		return
	}
	defer func() {
		dp.consumeTinyDeleteRecordFromLeaderMesg()
	}()

	if localTinyDeleteFileSize, err = dp.extentStore.LoadTinyDeleteFileOffset(); err != nil {
		return
	}

	log.LogInfof(ActionSyncTinyDeleteRecord+" start PartitionID(%v) localTinyDeleteFileSize(%v) leaderTinyDeleteFileSize(%v) leaderAddr(%v)",
		dp.partitionID, localTinyDeleteFileSize, repairTask.LeaderTinyDeleteRecordFileSize, repairTask.LeaderAddr)

	if localTinyDeleteFileSize >= repairTask.LeaderTinyDeleteRecordFileSize {
		return
	}

	if repairTask.LeaderTinyDeleteRecordFileSize-localTinyDeleteFileSize < MinTinyExtentDeleteRecordSyncSize {
		return
	}
	defer func() {
		log.LogInfof(ActionSyncTinyDeleteRecord+" end PartitionID(%v) localTinyDeleteFileSize(%v) leaderTinyDeleteFileSize(%v) leaderAddr(%v) err(%v)",
			dp.partitionID, localTinyDeleteFileSize, repairTask.LeaderTinyDeleteRecordFileSize, repairTask.LeaderAddr, err)
	}()

	p := repl.NewPacketToReadTinyDeleteRecord(dp.partitionID, localTinyDeleteFileSize)
	if conn, err = dp.getRepairConn(repairTask.LeaderAddr); err != nil {
		return
	}
	defer func() {
		dp.putRepairConn(conn, err != nil)
	}()
	if err = p.WriteToConn(conn); err != nil {
		return
	}
	store := dp.extentStore
	start := time.Now().Unix()
	for localTinyDeleteFileSize < repairTask.LeaderTinyDeleteRecordFileSize {
		if dp.stopRecover && dp.isDecommissionRecovering() {
			log.LogWarnf("doStreamFixTinyDeleteRecord %v receive stop signal", dp.partitionID)
			return
		}
		if localTinyDeleteFileSize >= repairTask.LeaderTinyDeleteRecordFileSize {
			return
		}
		if err = p.ReadFromConnWithVer(conn, proto.ReadDeadlineTime); err != nil {
			return
		}
		if p.IsErrPacket() {
			logContent := fmt.Sprintf("action[doStreamFixTinyDeleteRecord] %v.",
				p.LogMessage(p.GetOpMsg(), conn.RemoteAddr().String(), start, fmt.Errorf(string(p.Data[:p.Size]))))
			err = fmt.Errorf(logContent)
			return
		}
		if p.CRC != crc32.ChecksumIEEE(p.Data[:p.Size]) {
			err = fmt.Errorf("crc mismatch")
			return
		}
		if p.Size%storage.DeleteTinyRecordSize != 0 {
			err = fmt.Errorf("invalid size")
			return
		}
		var index int
		for (index+1)*storage.DeleteTinyRecordSize <= int(p.Size) {
			record := p.Data[index*storage.DeleteTinyRecordSize : (index+1)*storage.DeleteTinyRecordSize]
			extentID, offset, size := storage.UnMarshalTinyExtent(record)
			localTinyDeleteFileSize += storage.DeleteTinyRecordSize
			index++
			if !storage.IsTinyExtent(extentID) {
				continue
			}

			dp.disk.allocCheckLimit(proto.IopsWriteType, 1)
			// log.LogInfof("doStreamFixTinyDeleteRecord Delete PartitionID(%v)_Extent(%v)_Offset(%v)_Size(%v)", dp.partitionID, extentID, offset, size)
			store.MarkDelete(extentID, int64(offset), int64(size))
		}
	}
}
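// The alignment check above is what makes the replay loop safe: every delete
// record is exactly storage.DeleteTinyRecordSize bytes, so a payload of
// n*DeleteTinyRecordSize bytes carries exactly n records. If, say, the record
// size were 24 bytes (an illustrative value), a 96-byte reply would decode to
// four (extentID, offset, size) tuples via storage.UnMarshalTinyExtent, while
// a 100-byte reply would indicate a torn record and is rejected with
// "invalid size". Each decoded tuple is replayed locally through
// store.MarkDelete, and localTinyDeleteFileSize advances one record at a time
// until it catches up with the leader's delete-record file size.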
// ChangeRaftMember is a wrapper function of changing the raft member.
func (dp *DataPartition) ChangeRaftMember(changeType raftProto.ConfChangeType, peer raftProto.Peer, context []byte) (resp interface{}, err error) {
	resp, err = dp.raftPartition.ChangeMember(changeType, peer, context)
	return
}

func (dp *DataPartition) canRemoveSelf() (canRemove bool, err error) {
	var partition *proto.DataPartitionInfo

	if partition, err = MasterClient.AdminAPI().GetDataPartition(dp.volumeID, dp.partitionID); err != nil {
		log.LogErrorf("action[canRemoveSelf] err[%v]", err)

		time.Sleep(10 * time.Second)

	var existInPeers bool
	for _, peer := range partition.Peers {
		if dp.config.NodeID == peer.ID {
			existInPeers = true

	if dp.config.NodeID == partition.OfflinePeerID {
		canRemove = true

func (dp *DataPartition) getRepairConn(target string) (net.Conn, error) {
	return dp.dataNode.getRepairConnFunc(target)
}

func (dp *DataPartition) putRepairConn(conn net.Conn, forceClose bool) {
	log.LogDebugf("action[putRepairConn], forceClose: %v", forceClose)
	dp.dataNode.putRepairConnFunc(conn, forceClose)
}

func (dp *DataPartition) isNormalType() bool {
	return proto.IsNormalDp(dp.partitionType)
}

type SimpleVolView struct {
	vv             *proto.SimpleVolView
	lastUpdateTime time.Time
}

type VolMap struct {
	sync.Mutex
	volMap map[string]*SimpleVolView
}

var volViews = VolMap{
	Mutex:  sync.Mutex{},
	volMap: make(map[string]*SimpleVolView),
}

func (vo *VolMap) getSimpleVolView(VolumeID string) (vv *proto.SimpleVolView, err error) {
	vo.Lock()
	if volView, ok := vo.volMap[VolumeID]; ok && time.Since(volView.lastUpdateTime) < 5*time.Minute {
		vo.Unlock()
		return volView.vv, nil
	}
	vo.Unlock()

	volView := &SimpleVolView{
		lastUpdateTime: time.Time{},
	}

	if vv, err = MasterClient.AdminAPI().GetVolumeSimpleInfo(VolumeID); err != nil {
		log.LogErrorf("action[GetVolumeSimpleInfo] cannot get vol(%v) from master(%v) err(%v).",
			VolumeID, MasterClient.Leader(), err)
		return nil, err
	}

	log.LogDebugf("get volume info, vol(%s), vol(%v)", vv.Name, volView)

	volView.vv = vv
	volView.lastUpdateTime = time.Now()

	vo.Lock()
	vo.volMap[VolumeID] = volView
	vo.Unlock()

	return vv, nil
}

func (dp *DataPartition) doExtentTtl(ttl int) {
	if ttl <= 0 {
		log.LogWarn("[doTTL] ttl is 0, set default 30", ttl)
		ttl = 30
	}

	extents := dp.extentStore.DumpExtents()

	for _, ext := range extents {
		if storage.IsTinyExtent(ext.FileID) {
			continue
		}

		if time.Now().Unix()-ext.AccessTime > int64(ttl)*util.OneDaySec() {
			log.LogDebugf("action[doExtentTtl] ttl delete dp(%v) extent(%v).", dp.partitionID, ext)
			dp.extentStore.MarkDelete(ext.FileID, 0, 0)
		}
	}
}

func (dp *DataPartition) doExtentEvict(vv *proto.SimpleVolView) {
	var (
		needDieOut      bool
		freeSpace       int
		freeExtentCount int
	)

	// an invalid watermark policy is ignored
	if vv.CacheHighWater < vv.CacheLowWater || vv.CacheLowWater < 0 || vv.CacheHighWater > 100 {
		log.LogErrorf("action[doExtentEvict] invalid policy dp(%v), CacheHighWater(%v) CacheLowWater(%v).",
			dp.partitionID, vv.CacheHighWater, vv.CacheLowWater)
		return
	}

	// if dp usage is above the cache high water mark, evict down to the low water mark.
	if dp.Used()*100/dp.Size() > vv.CacheHighWater {
		needDieOut = true
		freeSpace = dp.Used() - dp.Size()*vv.CacheLowWater/100
	} else if dp.partitionStatus == proto.ReadOnly {
		needDieOut = true
		freeSpace = dp.Used() * (vv.CacheHighWater - vv.CacheLowWater) / 100
	}

	// if the dp extent count is above the upper bound, evict extents as well.
	extInfos := dp.extentStore.DumpExtents()
	maxExtentCount := dp.Size() / util.DefaultTinySizeLimit
	if len(extInfos) > maxExtentCount {
		needDieOut = true
		freeExtentCount = len(extInfos) - vv.CacheLowWater*maxExtentCount/100
	}

	log.LogDebugf("action[doExtentEvict], vol %v, LRU(%v, %v), dp %v, usage %v, status(%d), extents %v, freeSpace %v, freeExtentCount %v, needDieOut %v",
		vv.Name, vv.CacheLowWater, vv.CacheHighWater, dp.partitionID, dp.Used()*100/dp.Size(), dp.partitionStatus, len(extInfos),
		freeSpace, freeExtentCount, needDieOut)

	for _, ext := range extInfos {
		if storage.IsTinyExtent(ext.FileID) {
			continue
		}

		freeSpace -= int(ext.Size)
		freeExtentCount--
		dp.extentStore.MarkDelete(ext.FileID, 0, 0)
		log.LogDebugf("action[doExtentEvict] die out. vol %v, dp(%v), extent(%v).", vv.Name, dp.partitionID, *ext)

		if freeSpace <= 0 && freeExtentCount <= 0 {
			log.LogDebugf("[doExtentEvict] die out done, vol(%s), dp (%d)", vv.Name, dp.partitionID)
			break
		}
	}
}
func (dp *DataPartition) startEvict() {
	// only a cache dp runs the evict loop.
	if !proto.IsCacheDp(dp.partitionType) {
		return
	}

	log.LogDebugf("[startEvict] start do dp(%d) evict op", dp.partitionID)

	vv, err := volViews.getSimpleVolView(dp.volumeID)
	if err != nil {
		err := fmt.Errorf("[startEvict] get vol [%s] info error, err %s", dp.volumeID, err.Error())

	lruInterval := getWithDefault(vv.CacheLruInterval, 5)
	cacheTtl := getWithDefault(vv.CacheTtl, 30)

	lruTimer := time.NewTicker(time.Duration(lruInterval) * time.Minute)
	ttlTimer := time.NewTicker(time.Duration(util.OneDaySec()) * time.Second)

		// check volume type and dp type.
		if proto.IsHot(vv.VolType) || !proto.IsCacheDp(dp.partitionType) {
			log.LogErrorf("action[startEvict] cannot startEvict, vol(%v), dp(%v).", vv.Name, dp.partitionID)
			return
		}

		log.LogDebugf("start [doExtentEvict] vol(%s), dp(%d).", vv.Name, dp.partitionID)
		evictStart := time.Now()
		dp.doExtentEvict(vv)
		log.LogDebugf("action[doExtentEvict] vol(%v), dp(%v), cost(%v).", vv.Name, dp.partitionID, time.Since(evictStart))

		log.LogDebugf("start [doExtentTtl] vol(%s), dp(%d).", vv.Name, dp.partitionID)
		ttlStart := time.Now()
		dp.doExtentTtl(cacheTtl)
		log.LogDebugf("action[doExtentTtl] vol(%v), dp(%v), cost(%v).", vv.Name, dp.partitionID, time.Since(ttlStart))

		log.LogWarn("task[doExtentTtl] stopped", dp.volumeID, dp.partitionID)

		// periodically refresh the vol info
		newVV, err := volViews.getSimpleVolView(dp.volumeID)
		if err != nil {
			err := fmt.Errorf("[startEvict] get vol [%s] info error, err %s", dp.volumeID, err.Error())

		if lruInterval != vv.CacheLruInterval || cacheTtl != vv.CacheTtl {
			lruInterval = getWithDefault(vv.CacheLruInterval, 5)
			cacheTtl = getWithDefault(vv.CacheTtl, 30)

			lruTimer = time.NewTicker(time.Duration(lruInterval) * time.Minute)
			log.LogInfof("[startEvict] update vol config, dp(%d) %v ", dp.partitionID, *vv)
		}
// getWithDefault returns base unless it is non-positive, in which case def is used.
func getWithDefault(base, def int) int {
	if base <= 0 {
		return def
	}
	return base
}
func (dp *DataPartition) StopDecommissionRecover(stop bool) {
	// only works for decommission repair
	if !dp.isDecommissionRecovering() {
		log.LogWarnf("[StopDecommissionRecover] dp(%d) is not in recovering status: type %d status %d",
			dp.partitionID, dp.partitionType, dp.Status())
		return
	}
	// persisted so that timeout checks can observe the flag
	dp.stopRecover = stop
	dp.PersistMetadata()
}

func (dp *DataPartition) isDecommissionRecovering() bool {
	// a failed or successful decommission recovery resets the create type to normal
	return dp.DataPartitionCreateType == proto.DecommissionedCreateDataPartition
}

func (dp *DataPartition) handleDecommissionRecoverFailed() {
	if !dp.isDecommissionRecovering() {
		return
	}
	// prevent status changing from Unavailable to Recovering again in statusUpdate()
	dp.partitionType = proto.NormalCreateDataPartition
	dp.partitionStatus = proto.Unavailable
	log.LogWarnf("[handleDecommissionRecoverFailed] dp(%d) recover failed reach max limit", dp.partitionID)
	dp.PersistMetadata()
	dp.StopDecommissionRecover(true)
}

func (dp *DataPartition) incDiskErrCnt() {
	diskErrCnt := atomic.AddUint64(&dp.diskErrCnt, 1)
	log.LogWarnf("[incDiskErrCnt]: dp(%v) disk err count:%v", dp.partitionID, diskErrCnt)
}

func (dp *DataPartition) getDiskErrCnt() uint64 {
	return atomic.LoadUint64(&dp.diskErrCnt)
}