1
// Copyright 2018 The CubeFS Authors.
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
7
// http://www.apache.org/licenses/LICENSE-2.0
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
29
"github.com/cubefs/cubefs/depends/tiglabs/raft"
30
raftProto "github.com/cubefs/cubefs/depends/tiglabs/raft/proto"
31
"github.com/cubefs/cubefs/proto"
32
"github.com/cubefs/cubefs/repl"
33
"github.com/cubefs/cubefs/storage"
34
"github.com/cubefs/cubefs/util"
35
"github.com/cubefs/cubefs/util/errors"
36
"github.com/cubefs/cubefs/util/exporter"
37
"github.com/cubefs/cubefs/util/log"
40
// ErrForbiddenDataPartition is returned when an operation targets a data
// partition that has been marked forbidden (see DataPartition.IsForbidden).
var ErrForbiddenDataPartition = errors.New("the data partition is forbidden")
42
func (s *DataNode) getPacketTpLabels(p *repl.Packet) map[string]string {
43
labels := make(map[string]string)
44
labels[exporter.Vol] = ""
45
labels[exporter.Op] = ""
46
labels[exporter.PartId] = ""
47
labels[exporter.Disk] = ""
49
if part, ok := p.Object.(*DataPartition); ok {
50
labels[exporter.Vol] = part.volumeID
51
labels[exporter.Op] = p.GetOpMsg()
52
if exporter.EnablePid {
53
labels[exporter.PartId] = fmt.Sprintf("%d", part.partitionID)
54
labels[exporter.Disk] = part.path
61
func isColdVolExtentDelErr(p *repl.Packet) bool {
66
partition, ok := p.Object.(*DataPartition)
71
if proto.IsNormalDp(partition.partitionType) {
75
if p.ResultCode == proto.OpNotExistErr {
82
func (s *DataNode) OperatePacket(p *repl.Packet, c net.Conn) (err error) {
84
tpLabels map[string]string
85
tpObject *exporter.TimePointCount
87
log.LogDebugf("action[OperatePacket] %v, pack [%v]", p.GetOpMsg(), p)
88
shallDegrade := p.ShallDegrade()
91
tpObject = exporter.NewTPCnt(p.GetOpMsg())
92
tpLabels = s.getPacketTpLabels(p)
94
start := time.Now().UnixNano()
99
err = fmt.Errorf("op(%v) error(%v)", p.GetOpMsg(), string(p.Data[:resultSize]))
100
logContent := fmt.Sprintf("action[OperatePacket] %v.",
101
p.LogMessage(p.GetOpMsg(), c.RemoteAddr().String(), start, err))
102
if isColdVolExtentDelErr(p) {
103
log.LogInfof(logContent)
105
log.LogErrorf(logContent)
108
logContent := fmt.Sprintf("action[OperatePacket] %v.",
109
p.LogMessage(p.GetOpMsg(), c.RemoteAddr().String(), start, nil))
111
case proto.OpStreamRead, proto.OpRead, proto.OpExtentRepairRead, proto.OpStreamFollowerRead:
112
case proto.OpReadTinyDeleteRecord:
113
log.LogRead(logContent)
114
case proto.OpWrite, proto.OpRandomWrite,
115
proto.OpRandomWriteVer, proto.OpSyncRandomWriteVer,
116
proto.OpRandomWriteAppend, proto.OpSyncRandomWriteAppend,
117
proto.OpTryWriteAppend, proto.OpSyncTryWriteAppend,
118
proto.OpSyncRandomWrite, proto.OpSyncWrite, proto.OpMarkDelete, proto.OpSplitMarkDelete:
119
log.LogWrite(logContent)
121
log.LogInfo(logContent)
126
tpObject.SetWithLabels(err, tpLabels)
130
case proto.OpCreateExtent:
131
s.handlePacketToCreateExtent(p)
132
case proto.OpWrite, proto.OpSyncWrite:
133
s.handleWritePacket(p)
134
case proto.OpStreamRead:
135
s.handleStreamReadPacket(p, c, StreamRead)
136
case proto.OpStreamFollowerRead:
137
s.extentRepairReadPacket(p, c, StreamRead)
138
case proto.OpExtentRepairRead:
139
s.handleExtentRepairReadPacket(p, c, RepairRead)
140
case proto.OpTinyExtentRepairRead:
141
s.handleTinyExtentRepairReadPacket(p, c)
142
case proto.OpMarkDelete, proto.OpSplitMarkDelete:
143
s.handleMarkDeletePacket(p, c)
144
case proto.OpBatchDeleteExtent:
145
s.handleBatchMarkDeletePacket(p, c)
146
case proto.OpRandomWrite, proto.OpSyncRandomWrite,
147
proto.OpRandomWriteAppend, proto.OpSyncRandomWriteAppend,
148
proto.OpTryWriteAppend, proto.OpSyncTryWriteAppend,
149
proto.OpRandomWriteVer, proto.OpSyncRandomWriteVer:
150
s.handleRandomWritePacket(p)
151
case proto.OpNotifyReplicasToRepair:
152
s.handlePacketToNotifyExtentRepair(p)
153
case proto.OpGetAllWatermarks:
154
s.handlePacketToGetAllWatermarks(p)
155
case proto.OpCreateDataPartition:
156
s.handlePacketToCreateDataPartition(p)
157
case proto.OpLoadDataPartition:
158
s.handlePacketToLoadDataPartition(p)
159
case proto.OpDeleteDataPartition:
160
s.handlePacketToDeleteDataPartition(p)
161
case proto.OpDataNodeHeartbeat:
162
s.handleHeartbeatPacket(p)
163
case proto.OpGetAppliedId:
164
s.handlePacketToGetAppliedID(p)
165
case proto.OpDecommissionDataPartition:
166
s.handlePacketToDecommissionDataPartition(p)
167
case proto.OpAddDataPartitionRaftMember:
168
s.handlePacketToAddDataPartitionRaftMember(p)
169
case proto.OpRemoveDataPartitionRaftMember:
170
s.handlePacketToRemoveDataPartitionRaftMember(p)
171
case proto.OpDataPartitionTryToLeader:
172
s.handlePacketToDataPartitionTryToLeader(p)
173
case proto.OpGetPartitionSize:
174
s.handlePacketToGetPartitionSize(p)
175
case proto.OpGetMaxExtentIDAndPartitionSize:
176
s.handlePacketToGetMaxExtentIDAndPartitionSize(p)
177
case proto.OpReadTinyDeleteRecord:
178
s.handlePacketToReadTinyDeleteRecordFile(p, c)
179
case proto.OpBroadcastMinAppliedID:
180
s.handleBroadcastMinAppliedID(p)
181
case proto.OpVersionOperation:
182
s.handleUpdateVerPacket(p)
183
case proto.OpStopDataPartitionRepair:
184
s.handlePacketToStopDataPartitionRepair(p)
186
p.PackErrorBody(repl.ErrorUnknownOp.Error(), repl.ErrorUnknownOp.Error()+strconv.Itoa(int(p.Opcode)))
192
// Handle OpCreateExtent packet.
193
func (s *DataNode) handlePacketToCreateExtent(p *repl.Packet) {
197
p.PackErrorBody(ActionCreateExtent, err.Error())
202
partition := p.Object.(*DataPartition)
203
if partition.Available() <= 0 || !partition.disk.CanWrite() {
204
err = storage.NoSpaceError
206
} else if partition.disk.Status == proto.Unavailable {
207
err = storage.BrokenDiskError
211
// in case too many extents
212
if partition.GetExtentCount() >= storage.MaxExtentCount+10 {
213
err = storage.NoSpaceError
217
partition.disk.allocCheckLimit(proto.IopsWriteType, 1)
218
partition.disk.limitWrite.Run(0, func() {
219
err = partition.ExtentStore().Create(p.ExtentID)
223
// Handle OpCreateDataPartition packet.
224
func (s *DataNode) handlePacketToCreateDataPartition(p *repl.Packet) {
232
p.PackErrorBody(ActionCreateDataPartition, err.Error())
235
task := &proto.AdminTask{}
236
if err = json.Unmarshal(p.Data, task); err != nil {
237
err = fmt.Errorf("cannnot unmashal adminTask")
240
request := &proto.CreateDataPartitionRequest{}
241
if task.OpCode != proto.OpCreateDataPartition {
242
err = fmt.Errorf("from master Task(%v) failed,error unavali opcode(%v)", task.ToString(), task.OpCode)
246
bytes, err = json.Marshal(task.Request)
248
err = fmt.Errorf("from master Task(%v) cannot unmashal CreateDataPartition, err %s", task.ToString(), err.Error())
251
p.AddMesgLog(string(bytes))
252
if err = json.Unmarshal(bytes, request); err != nil {
253
err = fmt.Errorf("from master Task(%v) cannot unmashal CreateDataPartitionRequest struct, err(%s)", task.ToString(), err.Error())
256
p.PartitionID = request.PartitionId
257
if dp, err = s.space.CreatePartition(request); err != nil {
258
err = fmt.Errorf("from master Task(%v) cannot create Partition err(%v)", task.ToString(), err)
261
p.PacketOkWithBody([]byte(dp.Disk().Path))
264
func (s *DataNode) commitDelVersion(volumeID string, verSeq uint64) (err error) {
265
for _, partition := range s.space.partitions {
266
if partition.config.VolName != volumeID {
269
verListMgr := partition.volVersionInfoList
270
verListMgr.RWLock.Lock()
271
for i, ver := range verListMgr.VerList {
272
if i == len(verListMgr.VerList)-1 {
273
log.LogWarnf("action[commitDelVersion] dp[%v] seq %v, seqArray size %v newest ver %v",
274
partition.config.PartitionID, verSeq, len(verListMgr.VerList), ver.Ver)
277
if ver.Ver == verSeq {
278
log.LogInfof("action[commitDelVersion] updateVerList dp[%v] seq %v,seqArray size %v", partition.config.PartitionID, verSeq, len(verListMgr.VerList))
279
verListMgr.VerList = append(verListMgr.VerList[:i], verListMgr.VerList[i+1:]...)
283
verListMgr.RWLock.Unlock()
288
func (s *DataNode) commitCreateVersion(req *proto.MultiVersionOpRequest) (err error) {
289
log.LogInfof("action[commitCreateVersion] handle master version reqeust %v", req)
295
if value, ok = s.volUpdating.Load(req.VolumeID); !ok {
296
log.LogWarnf("action[commitCreateVersion] vol %v not found seq %v", req.VolumeID, req.VerSeq)
300
ver2Phase := value.(*verOp2Phase)
301
log.LogInfof("action[commitCreateVersion] try commit volume %v ver2Phase seq %v with req seq %v",
302
req.VolumeID, ver2Phase.verPrepare, req.VerSeq)
303
if req.VerSeq < ver2Phase.verSeq {
304
log.LogWarnf("action[commitCreateVersion] vol %v seq %v create less than loal %v", req.VolumeID, req.VerSeq, ver2Phase.verSeq)
307
if ver2Phase.step != proto.CreateVersionPrepare {
308
log.LogWarnf("action[commitCreateVersion] vol %v seq %v step not prepare", req.VolumeID, ver2Phase.step)
311
s.space.partitionMutex.RLock()
312
defer s.space.partitionMutex.RUnlock()
313
resultCh := make(chan error, len(s.space.partitions))
314
for _, partition := range s.space.partitions {
315
if partition.config.VolName != req.VolumeID {
318
if !partition.isRaftLeader {
322
go func(partition *DataPartition) {
324
log.LogInfof("action[commitCreateVersion] volume %v dp[%v] do HandleVersionOp verSeq[%v]",
325
partition.volumeID, partition.partitionID, partition.verSeq)
326
if err = partition.HandleVersionOp(req); err != nil {
327
log.LogErrorf("action[commitCreateVersion] volume %v dp[%v] do HandleVersionOp verSeq[%v] err %v",
328
partition.volumeID, partition.partitionID, partition.verSeq, err)
337
case err = <-resultCh:
343
log.LogInfof("action[commitCreateVersion] volume %v do HandleVersionOp verseq [%v] finished", req.VolumeID, req.VerSeq)
346
if req.Op == proto.DeleteVersion {
350
if req.Op == proto.CreateVersionPrepare {
351
log.LogInfof("action[commitCreateVersion] commit volume %v prepare seq %v with commit seq %v",
352
req.VolumeID, ver2Phase.verPrepare, req.VerSeq)
356
ver2Phase.verSeq = req.VerSeq
357
ver2Phase.step = proto.CreateVersionCommit
358
ver2Phase.status = proto.VersionWorkingFinished
359
log.LogInfof("action[commitCreateVersion] commit volume %v prepare seq %v with commit seq %v",
360
req.VolumeID, ver2Phase.verPrepare, req.VerSeq)
365
func (s *DataNode) prepareCreateVersion(req *proto.MultiVersionOpRequest) (err error, opAagin bool) {
366
var ver2Phase *verOp2Phase
367
if value, ok := s.volUpdating.Load(req.VolumeID); ok {
368
ver2Phase = value.(*verOp2Phase)
369
if req.VerSeq < ver2Phase.verSeq {
370
err = fmt.Errorf("seq %v create less than loal %v", req.VerSeq, ver2Phase.verSeq)
371
log.LogInfof("action[prepareCreateVersion] volume %v update to ver %v step %v", req.VolumeID, req.VerSeq, ver2Phase.step)
373
} else if req.VerSeq == ver2Phase.verPrepare {
374
if ver2Phase.step == proto.VersionWorking {
380
ver2Phase = &verOp2Phase{}
381
ver2Phase.step = uint32(req.Op)
382
ver2Phase.status = proto.VersionWorking
383
ver2Phase.verPrepare = req.VerSeq
385
s.volUpdating.Store(req.VolumeID, ver2Phase)
387
log.LogInfof("action[prepareCreateVersion] volume %v update seq to %v step %v",
388
req.VolumeID, req.VerSeq, ver2Phase.step)
392
// Handle OpHeartbeat packet.
393
func (s *DataNode) handleUpdateVerPacket(p *repl.Packet) {
397
p.PackErrorBody(ActionUpdateVersion, err.Error())
403
task := &proto.AdminTask{}
404
err = json.Unmarshal(p.Data, task)
406
log.LogErrorf("action[handleUpdateVerPacket] handle master version reqeust err %v", err)
409
request := &proto.MultiVersionOpRequest{}
410
response := &proto.MultiVersionOpResponse{}
411
response.Op = task.OpCode
412
response.Status = proto.TaskSucceeds
414
if task.OpCode == proto.OpVersionOperation {
415
marshaled, _ := json.Marshal(task.Request)
416
if err = json.Unmarshal(marshaled, request); err != nil {
417
log.LogErrorf("action[handleUpdateVerPacket] handle master version reqeust err %v", err)
418
response.Status = proto.TaskFailed
422
if request.Op == proto.CreateVersionPrepare {
423
if err, _ = s.prepareCreateVersion(request); err != nil {
424
log.LogErrorf("action[handleUpdateVerPacket] handle master version reqeust err %v", err)
427
if err = s.commitCreateVersion(request); err != nil {
428
log.LogErrorf("action[handleUpdateVerPacket] handle master version reqeust err %v", err)
431
} else if request.Op == proto.CreateVersionCommit {
432
if err = s.commitCreateVersion(request); err != nil {
433
log.LogErrorf("action[handleUpdateVerPacket] handle master version reqeust err %v", err)
436
} else if request.Op == proto.DeleteVersion {
437
if err = s.commitDelVersion(request.VolumeID, request.VerSeq); err != nil {
438
log.LogErrorf("action[handleUpdateVerPacket] handle master version reqeust err %v", err)
443
response.VerSeq = request.VerSeq
444
response.Op = request.Op
445
response.Addr = request.Addr
446
response.VolumeID = request.VolumeID
449
err = fmt.Errorf("illegal opcode")
450
log.LogErrorf("action[handleUpdateVerPacket] handle master version reqeust err %v", err)
455
response.Result = err.Error()
457
task.Response = response
458
log.LogInfof("action[handleUpdateVerPacket] rsp to client,req vol %v, verseq %v, op %v", request.VolumeID, request.VerSeq, request.Op)
459
if err = MasterClient.NodeAPI().ResponseDataNodeTask(task); err != nil {
460
err = errors.Trace(err, "handleUpdateVerPacket to master failed.")
461
log.LogErrorf(err.Error())
466
func (s *DataNode) checkVolumeForbidden(volNames []string) {
467
s.space.RangePartitions(func(partition *DataPartition) bool {
468
for _, volName := range volNames {
469
if volName == partition.volumeID {
470
partition.SetForbidden(true)
474
partition.SetForbidden(false)
479
func (s *DataNode) checkDecommissionDisks(decommissionDisks []string) {
480
decommissionDiskSet := util.NewSet()
481
for _, disk := range decommissionDisks {
482
decommissionDiskSet.Add(disk)
484
disks := s.space.GetDisks()
485
for _, disk := range disks {
486
if disk.GetDecommissionStatus() && !decommissionDiskSet.Has(disk.Path) {
487
log.LogDebugf("action[checkDecommissionDisks] mark %v to be undecommissioned", disk.Path)
488
disk.MarkDecommissionStatus(false)
491
if !disk.GetDecommissionStatus() && decommissionDiskSet.Has(disk.Path) {
492
log.LogDebugf("action[checkDecommissionDisks] mark %v to be decommissioned", disk.Path)
493
disk.MarkDecommissionStatus(true)
499
// Handle OpHeartbeat packet.
500
func (s *DataNode) handleHeartbeatPacket(p *repl.Packet) {
502
task := &proto.AdminTask{}
503
err = json.Unmarshal(p.Data, task)
506
p.PackErrorBody(ActionCreateDataPartition, err.Error())
516
request := &proto.HeartBeatRequest{}
517
response := &proto.DataNodeHeartbeatResponse{}
518
s.buildHeartBeatResponse(response)
520
if task.OpCode == proto.OpDataNodeHeartbeat {
521
marshaled, _ := json.Marshal(task.Request)
522
_ = json.Unmarshal(marshaled, request)
523
response.Status = proto.TaskSucceeds
524
if s.diskQosEnableFromMaster != request.EnableDiskQos {
525
log.LogWarnf("action[handleHeartbeatPacket] master command disk qos enable change to [%v], local conf enable [%v]",
526
request.EnableDiskQos,
530
// set volume forbidden
531
s.checkVolumeForbidden(request.ForbiddenVols)
532
// set decommission disks
533
s.checkDecommissionDisks(request.DecommissionDisks)
534
s.diskQosEnableFromMaster = request.EnableDiskQos
537
for _, pair := range []struct {
541
{request.QosFlowWriteLimit, &s.diskWriteFlow},
542
{request.QosFlowReadLimit, &s.diskReadFlow},
543
{request.QosIopsWriteLimit, &s.diskWriteIops},
544
{request.QosIopsReadLimit, &s.diskReadIops},
546
if pair.replace > 0 && int(pair.replace) != *pair.origin {
547
*pair.origin = int(pair.replace)
552
// set cpu util and io used in here
553
response.CpuUtil = s.cpuUtil.Load()
554
response.IoUtils = s.space.GetDiskUtils()
557
log.LogWarnf("action[handleHeartbeatPacket] master change disk qos limit to [flowWrite %v, flowRead %v, iopsWrite %v, iopsRead %v]",
558
s.diskWriteFlow, s.diskReadFlow, s.diskWriteIops, s.diskReadIops)
562
response.Status = proto.TaskFailed
563
err = fmt.Errorf("illegal opcode")
564
response.Result = err.Error()
566
task.Response = response
567
if err = MasterClient.NodeAPI().ResponseDataNodeTask(task); err != nil {
568
err = errors.Trace(err, "heartbeat to master(%v) failed.", request.MasterAddr)
569
log.LogErrorf(err.Error())
575
// Handle OpDeleteDataPartition packet.
576
func (s *DataNode) handlePacketToDeleteDataPartition(p *repl.Packet) {
577
task := &proto.AdminTask{}
578
err := json.Unmarshal(p.Data, task)
581
p.PackErrorBody(ActionDeleteDataPartition, err.Error())
589
request := &proto.DeleteDataPartitionRequest{}
590
if task.OpCode == proto.OpDeleteDataPartition {
591
bytes, _ := json.Marshal(task.Request)
592
p.AddMesgLog(string(bytes))
593
err = json.Unmarshal(bytes, request)
597
s.space.DeletePartition(request.PartitionId)
600
err = fmt.Errorf("illegal opcode ")
603
err = errors.Trace(err, "delete DataPartition failed,PartitionID(%v)", request.PartitionId)
604
log.LogErrorf("action[handlePacketToDeleteDataPartition] err(%v).", err)
606
log.LogInfof(fmt.Sprintf("action[handlePacketToDeleteDataPartition] %v error(%v)", request.PartitionId, err))
609
// Handle OpLoadDataPartition packet.
610
func (s *DataNode) handlePacketToLoadDataPartition(p *repl.Packet) {
611
task := &proto.AdminTask{}
615
p.PackErrorBody(ActionLoadDataPartition, err.Error())
620
err = json.Unmarshal(p.Data, task)
622
go s.asyncLoadDataPartition(task)
625
func (s *DataNode) asyncLoadDataPartition(task *proto.AdminTask) {
627
request := &proto.LoadDataPartitionRequest{}
628
response := &proto.LoadDataPartitionResponse{}
629
if task.OpCode == proto.OpLoadDataPartition {
630
bytes, _ := json.Marshal(task.Request)
631
json.Unmarshal(bytes, request)
632
dp := s.space.Partition(request.PartitionId)
634
response.Status = proto.TaskFailed
635
response.PartitionId = uint64(request.PartitionId)
636
err = fmt.Errorf(fmt.Sprintf("DataPartition(%v) not found", request.PartitionId))
637
response.Result = err.Error()
640
response.PartitionId = uint64(request.PartitionId)
641
response.Status = proto.TaskSucceeds
644
response.PartitionId = uint64(request.PartitionId)
645
response.Status = proto.TaskFailed
646
err = fmt.Errorf("illegal opcode")
647
response.Result = err.Error()
649
task.Response = response
650
if err = MasterClient.NodeAPI().ResponseDataNodeTask(task); err != nil {
651
err = errors.Trace(err, "load DataPartition failed,PartitionID(%v)", request.PartitionId)
652
log.LogError(errors.Stack(err))
656
// Handle OpMarkDelete packet.
657
func (s *DataNode) handleMarkDeletePacket(p *repl.Packet, c net.Conn) {
661
p.PackErrorBody(ActionBatchMarkDelete, err.Error())
666
partition := p.Object.(*DataPartition)
667
// NOTE: we cannot prevent mark delete
668
// even the partition is forbidden, because
669
// the inode already be deleted in meta partition
670
// if we prevent it, we will get "orphan extents"
671
if proto.IsTinyExtentType(p.ExtentType) || p.Opcode == proto.OpSplitMarkDelete {
672
ext := new(proto.TinyExtentDeleteRecord)
673
err = json.Unmarshal(p.Data, ext)
675
log.LogInfof("handleMarkDeletePacket Delete PartitionID(%v)_Extent(%v)_Offset(%v)_Size(%v)",
676
p.PartitionID, p.ExtentID, ext.ExtentOffset, ext.Size)
677
partition.disk.allocCheckLimit(proto.IopsWriteType, 1)
678
partition.disk.limitWrite.Run(0, func() {
679
err = partition.ExtentStore().MarkDelete(p.ExtentID, int64(ext.ExtentOffset), int64(ext.Size))
681
log.LogErrorf("action[handleMarkDeletePacket]: failed to mark delete extent(%v), %v", p.ExtentID, err)
686
log.LogInfof("handleMarkDeletePacket Delete PartitionID(%v)_Extent(%v)",
687
p.PartitionID, p.ExtentID)
688
partition.disk.allocCheckLimit(proto.IopsWriteType, 1)
689
partition.disk.limitWrite.Run(0, func() {
690
err = partition.ExtentStore().MarkDelete(p.ExtentID, 0, 0)
692
log.LogErrorf("action[handleMarkDeletePacket]: failed to mark delete extent(%v), %v", p.ExtentID, err)
698
// Handle OpMarkDelete packet.
699
func (s *DataNode) handleBatchMarkDeletePacket(p *repl.Packet, c net.Conn) {
703
log.LogErrorf(fmt.Sprintf("(%v) error(%v).", p.GetUniqueLogId(), err))
704
p.PackErrorBody(ActionBatchMarkDelete, err.Error())
709
partition := p.Object.(*DataPartition)
710
// NOTE: we cannot prevent mark delete
711
// even the partition is forbidden, because
712
// the inode already be deleted in meta partition
713
// if we prevent it, we will get "orphan extents"
714
var exts []*proto.ExtentKey
715
err = json.Unmarshal(p.Data, &exts)
716
store := partition.ExtentStore()
718
for _, ext := range exts {
719
if deleteLimiteRater.Allow() {
720
log.LogInfof(fmt.Sprintf("recive DeleteExtent (%v) from (%v)", ext, c.RemoteAddr().String()))
721
partition.disk.allocCheckLimit(proto.IopsWriteType, 1)
722
partition.disk.limitWrite.Run(0, func() {
723
err = store.MarkDelete(ext.ExtentId, int64(ext.ExtentOffset), int64(ext.Size))
725
log.LogErrorf("action[handleBatchMarkDeletePacket]: failed to mark delete extent(%v), %v", p.ExtentID, err)
732
log.LogInfof("delete limiter reach(%v), remote (%v) try again.", deleteLimiteRater.Limit(), c.RemoteAddr().String())
733
err = storage.TryAgainError
739
// Handle OpWrite packet.
740
func (s *DataNode) handleWritePacket(p *repl.Packet) {
743
metricPartitionIOLabels map[string]string
744
partitionIOMetric *exporter.TimePointCount
748
p.PackErrorBody(ActionWrite, err.Error())
753
partition := p.Object.(*DataPartition)
754
if partition.IsForbidden() {
755
err = ErrForbiddenDataPartition
758
shallDegrade := p.ShallDegrade()
760
metricPartitionIOLabels = GetIoMetricLabels(partition, "write")
762
if partition.Available() <= 0 || !partition.disk.CanWrite() {
763
err = storage.NoSpaceError
765
} else if partition.disk.Status == proto.Unavailable {
766
err = storage.BrokenDiskError
769
store := partition.ExtentStore()
770
if proto.IsTinyExtentType(p.ExtentType) {
772
partitionIOMetric = exporter.NewTPCnt(MetricPartitionIOName)
775
partition.disk.allocCheckLimit(proto.FlowWriteType, uint32(p.Size))
776
partition.disk.allocCheckLimit(proto.IopsWriteType, 1)
778
if writable := partition.disk.limitWrite.TryRun(int(p.Size), func() {
779
_, err = store.Write(p.ExtentID, p.ExtentOffset, int64(p.Size), p.Data, p.CRC, storage.AppendWriteType, p.IsSyncWrite())
781
err = storage.TryAgainError
785
s.metrics.MetricIOBytes.AddWithLabels(int64(p.Size), metricPartitionIOLabels)
786
partitionIOMetric.SetWithLabels(err, metricPartitionIOLabels)
788
partition.checkIsDiskError(err, WriteFlag)
792
if p.Size <= util.BlockSize {
794
partitionIOMetric = exporter.NewTPCnt(MetricPartitionIOName)
797
partition.disk.allocCheckLimit(proto.FlowWriteType, uint32(p.Size))
798
partition.disk.allocCheckLimit(proto.IopsWriteType, 1)
800
if writable := partition.disk.limitWrite.TryRun(int(p.Size), func() {
801
_, err = store.Write(p.ExtentID, p.ExtentOffset, int64(p.Size), p.Data, p.CRC, storage.AppendWriteType, p.IsSyncWrite())
803
err = storage.TryAgainError
807
s.metrics.MetricIOBytes.AddWithLabels(int64(p.Size), metricPartitionIOLabels)
808
partitionIOMetric.SetWithLabels(err, metricPartitionIOLabels)
810
partition.checkIsDiskError(err, WriteFlag)
818
currSize := util.Min(int(size), util.BlockSize)
819
data := p.Data[offset : offset+currSize]
820
crc := crc32.ChecksumIEEE(data)
822
partitionIOMetric = exporter.NewTPCnt(MetricPartitionIOName)
825
partition.disk.allocCheckLimit(proto.FlowWriteType, uint32(currSize))
826
partition.disk.allocCheckLimit(proto.IopsWriteType, 1)
828
if writable := partition.disk.limitWrite.TryRun(currSize, func() {
829
_, err = store.Write(p.ExtentID, p.ExtentOffset+int64(offset), int64(currSize), data, crc, storage.AppendWriteType, p.IsSyncWrite())
831
err = storage.TryAgainError
835
s.metrics.MetricIOBytes.AddWithLabels(int64(p.Size), metricPartitionIOLabels)
836
partitionIOMetric.SetWithLabels(err, metricPartitionIOLabels)
838
partition.checkIsDiskError(err, WriteFlag)
842
size -= uint32(currSize)
848
func (s *DataNode) handleRandomWritePacket(p *repl.Packet) {
852
metricPartitionIOLabels map[string]string
853
partitionIOMetric *exporter.TimePointCount
857
log.LogDebugf("action[handleRandomWritePacket opcod %v seq %v dpid %v resultCode %v extid %v err %v",
858
p.Opcode, p.VerSeq, p.PartitionID, p.ResultCode, p.ExtentID, err)
860
p.PackErrorBody(ActionWrite, err.Error())
862
// avoid rsp pack ver info into package which client need do more work to read buffer
863
if p.Opcode == proto.OpRandomWriteVer || p.Opcode == proto.OpSyncRandomWriteVer {
864
p.Opcode = proto.OpSyncRandomWriteVerRsp
866
if p.Opcode == proto.OpTryWriteAppend && p.ResultCode == proto.OpTryOtherExtent {
867
p.PackErrorBody(ActionWrite, storage.SnapshotNeedNewExtentError.Error())
868
p.ResultCode = proto.OpTryOtherExtent
869
log.LogDebugf("action[handleRandomWritePacket opcod %v seq %v dpid %v resultCode %v extid %v", p.Opcode, p.VerSeq, p.PartitionID, p.ResultCode, p.ExtentID)
876
partition := p.Object.(*DataPartition)
877
if partition.IsForbidden() {
878
err = ErrForbiddenDataPartition
881
log.LogDebugf("action[handleRandomWritePacket opcod %v seq %v dpid %v dpseq %v extid %v", p.Opcode, p.VerSeq, p.PartitionID, partition.verSeq, p.ExtentID)
882
// cache or preload partition not support raft and repair.
883
if !partition.isNormalType() {
884
err = raft.ErrStopped
888
_, isLeader := partition.IsRaftLeader()
890
err = raft.ErrNotLeader
893
shallDegrade := p.ShallDegrade()
895
metricPartitionIOLabels = GetIoMetricLabels(partition, "randwrite")
896
partitionIOMetric = exporter.NewTPCnt(MetricPartitionIOName)
899
err = partition.RandomWriteSubmit(p)
901
s.metrics.MetricIOBytes.AddWithLabels(int64(p.Size), metricPartitionIOLabels)
902
partitionIOMetric.SetWithLabels(err, metricPartitionIOLabels)
905
if err != nil && strings.Contains(err.Error(), raft.ErrNotLeader.Error()) {
906
err = raft.ErrNotLeader
907
log.LogErrorf("action[handleRandomWritePacket] opcod %v seq %v dpid %v dpseq %v extid %v err %v", p.Opcode, p.VerSeq, p.PartitionID, partition.verSeq, p.ExtentID, err)
911
if err == nil && p.ResultCode != proto.OpOk && p.ResultCode != proto.OpTryOtherExtent {
912
log.LogErrorf("action[handleRandomWritePacket] opcod %v seq %v dpid %v dpseq %v extid %v ResultCode %v",
913
p.Opcode, p.VerSeq, p.PartitionID, partition.verSeq, p.ExtentID, p.ResultCode)
914
err = storage.TryAgainError
917
log.LogDebugf("action[handleRandomWritePacket] opcod %v seq %v dpid %v dpseq %v after raft submit err %v resultCode %v",
918
p.Opcode, p.VerSeq, p.PartitionID, partition.verSeq, err, p.ResultCode)
921
func (s *DataNode) handleStreamReadPacket(p *repl.Packet, connect net.Conn, isRepairRead bool) {
925
p.PackErrorBody(ActionStreamRead, err.Error())
926
p.WriteToConn(connect)
929
partition := p.Object.(*DataPartition)
931
// cache or preload partition not support raft and repair.
932
if !partition.isNormalType() {
933
err = raft.ErrStopped
937
if err = partition.CheckLeader(p, connect); err != nil {
940
s.extentRepairReadPacket(p, connect, isRepairRead)
943
func (s *DataNode) handleExtentRepairReadPacket(p *repl.Packet, connect net.Conn, isRepairRead bool) {
945
log.LogDebugf("handleExtentRepairReadPacket %v", p)
948
p.PackErrorBody(ActionStreamRead, err.Error())
949
p.WriteToConn(connect)
952
fininshDoExtentRepair()
955
err = requestDoExtentRepair()
960
s.extentRepairReadPacket(p, connect, isRepairRead)
963
func (s *DataNode) handleTinyExtentRepairReadPacket(p *repl.Packet, connect net.Conn) {
964
s.tinyExtentRepairRead(p, connect)
967
func (s *DataNode) extentRepairReadPacket(p *repl.Packet, connect net.Conn, isRepairRead bool) {
971
metricPartitionIOLabels map[string]string
972
partitionIOMetric, tpObject *exporter.TimePointCount
976
p.PackErrorBody(ActionStreamRead, err.Error())
977
p.WriteToConn(connect)
980
partition := p.Object.(*DataPartition)
981
needReplySize := p.Size
982
offset := p.ExtentOffset
983
store := partition.ExtentStore()
984
shallDegrade := p.ShallDegrade()
986
metricPartitionIOLabels = GetIoMetricLabels(partition, "read")
988
log.LogDebugf("extentRepairReadPacket dp %v offset %v needSize %v", partition.partitionID, offset, needReplySize)
990
if needReplySize <= 0 {
994
reply := repl.NewStreamReadResponsePacket(p.ReqID, p.PartitionID, p.ExtentID)
995
reply.StartT = p.StartT
996
currReadSize := uint32(util.Min(int(needReplySize), util.ReadBlockSize))
997
if currReadSize == util.ReadBlockSize {
998
reply.Data, _ = proto.Buffers.Get(util.ReadBlockSize)
1000
reply.Data = make([]byte, currReadSize)
1003
partitionIOMetric = exporter.NewTPCnt(MetricPartitionIOName)
1004
tpObject = exporter.NewTPCnt(fmt.Sprintf("Repair_%s", p.GetOpMsg()))
1006
reply.ExtentOffset = offset
1007
p.Size = currReadSize
1008
p.ExtentOffset = offset
1010
partition.Disk().allocCheckLimit(proto.IopsReadType, 1)
1011
partition.Disk().allocCheckLimit(proto.FlowReadType, currReadSize)
1013
partition.disk.limitRead.Run(int(currReadSize), func() {
1014
reply.CRC, err = store.Read(reply.ExtentID, offset, int64(currReadSize), reply.Data, isRepairRead)
1017
s.metrics.MetricIOBytes.AddWithLabels(int64(p.Size), metricPartitionIOLabels)
1018
partitionIOMetric.SetWithLabels(err, metricPartitionIOLabels)
1021
partition.checkIsDiskError(err, ReadFlag)
1024
log.LogErrorf("action[operatePacket] err %v", err)
1027
reply.Size = currReadSize
1028
reply.ResultCode = proto.OpOk
1029
reply.Opcode = p.Opcode
1030
p.ResultCode = proto.OpOk
1031
if err = reply.WriteToConn(connect); err != nil {
1034
needReplySize -= currReadSize
1035
offset += int64(currReadSize)
1036
if currReadSize == util.ReadBlockSize {
1037
proto.Buffers.Put(reply.Data)
1039
logContent := fmt.Sprintf("action[operatePacket] %v.",
1040
reply.LogMessage(reply.GetOpMsg(), connect.RemoteAddr().String(), reply.StartT, err))
1041
log.LogReadf(logContent)
1046
func (s *DataNode) handlePacketToGetAllWatermarks(p *repl.Packet) {
1049
fInfoList []*storage.ExtentInfo
1052
partition := p.Object.(*DataPartition)
1053
store := partition.ExtentStore()
1054
if proto.IsNormalExtentType(p.ExtentType) {
1055
fInfoList, _, err = store.GetAllWatermarks(storage.NormalExtentFilter())
1057
extents := make([]uint64, 0)
1058
err = json.Unmarshal(p.Data, &extents)
1060
fInfoList, _, err = store.GetAllWatermarks(storage.TinyExtentFilter(extents))
1064
p.PackErrorBody(ActionGetAllExtentWatermarks, err.Error())
1066
buf, err = json.Marshal(fInfoList)
1068
p.PackErrorBody(ActionGetAllExtentWatermarks, err.Error())
1070
p.PacketOkWithByte(buf)
1075
func (s *DataNode) writeEmptyPacketOnTinyExtentRepairRead(reply *repl.Packet, newOffset, currentOffset int64, connect net.Conn) (replySize int64, err error) {
1076
replySize = newOffset - currentOffset
1077
reply.Data = make([]byte, 0)
1079
reply.CRC = crc32.ChecksumIEEE(reply.Data)
1080
reply.ResultCode = proto.OpOk
1081
reply.ExtentOffset = currentOffset
1082
reply.Arg[0] = EmptyResponse
1083
binary.BigEndian.PutUint64(reply.Arg[1:9], uint64(replySize))
1084
err = reply.WriteToConn(connect)
1085
reply.Size = uint32(replySize)
1086
logContent := fmt.Sprintf("action[operatePacket] %v.",
1087
reply.LogMessage(reply.GetOpMsg(), connect.RemoteAddr().String(), reply.StartT, err))
1088
log.LogReadf(logContent)
1093
func (s *DataNode) attachAvaliSizeOnTinyExtentRepairRead(reply *repl.Packet, avaliSize uint64) {
1094
binary.BigEndian.PutUint64(reply.Arg[9:17], avaliSize)
1097
// Handle tinyExtentRepairRead packet.
1098
// tinyExtentRepairRead serves an OpTinyExtentRepairRead request: it streams
// the requested range of a tiny extent to the repairing peer in
// util.ReadBlockSize chunks, emitting empty marker packets for ranges with no
// stored data, and finishes with an OK reply on the request packet.
func (s *DataNode) tinyExtentRepairRead(request *repl.Packet, connect net.Conn) {
1102
tinyExtentFinfoSize uint64
1107
// On failure, report the error back to the requester on the same connection.
request.PackErrorBody(ActionStreamReadTinyExtentRepair, err.Error())
1108
request.WriteToConn(connect)
1111
// Only tiny-extent IDs are legal for this opcode.
if !storage.IsTinyExtent(request.ExtentID) {
1112
err = fmt.Errorf("unavali extentID (%v)", request.ExtentID)
1116
partition := request.Object.(*DataPartition)
1117
store := partition.ExtentStore()
1118
tinyExtentFinfoSize, err = store.TinyExtentGetFinfoSize(request.ExtentID)
1122
needReplySize = int64(request.Size)
1123
offset := request.ExtentOffset
1124
// Clamp the reply size so we never read past the extent's current file size.
if uint64(request.ExtentOffset)+uint64(request.Size) > tinyExtentFinfoSize {
1125
needReplySize = int64(tinyExtentFinfoSize - uint64(request.ExtentOffset))
1127
avaliReplySize := uint64(needReplySize)
1129
var newOffset, newEnd int64
1131
if needReplySize <= 0 {
1134
reply := repl.NewTinyExtentStreamReadResponsePacket(request.ReqID, request.PartitionID, request.ExtentID)
1135
reply.ArgLen = TinyExtentRepairReadResponseArgLen
1136
reply.Arg = make([]byte, TinyExtentRepairReadResponseArgLen)
1137
// Advertise the total available size in Arg[9:17] of every reply packet.
s.attachAvaliSizeOnTinyExtentRepairRead(reply, avaliReplySize)
1138
// Locate the next range [newOffset, newEnd) that actually holds data.
newOffset, newEnd, err = store.TinyExtentAvaliOffset(request.ExtentID, offset)
1142
// A gap before the next data range is answered with an empty marker packet.
if newOffset > offset {
1144
if replySize, err = s.writeEmptyPacketOnTinyExtentRepairRead(reply, newOffset, offset, connect); err != nil {
1147
needReplySize -= replySize
1151
currNeedReplySize := newEnd - newOffset
1152
currReadSize := uint32(util.Min(int(currNeedReplySize), util.ReadBlockSize))
1153
// Full-size chunks borrow a pooled buffer; partial chunks allocate.
if currReadSize == util.ReadBlockSize {
1154
reply.Data, _ = proto.Buffers.Get(util.ReadBlockSize)
1156
reply.Data = make([]byte, currReadSize)
1158
reply.ExtentOffset = offset
1159
reply.CRC, err = store.Read(reply.ExtentID, offset, int64(currReadSize), reply.Data, false)
1163
reply.Size = uint32(currReadSize)
1164
reply.ResultCode = proto.OpOk
1165
if err = reply.WriteToConn(connect); err != nil {
1169
// Advance the cursor past the chunk just sent.
needReplySize -= int64(currReadSize)
1170
offset += int64(currReadSize)
1171
// Return the pooled buffer taken above.
if currReadSize == util.ReadBlockSize {
1172
proto.Buffers.Put(reply.Data)
1174
logContent := fmt.Sprintf("action[operatePacket] %v.",
1175
reply.LogMessage(reply.GetOpMsg(), connect.RemoteAddr().String(), reply.StartT, err))
1176
log.LogReadf(logContent)
1179
// All data streamed; acknowledge the original request.
request.PacketOkReply()
1182
// handlePacketToReadTinyDeleteRecordFile streams the partition's tiny-delete
// record file to the peer, starting at p.ExtentOffset, in chunks of at most
// MaxSyncTinyDeleteBufferSize bytes.
func (s *DataNode) handlePacketToReadTinyDeleteRecordFile(p *repl.Packet, connect net.Conn) {
1186
// On failure, report the error back on the same connection.
p.PackErrorBody(ActionStreamReadTinyDeleteRecord, err.Error())
1187
p.WriteToConn(connect)
1190
partition := p.Object.(*DataPartition)
1191
store := partition.ExtentStore()
1192
localTinyDeleteFileSize, err := store.LoadTinyDeleteFileOffset()
1196
// Bytes remaining between the requested offset and the local file size.
needReplySize := localTinyDeleteFileSize - p.ExtentOffset
1197
offset := p.ExtentOffset
1198
reply := repl.NewReadTinyDeleteRecordResponsePacket(p.ReqID, p.PartitionID)
1199
reply.StartT = time.Now().UnixNano()
1201
if needReplySize <= 0 {
1205
currReadSize := uint32(util.Min(int(needReplySize), MaxSyncTinyDeleteBufferSize))
1206
reply.Data = make([]byte, currReadSize)
1207
reply.ExtentOffset = offset
1208
reply.CRC, err = store.ReadTinyDeleteRecords(offset, int64(currReadSize), reply.Data)
1210
// Wrap the read error with the sizes involved for diagnosis.
err = fmt.Errorf(ActionStreamReadTinyDeleteRecord+" localTinyDeleteRecordSize(%v) offset(%v)"+
1211
" currReadSize(%v) err(%v)", localTinyDeleteFileSize, offset, currReadSize, err)
1214
reply.Size = uint32(currReadSize)
1215
reply.ResultCode = proto.OpOk
1216
if err = reply.WriteToConn(connect); err != nil {
1219
// Advance the cursor past the chunk just sent.
needReplySize -= int64(currReadSize)
1220
offset += int64(currReadSize)
1225
// Handle OpNotifyReplicasToRepair packet.
1226
// handlePacketToNotifyExtentRepair handles OpNotifyReplicasToRepair: it
// decodes a DataPartitionRepairTask from the packet body and runs the extent
// store repair on the target partition.
func (s *DataNode) handlePacketToNotifyExtentRepair(p *repl.Packet) {
1228
partition := p.Object.(*DataPartition)
1229
mf := new(DataPartitionRepairTask)
1230
err = json.Unmarshal(p.Data, mf)
1232
p.PackErrorBody(ActionRepair, err.Error())
1235
partition.DoExtentStoreRepair(mf)
1239
// Handle OpBroadcastMinAppliedID
1240
// handleBroadcastMinAppliedID handles OpBroadcastMinAppliedID: the packet body
// carries a big-endian uint64 minimum applied raft ID, which is recorded on
// the partition when non-zero.
func (s *DataNode) handleBroadcastMinAppliedID(p *repl.Packet) {
1241
partition := p.Object.(*DataPartition)
1242
minAppliedID := binary.BigEndian.Uint64(p.Data)
1243
// Zero means "no information"; don't overwrite the stored value with it.
if minAppliedID > 0 {
1244
partition.SetMinAppliedID(minAppliedID)
1246
log.LogDebugf("[handleBroadcastMinAppliedID] partition(%v) minAppliedID(%v)", partition.partitionID, minAppliedID)
1250
// Handle handlePacketToGetAppliedID packet.
1251
// handlePacketToGetAppliedID replies with the partition's current raft applied
// ID, encoded as a big-endian uint64 in the packet body.
func (s *DataNode) handlePacketToGetAppliedID(p *repl.Packet) {
1252
partition := p.Object.(*DataPartition)
1253
appliedID := partition.GetAppliedID()
1254
buf := make([]byte, 8)
1255
binary.BigEndian.PutUint64(buf, appliedID)
1256
p.PacketOkWithBody(buf)
1257
p.AddMesgLog(fmt.Sprintf("_AppliedID(%v)", appliedID))
1260
// handlePacketToGetPartitionSize replies with the store's used size up to
// p.ExtentID, encoded as a big-endian uint64 in the packet body.
func (s *DataNode) handlePacketToGetPartitionSize(p *repl.Packet) {
1261
partition := p.Object.(*DataPartition)
1262
usedSize := partition.extentStore.StoreSizeExtentID(p.ExtentID)
1263
buf := make([]byte, 8)
1264
binary.BigEndian.PutUint64(buf, uint64(usedSize))
1265
p.AddMesgLog(fmt.Sprintf("partitionSize_(%v)", usedSize))
1266
p.PacketOkWithBody(buf)
1269
// handlePacketToGetMaxExtentIDAndPartitionSize replies with two big-endian
// uint64s: bytes 0..8 hold the store's max extent ID, bytes 8..16 the total
// partition size.
func (s *DataNode) handlePacketToGetMaxExtentIDAndPartitionSize(p *repl.Packet) {
1270
partition := p.Object.(*DataPartition)
1271
maxExtentID, totalPartitionSize := partition.extentStore.GetMaxExtentIDAndPartitionSize()
1273
buf := make([]byte, 16)
1274
binary.BigEndian.PutUint64(buf[0:8], uint64(maxExtentID))
1275
binary.BigEndian.PutUint64(buf[8:16], totalPartitionSize)
1276
p.PacketOkWithBody(buf)
1279
// handlePacketToDecommissionDataPartition handles a master decommission task:
// it decodes the AdminTask wrapper from the packet body, re-marshals the inner
// request into a DataPartitionDecommissionRequest, forwards to the raft leader
// if this node is not it, and then applies the add/remove raft member changes.
func (s *DataNode) handlePacketToDecommissionDataPartition(p *repl.Packet) {
1284
req = &proto.DataPartitionDecommissionRequest{}
1289
p.PackErrorBody(ActionDecommissionPartition, err.Error())
1295
adminTask := &proto.AdminTask{}
1296
decode := json.NewDecoder(bytes.NewBuffer(p.Data))
1298
if err = decode.Decode(adminTask); err != nil {
1302
// adminTask.Request is an untyped payload; round-trip through JSON to get
// the concrete decommission request.
reqData, err = json.Marshal(adminTask.Request)
1306
if err = json.Unmarshal(reqData, req); err != nil {
1309
p.AddMesgLog(string(reqData))
1310
dp := s.space.Partition(req.PartitionId)
1312
err = fmt.Errorf("partition %v not exsit", req.PartitionId)
1315
p.PartitionID = req.PartitionId
1317
// Membership changes must be issued by the raft leader; forward otherwise.
isRaftLeader, err = s.forwardToRaftLeader(dp, p, false)
1319
err = raft.ErrNotLeader
1322
// Adding and removing the same peer ID would be a no-op/invalid request.
if req.AddPeer.ID == req.RemovePeer.ID {
1323
err = errors.NewErrorf("[opOfflineDataPartition]: AddPeer(%v) same withRemovePeer(%v)", req.AddPeer, req.RemovePeer)
1326
if req.AddPeer.ID != 0 {
1327
_, err = dp.ChangeRaftMember(raftProto.ConfAddNode, raftProto.Peer{ID: req.AddPeer.ID}, reqData)
1332
_, err = dp.ChangeRaftMember(raftProto.ConfRemoveNode, raftProto.Peer{ID: req.RemovePeer.ID}, reqData)
1338
// handlePacketToAddDataPartitionRaftMember handles a master task to add a raft
// member to a data partition: decode the AdminTask, extract the typed request,
// skip if the replica already exists, forward to the leader if needed, then
// issue the ConfAddNode membership change.
func (s *DataNode) handlePacketToAddDataPartitionRaftMember(p *repl.Packet) {
1343
req = &proto.AddDataPartitionRaftMemberRequest{}
1348
p.PackErrorBody(ActionAddDataPartitionRaftMember, err.Error())
1354
adminTask := &proto.AdminTask{}
1355
decode := json.NewDecoder(bytes.NewBuffer(p.Data))
1357
if err = decode.Decode(adminTask); err != nil {
1361
// Round-trip the untyped adminTask.Request into the concrete add request.
reqData, err = json.Marshal(adminTask.Request)
1365
if err = json.Unmarshal(reqData, req); err != nil {
1369
log.LogInfof("action[handlePacketToAddDataPartitionRaftMember] %v, partition id %v", req.AddPeer, req.PartitionId)
1371
p.AddMesgLog(string(reqData))
1372
dp := s.space.Partition(req.PartitionId)
1374
err = proto.ErrDataPartitionNotExists
1377
p.PartitionID = req.PartitionId
1378
// Idempotence: if the peer address is already a replica, nothing to do.
if dp.IsExistReplica(req.AddPeer.Addr) {
1379
log.LogInfof("handlePacketToAddDataPartitionRaftMember recive MasterCommand: %v "+
1380
"addRaftAddr(%v) has exsit", string(reqData), req.AddPeer.Addr)
1383
// Membership changes must be issued by the raft leader; forward otherwise.
isRaftLeader, err = s.forwardToRaftLeader(dp, p, false)
1387
log.LogInfof("action[handlePacketToAddDataPartitionRaftMember] before ChangeRaftMember %v which is sync. partition id %v", req.AddPeer, req.PartitionId)
1389
if req.AddPeer.ID != 0 {
1390
_, err = dp.ChangeRaftMember(raftProto.ConfAddNode, raftProto.Peer{ID: req.AddPeer.ID}, reqData)
1395
// NOTE(review): &req.PartitionId logs a pointer address, not the partition
// id — the earlier logs pass req.PartitionId by value; likely a typo.
log.LogInfof("action[handlePacketToAddDataPartitionRaftMember] after ChangeRaftMember %v, partition id %v", req.AddPeer, &req.PartitionId)
1398
// handlePacketToRemoveDataPartitionRaftMember handles a master task to remove
// a raft member from a data partition. Normal removals go through the raft
// leader via ChangeRaftMember; forced removals bypass consensus and apply the
// membership change locally.
func (s *DataNode) handlePacketToRemoveDataPartitionRaftMember(p *repl.Packet) {
1403
req = &proto.RemoveDataPartitionRaftMemberRequest{}
1408
p.PackErrorBody(ActionRemoveDataPartitionRaftMember, err.Error())
1414
adminTask := &proto.AdminTask{}
1415
decode := json.NewDecoder(bytes.NewBuffer(p.Data))
1417
if err = decode.Decode(adminTask); err != nil {
1421
// Round-trip the untyped adminTask.Request into the concrete remove request.
reqData, err = json.Marshal(adminTask.Request)
1422
p.AddMesgLog(string(reqData))
1426
if err = json.Unmarshal(reqData, req); err != nil {
1430
dp := s.space.Partition(req.PartitionId)
1435
log.LogDebugf("action[handlePacketToRemoveDataPartitionRaftMember], req %v (%s) RemoveRaftPeer(%s) dp %v replicaNum %v",
1436
p.GetReqID(), string(reqData), req.RemovePeer.Addr, dp.partitionID, dp.replicaNum)
1438
p.PartitionID = req.PartitionId
1440
// Idempotence: nothing to remove if the peer is not a replica anymore.
if !dp.IsExistReplica(req.RemovePeer.Addr) {
1441
log.LogWarnf("action[handlePacketToRemoveDataPartitionRaftMember] receive MasterCommand: req %v[%v] "+
1442
"RemoveRaftPeer(%v) has not exist", p.GetReqID(), string(reqData), req.RemovePeer.Addr)
1446
// Membership changes must be issued by the raft leader; forward otherwise
// (req.Force is propagated so a forced removal can proceed leaderless).
isRaftLeader, err = s.forwardToRaftLeader(dp, p, req.Force)
1448
log.LogWarnf("handlePacketToRemoveDataPartitionRaftMember return no leader")
1451
if err = dp.CanRemoveRaftMember(req.RemovePeer, req.Force); err != nil {
1452
log.LogWarnf("action[handlePacketToRemoveDataPartitionRaftMember] CanRemoveRaftMember failed "+
1453
"req %v dp %v err %v",
1454
p.GetReqID(), dp.partitionID, err.Error())
1459
// Forced path: build the ConfChange locally and apply it without going
// through raft consensus, then persist the new membership.
cc := &raftProto.ConfChange{
1460
Type: raftProto.ConfRemoveNode,
1461
Peer: raftProto.Peer{
1462
ID: req.RemovePeer.ID,
1466
s.raftStore.RaftServer().RemoveRaftForce(dp.partitionID, cc)
1467
dp.ApplyMemberChange(cc, 0)
1468
dp.PersistMetadata()
1472
// Normal path: submit the removal through raft.
if req.RemovePeer.ID != 0 {
1473
log.LogDebugf("action[handlePacketToRemoveDataPartitionRaftMember] ChangeRaftMember "+
1474
"req %v dp %v RemovePeer.ID %v", p.GetReqID(), dp.partitionID, req.RemovePeer.ID)
1475
_, err = dp.ChangeRaftMember(raftProto.ConfRemoveNode, raftProto.Peer{ID: req.RemovePeer.ID}, reqData)
1480
log.LogDebugf("action[handlePacketToRemoveDataPartitionRaftMember] CanRemoveRaftMember complete "+
1481
"req %v dp %v ", p.GetReqID(), dp.partitionID)
1484
// handlePacketToDataPartitionTryToLeader asks the local replica of the target
// partition to campaign for raft leadership. It is a no-op if the partition's
// raft is not running or this replica is already the leader.
func (s *DataNode) handlePacketToDataPartitionTryToLeader(p *repl.Packet) {
1489
p.PackErrorBody(ActionDataPartitionTryToLeader, err.Error())
1490
log.LogWarnf("handlePacketToDataPartitionTryToLeader: %v ", err.Error())
1493
log.LogDebugf("handlePacketToDataPartitionTryToLeader: partition %v success ", p.PartitionID)
1496
log.LogDebugf("handlePacketToDataPartitionTryToLeader: partition %v ", p.PartitionID)
1497
dp := s.space.Partition(p.PartitionID)
1499
err = fmt.Errorf("partition %v not exsit", p.PartitionID)
1503
// Campaigning only makes sense while the raft instance is running.
if dp.raftStatus != RaftStatusRunning {
1504
err = fmt.Errorf("partition %v raft not running", p.PartitionID)
1508
// Already the leader — nothing to do.
if dp.raftPartition.IsRaftLeader() {
1509
log.LogWarnf("handlePacketToDataPartitionTryToLeader: %v is already leader", p.PartitionID)
1512
err = dp.raftPartition.TryToLeader(dp.partitionID)
1515
// forwardToRaftLeader proxies packet p to the partition's raft leader when the
// local node is not the leader. It returns ok=true if the local node IS the
// leader (caller should handle the packet itself); otherwise it writes the
// packet to the leader and reads the response back into p. The force flag lets
// callers proceed in leaderless situations (see the replicaNum check below).
func (s *DataNode) forwardToRaftLeader(dp *DataPartition, p *repl.Packet, force bool) (ok bool, err error) {
1521
if leaderAddr, ok = dp.IsRaftLeader(); ok {
1524
// return NoLeaderError if leaderAddr is nil
1525
if leaderAddr == "" {
1528
log.LogInfof("action[forwardToRaftLeader] no leader but replica num %v continue", dp.replicaNum)
1531
err = storage.NoLeaderError
1535
// forward the packet to the leader if local one is not the leader
1536
conn, err = gConnPool.GetConnect(leaderAddr)
1541
// Return the connection to the pool; mark it broken on error.
gConnPool.PutConnect(conn, err != nil)
1543
err = p.WriteToConn(conn)
1547
// Read the leader's response back into p so the caller can relay it.
if err = p.ReadFromConnWithVer(conn, proto.NoReadDeadlineTime); err != nil {
1554
// handlePacketToStopDataPartitionRepair handles OpStopDataPartitionRepair: it
// decodes the AdminTask, validates the opcode, extracts the typed stop
// request, and toggles decommission-recover on the target partition.
func (s *DataNode) handlePacketToStopDataPartitionRepair(p *repl.Packet) {
1555
task := &proto.AdminTask{}
1556
err := json.Unmarshal(p.Data, task)
1559
p.PackErrorBody(ActionStopDataPartitionRepair, err.Error())
1567
request := &proto.StopDataPartitionRepairRequest{}
1568
// Defend against a mis-routed task: the opcode must match this handler.
if task.OpCode != proto.OpStopDataPartitionRepair {
1569
err = fmt.Errorf("action[handlePacketToStopDataPartitionRepair] illegal opcode ")
1570
log.LogWarnf("action[handlePacketToStopDataPartitionRepair] illegal opcode ")
1574
// Round-trip the untyped task.Request into the concrete stop request.
bytes, _ := json.Marshal(task.Request)
1575
p.AddMesgLog(string(bytes))
1576
err = json.Unmarshal(bytes, request)
1580
log.LogDebugf("action[handlePacketToStopDataPartitionRepair] try stop %v", request.PartitionId)
1581
dp := s.space.Partition(request.PartitionId)
1583
err = proto.ErrDataPartitionNotExists
1584
log.LogWarnf("action[handlePacketToStopDataPartitionRepair] cannot find dp %v", request.PartitionId)
1587
// request.Stop==true pauses repair; false resumes it.
dp.StopDecommissionRecover(request.Stop)
1588
log.LogInfof("action[handlePacketToStopDataPartitionRepair] %v stop %v success", request.PartitionId, request.Stop)