cubefs

Форк
0
/
wrap_operator.go 
1589 строк · 48.0 Кб
1
// Copyright 2018 The CubeFS Authors.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
14

15
package datanode
16

17
import (
18
	"bytes"
19
	"encoding/binary"
20
	"encoding/json"
21
	"fmt"
22
	"hash/crc32"
23
	"net"
24
	"strconv"
25
	"strings"
26
	"sync"
27
	"time"
28

29
	"github.com/cubefs/cubefs/depends/tiglabs/raft"
30
	raftProto "github.com/cubefs/cubefs/depends/tiglabs/raft/proto"
31
	"github.com/cubefs/cubefs/proto"
32
	"github.com/cubefs/cubefs/repl"
33
	"github.com/cubefs/cubefs/storage"
34
	"github.com/cubefs/cubefs/util"
35
	"github.com/cubefs/cubefs/util/errors"
36
	"github.com/cubefs/cubefs/util/exporter"
37
	"github.com/cubefs/cubefs/util/log"
38
)
39

40
var ErrForbiddenDataPartition = errors.New("the data partition is forbidden")
41

42
// getPacketTpLabels builds the exporter label set (volume, operation,
// partition id and disk path) used when reporting TP metrics for a packet.
// Labels default to empty strings when the packet carries no partition.
func (s *DataNode) getPacketTpLabels(p *repl.Packet) map[string]string {
	labels := map[string]string{
		exporter.Vol:    "",
		exporter.Op:     "",
		exporter.PartId: "",
		exporter.Disk:   "",
	}

	part, ok := p.Object.(*DataPartition)
	if !ok {
		return labels
	}

	labels[exporter.Vol] = part.volumeID
	labels[exporter.Op] = p.GetOpMsg()
	// Partition id and disk labels are high-cardinality; only emit them
	// when the exporter is configured to allow per-partition labels.
	if exporter.EnablePid {
		labels[exporter.PartId] = fmt.Sprintf("%d", part.partitionID)
		labels[exporter.Disk] = part.path
	}
	return labels
}
60

61
// isColdVolExtentDelErr reports whether the packet is a failed extent
// delete on a cold (non-normal) data partition whose extent no longer
// exists. Such failures are expected and are logged at info level
// instead of error by the caller.
func isColdVolExtentDelErr(p *repl.Packet) bool {
	// A nil p.Object also fails this assertion, so no separate nil check
	// is needed.
	partition, ok := p.Object.(*DataPartition)
	if !ok {
		return false
	}
	if proto.IsNormalDp(partition.partitionType) {
		return false
	}
	return p.ResultCode == proto.OpNotExistErr
}
81

82
// OperatePacket dispatches an incoming replication-protocol packet to the
// handler matching its opcode. A deferred hook logs the outcome (routing
// reads/writes to their dedicated log streams) and reports TP metrics,
// unless the packet requests degraded (metrics-free) handling.
func (s *DataNode) OperatePacket(p *repl.Packet, c net.Conn) (err error) {
	var (
		tpLabels map[string]string
		tpObject *exporter.TimePointCount
	)
	log.LogDebugf("action[OperatePacket] %v, pack [%v]", p.GetOpMsg(), p)
	shallDegrade := p.ShallDegrade()
	// Remember the request size: handlers overwrite p.Size with the reply
	// size, and the deferred hook needs both values for logging.
	sz := p.Size
	if !shallDegrade {
		tpObject = exporter.NewTPCnt(p.GetOpMsg())
		tpLabels = s.getPacketTpLabels(p)
	}
	start := time.Now().UnixNano()
	defer func() {
		resultSize := p.Size
		// Temporarily restore the request size so LogMessage describes the
		// original packet; the reply size is restored below.
		p.Size = sz
		if p.IsErrPacket() {
			err = fmt.Errorf("op(%v) error(%v)", p.GetOpMsg(), string(p.Data[:resultSize]))
			logContent := fmt.Sprintf("action[OperatePacket] %v.",
				p.LogMessage(p.GetOpMsg(), c.RemoteAddr().String(), start, err))
			// "Not exist" extent-delete failures on cold volumes are
			// expected; demote them from error to info.
			if isColdVolExtentDelErr(p) {
				log.LogInfof(logContent)
			} else {
				log.LogErrorf(logContent)
			}
		} else {
			logContent := fmt.Sprintf("action[OperatePacket] %v.",
				p.LogMessage(p.GetOpMsg(), c.RemoteAddr().String(), start, nil))
			switch p.Opcode {
			// Successful stream/normal reads are intentionally not logged
			// (empty case body — no fallthrough in Go).
			case proto.OpStreamRead, proto.OpRead, proto.OpExtentRepairRead, proto.OpStreamFollowerRead:
			case proto.OpReadTinyDeleteRecord:
				log.LogRead(logContent)
			case proto.OpWrite, proto.OpRandomWrite,
				proto.OpRandomWriteVer, proto.OpSyncRandomWriteVer,
				proto.OpRandomWriteAppend, proto.OpSyncRandomWriteAppend,
				proto.OpTryWriteAppend, proto.OpSyncTryWriteAppend,
				proto.OpSyncRandomWrite, proto.OpSyncWrite, proto.OpMarkDelete, proto.OpSplitMarkDelete:
				log.LogWrite(logContent)
			default:
				log.LogInfo(logContent)
			}
		}
		p.Size = resultSize
		if !shallDegrade {
			tpObject.SetWithLabels(err, tpLabels)
		}
	}()
	// Dispatch on opcode; each handler packs its own reply into p.
	switch p.Opcode {
	case proto.OpCreateExtent:
		s.handlePacketToCreateExtent(p)
	case proto.OpWrite, proto.OpSyncWrite:
		s.handleWritePacket(p)
	case proto.OpStreamRead:
		s.handleStreamReadPacket(p, c, StreamRead)
	case proto.OpStreamFollowerRead:
		// Follower reads skip the leader check and go straight to the
		// local read path.
		s.extentRepairReadPacket(p, c, StreamRead)
	case proto.OpExtentRepairRead:
		s.handleExtentRepairReadPacket(p, c, RepairRead)
	case proto.OpTinyExtentRepairRead:
		s.handleTinyExtentRepairReadPacket(p, c)
	case proto.OpMarkDelete, proto.OpSplitMarkDelete:
		s.handleMarkDeletePacket(p, c)
	case proto.OpBatchDeleteExtent:
		s.handleBatchMarkDeletePacket(p, c)
	case proto.OpRandomWrite, proto.OpSyncRandomWrite,
		proto.OpRandomWriteAppend, proto.OpSyncRandomWriteAppend,
		proto.OpTryWriteAppend, proto.OpSyncTryWriteAppend,
		proto.OpRandomWriteVer, proto.OpSyncRandomWriteVer:
		s.handleRandomWritePacket(p)
	case proto.OpNotifyReplicasToRepair:
		s.handlePacketToNotifyExtentRepair(p)
	case proto.OpGetAllWatermarks:
		s.handlePacketToGetAllWatermarks(p)
	case proto.OpCreateDataPartition:
		s.handlePacketToCreateDataPartition(p)
	case proto.OpLoadDataPartition:
		s.handlePacketToLoadDataPartition(p)
	case proto.OpDeleteDataPartition:
		s.handlePacketToDeleteDataPartition(p)
	case proto.OpDataNodeHeartbeat:
		s.handleHeartbeatPacket(p)
	case proto.OpGetAppliedId:
		s.handlePacketToGetAppliedID(p)
	case proto.OpDecommissionDataPartition:
		s.handlePacketToDecommissionDataPartition(p)
	case proto.OpAddDataPartitionRaftMember:
		s.handlePacketToAddDataPartitionRaftMember(p)
	case proto.OpRemoveDataPartitionRaftMember:
		s.handlePacketToRemoveDataPartitionRaftMember(p)
	case proto.OpDataPartitionTryToLeader:
		s.handlePacketToDataPartitionTryToLeader(p)
	case proto.OpGetPartitionSize:
		s.handlePacketToGetPartitionSize(p)
	case proto.OpGetMaxExtentIDAndPartitionSize:
		s.handlePacketToGetMaxExtentIDAndPartitionSize(p)
	case proto.OpReadTinyDeleteRecord:
		s.handlePacketToReadTinyDeleteRecordFile(p, c)
	case proto.OpBroadcastMinAppliedID:
		s.handleBroadcastMinAppliedID(p)
	case proto.OpVersionOperation:
		s.handleUpdateVerPacket(p)
	case proto.OpStopDataPartitionRepair:
		s.handlePacketToStopDataPartitionRepair(p)
	default:
		p.PackErrorBody(repl.ErrorUnknownOp.Error(), repl.ErrorUnknownOp.Error()+strconv.Itoa(int(p.Opcode)))
	}

	return
}
191

192
// Handle OpCreateExtent packet.
193
func (s *DataNode) handlePacketToCreateExtent(p *repl.Packet) {
194
	var err error
195
	defer func() {
196
		if err != nil {
197
			p.PackErrorBody(ActionCreateExtent, err.Error())
198
		} else {
199
			p.PacketOkReply()
200
		}
201
	}()
202
	partition := p.Object.(*DataPartition)
203
	if partition.Available() <= 0 || !partition.disk.CanWrite() {
204
		err = storage.NoSpaceError
205
		return
206
	} else if partition.disk.Status == proto.Unavailable {
207
		err = storage.BrokenDiskError
208
		return
209
	}
210

211
	// in case too many extents
212
	if partition.GetExtentCount() >= storage.MaxExtentCount+10 {
213
		err = storage.NoSpaceError
214
		return
215
	}
216

217
	partition.disk.allocCheckLimit(proto.IopsWriteType, 1)
218
	partition.disk.limitWrite.Run(0, func() {
219
		err = partition.ExtentStore().Create(p.ExtentID)
220
	})
221
}
222

223
// Handle OpCreateDataPartition packet.
224
func (s *DataNode) handlePacketToCreateDataPartition(p *repl.Packet) {
225
	var (
226
		err   error
227
		bytes []byte
228
		dp    *DataPartition
229
	)
230
	defer func() {
231
		if err != nil {
232
			p.PackErrorBody(ActionCreateDataPartition, err.Error())
233
		}
234
	}()
235
	task := &proto.AdminTask{}
236
	if err = json.Unmarshal(p.Data, task); err != nil {
237
		err = fmt.Errorf("cannnot unmashal adminTask")
238
		return
239
	}
240
	request := &proto.CreateDataPartitionRequest{}
241
	if task.OpCode != proto.OpCreateDataPartition {
242
		err = fmt.Errorf("from master Task(%v) failed,error unavali opcode(%v)", task.ToString(), task.OpCode)
243
		return
244
	}
245

246
	bytes, err = json.Marshal(task.Request)
247
	if err != nil {
248
		err = fmt.Errorf("from master Task(%v) cannot unmashal CreateDataPartition, err %s", task.ToString(), err.Error())
249
		return
250
	}
251
	p.AddMesgLog(string(bytes))
252
	if err = json.Unmarshal(bytes, request); err != nil {
253
		err = fmt.Errorf("from master Task(%v) cannot unmashal CreateDataPartitionRequest struct, err(%s)", task.ToString(), err.Error())
254
		return
255
	}
256
	p.PartitionID = request.PartitionId
257
	if dp, err = s.space.CreatePartition(request); err != nil {
258
		err = fmt.Errorf("from master Task(%v) cannot create Partition err(%v)", task.ToString(), err)
259
		return
260
	}
261
	p.PacketOkWithBody([]byte(dp.Disk().Path))
262
}
263

264
// commitDelVersion removes version verSeq from the in-memory version list
// of every partition belonging to volumeID. The newest (last) entry of a
// list is never removed; reaching it only logs a warning. Always returns
// nil.
func (s *DataNode) commitDelVersion(volumeID string, verSeq uint64) (err error) {
	// NOTE(review): s.space.partitions is iterated without holding
	// partitionMutex here (commitCreateVersion takes it) — confirm callers
	// serialize against partition-map mutation.
	for _, partition := range s.space.partitions {
		if partition.config.VolName != volumeID {
			continue
		}
		verListMgr := partition.volVersionInfoList
		verListMgr.RWLock.Lock()
		for i, ver := range verListMgr.VerList {
			// Preserve the latest version: stop before the last element.
			if i == len(verListMgr.VerList)-1 {
				log.LogWarnf("action[commitDelVersion] dp[%v] seq %v, seqArray size %v newest ver %v",
					partition.config.PartitionID, verSeq, len(verListMgr.VerList), ver.Ver)
				break
			}
			if ver.Ver == verSeq {
				log.LogInfof("action[commitDelVersion] updateVerList dp[%v] seq %v,seqArray size %v", partition.config.PartitionID, verSeq, len(verListMgr.VerList))
				// Splice the matching entry out of the slice in place.
				verListMgr.VerList = append(verListMgr.VerList[:i], verListMgr.VerList[i+1:]...)
				break
			}
		}
		verListMgr.RWLock.Unlock()
	}
	return
}
287

288
func (s *DataNode) commitCreateVersion(req *proto.MultiVersionOpRequest) (err error) {
289
	log.LogInfof("action[commitCreateVersion] handle master version reqeust %v", req)
290
	var (
291
		value interface{}
292
		ok    bool
293
		wg    sync.WaitGroup
294
	)
295
	if value, ok = s.volUpdating.Load(req.VolumeID); !ok {
296
		log.LogWarnf("action[commitCreateVersion] vol %v not found seq %v", req.VolumeID, req.VerSeq)
297
		return
298
	}
299

300
	ver2Phase := value.(*verOp2Phase)
301
	log.LogInfof("action[commitCreateVersion] try commit volume %v ver2Phase seq %v with req seq %v",
302
		req.VolumeID, ver2Phase.verPrepare, req.VerSeq)
303
	if req.VerSeq < ver2Phase.verSeq {
304
		log.LogWarnf("action[commitCreateVersion] vol %v seq %v create less than loal %v", req.VolumeID, req.VerSeq, ver2Phase.verSeq)
305
		return
306
	}
307
	if ver2Phase.step != proto.CreateVersionPrepare {
308
		log.LogWarnf("action[commitCreateVersion] vol %v seq %v step not prepare", req.VolumeID, ver2Phase.step)
309
	}
310

311
	s.space.partitionMutex.RLock()
312
	defer s.space.partitionMutex.RUnlock()
313
	resultCh := make(chan error, len(s.space.partitions))
314
	for _, partition := range s.space.partitions {
315
		if partition.config.VolName != req.VolumeID {
316
			continue
317
		}
318
		if !partition.isRaftLeader {
319
			continue
320
		}
321
		wg.Add(1)
322
		go func(partition *DataPartition) {
323
			defer wg.Done()
324
			log.LogInfof("action[commitCreateVersion] volume %v dp[%v] do HandleVersionOp verSeq[%v]",
325
				partition.volumeID, partition.partitionID, partition.verSeq)
326
			if err = partition.HandleVersionOp(req); err != nil {
327
				log.LogErrorf("action[commitCreateVersion] volume %v dp[%v] do HandleVersionOp verSeq[%v] err %v",
328
					partition.volumeID, partition.partitionID, partition.verSeq, err)
329
				resultCh <- err
330
				return
331
			}
332
		}(partition)
333
	}
334

335
	wg.Wait()
336
	select {
337
	case err = <-resultCh:
338
		if err != nil {
339
			close(resultCh)
340
			return
341
		}
342
	default:
343
		log.LogInfof("action[commitCreateVersion] volume %v do HandleVersionOp verseq [%v] finished", req.VolumeID, req.VerSeq)
344
	}
345
	close(resultCh)
346
	if req.Op == proto.DeleteVersion {
347
		return
348
	}
349

350
	if req.Op == proto.CreateVersionPrepare {
351
		log.LogInfof("action[commitCreateVersion] commit volume %v prepare seq %v with commit seq %v",
352
			req.VolumeID, ver2Phase.verPrepare, req.VerSeq)
353
		return
354
	}
355

356
	ver2Phase.verSeq = req.VerSeq
357
	ver2Phase.step = proto.CreateVersionCommit
358
	ver2Phase.status = proto.VersionWorkingFinished
359
	log.LogInfof("action[commitCreateVersion] commit volume %v prepare seq %v with commit seq %v",
360
		req.VolumeID, ver2Phase.verPrepare, req.VerSeq)
361

362
	return
363
}
364

365
func (s *DataNode) prepareCreateVersion(req *proto.MultiVersionOpRequest) (err error, opAagin bool) {
366
	var ver2Phase *verOp2Phase
367
	if value, ok := s.volUpdating.Load(req.VolumeID); ok {
368
		ver2Phase = value.(*verOp2Phase)
369
		if req.VerSeq < ver2Phase.verSeq {
370
			err = fmt.Errorf("seq %v create less than loal %v", req.VerSeq, ver2Phase.verSeq)
371
			log.LogInfof("action[prepareCreateVersion] volume %v update to ver %v step %v", req.VolumeID, req.VerSeq, ver2Phase.step)
372
			return
373
		} else if req.VerSeq == ver2Phase.verPrepare {
374
			if ver2Phase.step == proto.VersionWorking {
375
				opAagin = true
376
				return
377
			}
378
		}
379
	}
380
	ver2Phase = &verOp2Phase{}
381
	ver2Phase.step = uint32(req.Op)
382
	ver2Phase.status = proto.VersionWorking
383
	ver2Phase.verPrepare = req.VerSeq
384

385
	s.volUpdating.Store(req.VolumeID, ver2Phase)
386

387
	log.LogInfof("action[prepareCreateVersion] volume %v update seq to %v step %v",
388
		req.VolumeID, req.VerSeq, ver2Phase.step)
389
	return
390
}
391

392
// handleUpdateVerPacket services OpVersionOperation: it decodes a
// MultiVersionOpRequest from the admin task, runs the matching phase
// (prepare, commit, or delete) and posts a MultiVersionOpResponse back to
// master. The deferred hook packs the client reply from err.
// (Original header comment said "OpHeartbeat" — that was a copy-paste.)
func (s *DataNode) handleUpdateVerPacket(p *repl.Packet) {
	var err error
	defer func() {
		if err != nil {
			p.PackErrorBody(ActionUpdateVersion, err.Error())
		} else {
			p.PacketOkReply()
		}
	}()

	task := &proto.AdminTask{}
	err = json.Unmarshal(p.Data, task)
	if err != nil {
		log.LogErrorf("action[handleUpdateVerPacket] handle master version reqeust err %v", err)
		return
	}
	request := &proto.MultiVersionOpRequest{}
	response := &proto.MultiVersionOpResponse{}
	response.Op = task.OpCode
	response.Status = proto.TaskSucceeds

	if task.OpCode == proto.OpVersionOperation {
		// task.Request is a generic map; round-trip through JSON to decode
		// the concrete request struct.
		marshaled, _ := json.Marshal(task.Request)
		if err = json.Unmarshal(marshaled, request); err != nil {
			log.LogErrorf("action[handleUpdateVerPacket] handle master version reqeust err %v", err)
			response.Status = proto.TaskFailed
			goto end
		}

		if request.Op == proto.CreateVersionPrepare {
			// Prepare runs both phases locally: record the prepare, then
			// push the operation to the leader partitions.
			if err, _ = s.prepareCreateVersion(request); err != nil {
				log.LogErrorf("action[handleUpdateVerPacket] handle master version reqeust err %v", err)
				goto end
			}
			if err = s.commitCreateVersion(request); err != nil {
				log.LogErrorf("action[handleUpdateVerPacket] handle master version reqeust err %v", err)
				goto end
			}
		} else if request.Op == proto.CreateVersionCommit {
			if err = s.commitCreateVersion(request); err != nil {
				log.LogErrorf("action[handleUpdateVerPacket] handle master version reqeust err %v", err)
				goto end
			}
		} else if request.Op == proto.DeleteVersion {
			if err = s.commitDelVersion(request.VolumeID, request.VerSeq); err != nil {
				log.LogErrorf("action[handleUpdateVerPacket] handle master version reqeust err %v", err)
				goto end
			}
		}

		// Echo the request identity back so master can correlate.
		response.VerSeq = request.VerSeq
		response.Op = request.Op
		response.Addr = request.Addr
		response.VolumeID = request.VolumeID

	} else {
		err = fmt.Errorf("illegal opcode")
		log.LogErrorf("action[handleUpdateVerPacket] handle master version reqeust err %v", err)
		goto end
	}
end:
	// Shared exit path: attach any error to the response and report the
	// task result to master regardless of success or failure.
	if err != nil {
		response.Result = err.Error()
	}
	task.Response = response
	log.LogInfof("action[handleUpdateVerPacket] rsp to client,req vol %v, verseq %v, op %v", request.VolumeID, request.VerSeq, request.Op)
	if err = MasterClient.NodeAPI().ResponseDataNodeTask(task); err != nil {
		err = errors.Trace(err, "handleUpdateVerPacket to master failed.")
		log.LogErrorf(err.Error())
		return
	}
}
465

466
// checkVolumeForbidden marks every partition whose volume appears in
// volNames as forbidden and clears the flag on all other partitions.
func (s *DataNode) checkVolumeForbidden(volNames []string) {
	s.space.RangePartitions(func(partition *DataPartition) bool {
		forbidden := false
		for _, name := range volNames {
			if name == partition.volumeID {
				forbidden = true
				break
			}
		}
		partition.SetForbidden(forbidden)
		// Keep ranging over the remaining partitions.
		return true
	})
}
478

479
// checkDecommissionDisks reconciles each local disk's decommission flag
// with the set of disk paths the master reports as decommissioned:
// disks newly listed are marked, disks no longer listed are unmarked.
func (s *DataNode) checkDecommissionDisks(decommissionDisks []string) {
	decommissionDiskSet := util.NewSet()
	for _, disk := range decommissionDisks {
		decommissionDiskSet.Add(disk)
	}
	for _, disk := range s.space.GetDisks() {
		current := disk.GetDecommissionStatus()
		wanted := decommissionDiskSet.Has(disk.Path)
		if current == wanted {
			continue
		}
		if wanted {
			log.LogDebugf("action[checkDecommissionDisks] mark %v to be decommissioned", disk.Path)
		} else {
			log.LogDebugf("action[checkDecommissionDisks] mark %v to be undecommissioned", disk.Path)
		}
		disk.MarkDecommissionStatus(wanted)
	}
}
498

499
// Handle OpHeartbeat packet.
500
func (s *DataNode) handleHeartbeatPacket(p *repl.Packet) {
501
	var err error
502
	task := &proto.AdminTask{}
503
	err = json.Unmarshal(p.Data, task)
504
	defer func() {
505
		if err != nil {
506
			p.PackErrorBody(ActionCreateDataPartition, err.Error())
507
		} else {
508
			p.PacketOkReply()
509
		}
510
	}()
511
	if err != nil {
512
		return
513
	}
514

515
	go func() {
516
		request := &proto.HeartBeatRequest{}
517
		response := &proto.DataNodeHeartbeatResponse{}
518
		s.buildHeartBeatResponse(response)
519

520
		if task.OpCode == proto.OpDataNodeHeartbeat {
521
			marshaled, _ := json.Marshal(task.Request)
522
			_ = json.Unmarshal(marshaled, request)
523
			response.Status = proto.TaskSucceeds
524
			if s.diskQosEnableFromMaster != request.EnableDiskQos {
525
				log.LogWarnf("action[handleHeartbeatPacket] master command disk qos enable change to [%v], local conf enable [%v]",
526
					request.EnableDiskQos,
527
					s.diskQosEnable)
528
			}
529

530
			// set volume forbidden
531
			s.checkVolumeForbidden(request.ForbiddenVols)
532
			// set decommission disks
533
			s.checkDecommissionDisks(request.DecommissionDisks)
534
			s.diskQosEnableFromMaster = request.EnableDiskQos
535

536
			var needUpdate bool
537
			for _, pair := range []struct {
538
				replace uint64
539
				origin  *int
540
			}{
541
				{request.QosFlowWriteLimit, &s.diskWriteFlow},
542
				{request.QosFlowReadLimit, &s.diskReadFlow},
543
				{request.QosIopsWriteLimit, &s.diskWriteIops},
544
				{request.QosIopsReadLimit, &s.diskReadIops},
545
			} {
546
				if pair.replace > 0 && int(pair.replace) != *pair.origin {
547
					*pair.origin = int(pair.replace)
548
					needUpdate = true
549
				}
550
			}
551

552
			// set cpu util and io used in here
553
			response.CpuUtil = s.cpuUtil.Load()
554
			response.IoUtils = s.space.GetDiskUtils()
555

556
			if needUpdate {
557
				log.LogWarnf("action[handleHeartbeatPacket] master change disk qos limit to [flowWrite %v, flowRead %v, iopsWrite %v, iopsRead %v]",
558
					s.diskWriteFlow, s.diskReadFlow, s.diskWriteIops, s.diskReadIops)
559
				s.updateQosLimit()
560
			}
561
		} else {
562
			response.Status = proto.TaskFailed
563
			err = fmt.Errorf("illegal opcode")
564
			response.Result = err.Error()
565
		}
566
		task.Response = response
567
		if err = MasterClient.NodeAPI().ResponseDataNodeTask(task); err != nil {
568
			err = errors.Trace(err, "heartbeat to master(%v) failed.", request.MasterAddr)
569
			log.LogErrorf(err.Error())
570
			return
571
		}
572
	}()
573
}
574

575
// Handle OpDeleteDataPartition packet.
576
func (s *DataNode) handlePacketToDeleteDataPartition(p *repl.Packet) {
577
	task := &proto.AdminTask{}
578
	err := json.Unmarshal(p.Data, task)
579
	defer func() {
580
		if err != nil {
581
			p.PackErrorBody(ActionDeleteDataPartition, err.Error())
582
		} else {
583
			p.PacketOkReply()
584
		}
585
	}()
586
	if err != nil {
587
		return
588
	}
589
	request := &proto.DeleteDataPartitionRequest{}
590
	if task.OpCode == proto.OpDeleteDataPartition {
591
		bytes, _ := json.Marshal(task.Request)
592
		p.AddMesgLog(string(bytes))
593
		err = json.Unmarshal(bytes, request)
594
		if err != nil {
595
			return
596
		} else {
597
			s.space.DeletePartition(request.PartitionId)
598
		}
599
	} else {
600
		err = fmt.Errorf("illegal opcode ")
601
	}
602
	if err != nil {
603
		err = errors.Trace(err, "delete DataPartition failed,PartitionID(%v)", request.PartitionId)
604
		log.LogErrorf("action[handlePacketToDeleteDataPartition] err(%v).", err)
605
	}
606
	log.LogInfof(fmt.Sprintf("action[handlePacketToDeleteDataPartition] %v error(%v)", request.PartitionId, err))
607
}
608

609
// Handle OpLoadDataPartition packet.
610
func (s *DataNode) handlePacketToLoadDataPartition(p *repl.Packet) {
611
	task := &proto.AdminTask{}
612
	var err error
613
	defer func() {
614
		if err != nil {
615
			p.PackErrorBody(ActionLoadDataPartition, err.Error())
616
		} else {
617
			p.PacketOkReply()
618
		}
619
	}()
620
	err = json.Unmarshal(p.Data, task)
621
	p.PacketOkReply()
622
	go s.asyncLoadDataPartition(task)
623
}
624

625
// asyncLoadDataPartition executes a load-data-partition admin task and
// posts the LoadDataPartitionResponse back to master. It runs in its own
// goroutine, detached from the originating network request.
func (s *DataNode) asyncLoadDataPartition(task *proto.AdminTask) {
	var err error
	request := &proto.LoadDataPartitionRequest{}
	response := &proto.LoadDataPartitionResponse{}
	if task.OpCode == proto.OpLoadDataPartition {
		data, _ := json.Marshal(task.Request)
		// BUGFIX: the re-unmarshal error used to be silently discarded;
		// report the failure to master instead of loading a zero request.
		if err = json.Unmarshal(data, request); err != nil {
			response.PartitionId = uint64(request.PartitionId)
			response.Status = proto.TaskFailed
			response.Result = err.Error()
		} else if dp := s.space.Partition(request.PartitionId); dp == nil {
			response.Status = proto.TaskFailed
			response.PartitionId = uint64(request.PartitionId)
			err = fmt.Errorf("DataPartition(%v) not found", request.PartitionId)
			response.Result = err.Error()
		} else {
			response = dp.Load()
			response.PartitionId = uint64(request.PartitionId)
			response.Status = proto.TaskSucceeds
		}
	} else {
		response.PartitionId = uint64(request.PartitionId)
		response.Status = proto.TaskFailed
		err = fmt.Errorf("illegal opcode")
		response.Result = err.Error()
	}
	task.Response = response
	if err = MasterClient.NodeAPI().ResponseDataNodeTask(task); err != nil {
		err = errors.Trace(err, "load DataPartition failed,PartitionID(%v)", request.PartitionId)
		log.LogError(errors.Stack(err))
	}
}
655

656
// handleMarkDeletePacket services OpMarkDelete / OpSplitMarkDelete for a
// single extent: tiny-extent and split deletes punch out the exact
// offset/size carried in a TinyExtentDeleteRecord, normal extents are
// deleted whole. The deferred hook packs the reply from err.
func (s *DataNode) handleMarkDeletePacket(p *repl.Packet, c net.Conn) {
	var err error
	defer func() {
		if err != nil {
			// NOTE(review): the action tag is ActionBatchMarkDelete even
			// though this is the single-extent path — confirm intended.
			p.PackErrorBody(ActionBatchMarkDelete, err.Error())
		} else {
			p.PacketOkReply()
		}
	}()
	partition := p.Object.(*DataPartition)
	// NOTE: we cannot prevent mark delete
	// even the partition is forbidden, because
	// the inode already be deleted in meta partition
	// if we prevent it, we will get "orphan extents"
	if proto.IsTinyExtentType(p.ExtentType) || p.Opcode == proto.OpSplitMarkDelete {
		// Body carries the precise range to delete.
		ext := new(proto.TinyExtentDeleteRecord)
		err = json.Unmarshal(p.Data, ext)
		if err == nil {
			log.LogInfof("handleMarkDeletePacket Delete PartitionID(%v)_Extent(%v)_Offset(%v)_Size(%v)",
				p.PartitionID, p.ExtentID, ext.ExtentOffset, ext.Size)
			partition.disk.allocCheckLimit(proto.IopsWriteType, 1)
			// Run under the disk write limiter; the closure sets err, which
			// the deferred hook turns into the reply.
			partition.disk.limitWrite.Run(0, func() {
				err = partition.ExtentStore().MarkDelete(p.ExtentID, int64(ext.ExtentOffset), int64(ext.Size))
				if err != nil {
					log.LogErrorf("action[handleMarkDeletePacket]: failed to mark delete extent(%v), %v", p.ExtentID, err)
				}
			})
		}
	} else {
		// Whole-extent delete: offset and size are zero.
		log.LogInfof("handleMarkDeletePacket Delete PartitionID(%v)_Extent(%v)",
			p.PartitionID, p.ExtentID)
		partition.disk.allocCheckLimit(proto.IopsWriteType, 1)
		partition.disk.limitWrite.Run(0, func() {
			err = partition.ExtentStore().MarkDelete(p.ExtentID, 0, 0)
			if err != nil {
				log.LogErrorf("action[handleMarkDeletePacket]: failed to mark delete extent(%v), %v", p.ExtentID, err)
			}
		})
	}
}
697

698
// Handle OpMarkDelete packet.
699
func (s *DataNode) handleBatchMarkDeletePacket(p *repl.Packet, c net.Conn) {
700
	var err error
701
	defer func() {
702
		if err != nil {
703
			log.LogErrorf(fmt.Sprintf("(%v) error(%v).", p.GetUniqueLogId(), err))
704
			p.PackErrorBody(ActionBatchMarkDelete, err.Error())
705
		} else {
706
			p.PacketOkReply()
707
		}
708
	}()
709
	partition := p.Object.(*DataPartition)
710
	// NOTE: we cannot prevent mark delete
711
	// even the partition is forbidden, because
712
	// the inode already be deleted in meta partition
713
	// if we prevent it, we will get "orphan extents"
714
	var exts []*proto.ExtentKey
715
	err = json.Unmarshal(p.Data, &exts)
716
	store := partition.ExtentStore()
717
	if err == nil {
718
		for _, ext := range exts {
719
			if deleteLimiteRater.Allow() {
720
				log.LogInfof(fmt.Sprintf("recive DeleteExtent (%v) from (%v)", ext, c.RemoteAddr().String()))
721
				partition.disk.allocCheckLimit(proto.IopsWriteType, 1)
722
				partition.disk.limitWrite.Run(0, func() {
723
					err = store.MarkDelete(ext.ExtentId, int64(ext.ExtentOffset), int64(ext.Size))
724
					if err != nil {
725
						log.LogErrorf("action[handleBatchMarkDeletePacket]: failed to mark delete extent(%v), %v", p.ExtentID, err)
726
					}
727
				})
728
				if err != nil {
729
					return
730
				}
731
			} else {
732
				log.LogInfof("delete limiter reach(%v), remote (%v) try again.", deleteLimiteRater.Limit(), c.RemoteAddr().String())
733
				err = storage.TryAgainError
734
			}
735
		}
736
	}
737
}
738

739
// Handle OpWrite packet.
740
func (s *DataNode) handleWritePacket(p *repl.Packet) {
741
	var (
742
		err                     error
743
		metricPartitionIOLabels map[string]string
744
		partitionIOMetric       *exporter.TimePointCount
745
	)
746
	defer func() {
747
		if err != nil {
748
			p.PackErrorBody(ActionWrite, err.Error())
749
		} else {
750
			p.PacketOkReply()
751
		}
752
	}()
753
	partition := p.Object.(*DataPartition)
754
	if partition.IsForbidden() {
755
		err = ErrForbiddenDataPartition
756
		return
757
	}
758
	shallDegrade := p.ShallDegrade()
759
	if !shallDegrade {
760
		metricPartitionIOLabels = GetIoMetricLabels(partition, "write")
761
	}
762
	if partition.Available() <= 0 || !partition.disk.CanWrite() {
763
		err = storage.NoSpaceError
764
		return
765
	} else if partition.disk.Status == proto.Unavailable {
766
		err = storage.BrokenDiskError
767
		return
768
	}
769
	store := partition.ExtentStore()
770
	if proto.IsTinyExtentType(p.ExtentType) {
771
		if !shallDegrade {
772
			partitionIOMetric = exporter.NewTPCnt(MetricPartitionIOName)
773
		}
774

775
		partition.disk.allocCheckLimit(proto.FlowWriteType, uint32(p.Size))
776
		partition.disk.allocCheckLimit(proto.IopsWriteType, 1)
777

778
		if writable := partition.disk.limitWrite.TryRun(int(p.Size), func() {
779
			_, err = store.Write(p.ExtentID, p.ExtentOffset, int64(p.Size), p.Data, p.CRC, storage.AppendWriteType, p.IsSyncWrite())
780
		}); !writable {
781
			err = storage.TryAgainError
782
			return
783
		}
784
		if !shallDegrade {
785
			s.metrics.MetricIOBytes.AddWithLabels(int64(p.Size), metricPartitionIOLabels)
786
			partitionIOMetric.SetWithLabels(err, metricPartitionIOLabels)
787
		}
788
		partition.checkIsDiskError(err, WriteFlag)
789
		return
790
	}
791

792
	if p.Size <= util.BlockSize {
793
		if !shallDegrade {
794
			partitionIOMetric = exporter.NewTPCnt(MetricPartitionIOName)
795
		}
796

797
		partition.disk.allocCheckLimit(proto.FlowWriteType, uint32(p.Size))
798
		partition.disk.allocCheckLimit(proto.IopsWriteType, 1)
799

800
		if writable := partition.disk.limitWrite.TryRun(int(p.Size), func() {
801
			_, err = store.Write(p.ExtentID, p.ExtentOffset, int64(p.Size), p.Data, p.CRC, storage.AppendWriteType, p.IsSyncWrite())
802
		}); !writable {
803
			err = storage.TryAgainError
804
			return
805
		}
806
		if !shallDegrade {
807
			s.metrics.MetricIOBytes.AddWithLabels(int64(p.Size), metricPartitionIOLabels)
808
			partitionIOMetric.SetWithLabels(err, metricPartitionIOLabels)
809
		}
810
		partition.checkIsDiskError(err, WriteFlag)
811
	} else {
812
		size := p.Size
813
		offset := 0
814
		for size > 0 {
815
			if size <= 0 {
816
				break
817
			}
818
			currSize := util.Min(int(size), util.BlockSize)
819
			data := p.Data[offset : offset+currSize]
820
			crc := crc32.ChecksumIEEE(data)
821
			if !shallDegrade {
822
				partitionIOMetric = exporter.NewTPCnt(MetricPartitionIOName)
823
			}
824

825
			partition.disk.allocCheckLimit(proto.FlowWriteType, uint32(currSize))
826
			partition.disk.allocCheckLimit(proto.IopsWriteType, 1)
827

828
			if writable := partition.disk.limitWrite.TryRun(currSize, func() {
829
				_, err = store.Write(p.ExtentID, p.ExtentOffset+int64(offset), int64(currSize), data, crc, storage.AppendWriteType, p.IsSyncWrite())
830
			}); !writable {
831
				err = storage.TryAgainError
832
				return
833
			}
834
			if !shallDegrade {
835
				s.metrics.MetricIOBytes.AddWithLabels(int64(p.Size), metricPartitionIOLabels)
836
				partitionIOMetric.SetWithLabels(err, metricPartitionIOLabels)
837
			}
838
			partition.checkIsDiskError(err, WriteFlag)
839
			if err != nil {
840
				break
841
			}
842
			size -= uint32(currSize)
843
			offset += currSize
844
		}
845
	}
846
}
847

848
// handleRandomWritePacket services the random/overwrite family of opcodes
// (OpRandomWrite*, OpTryWriteAppend*, and their sync/versioned variants)
// by submitting the write through the partition's raft group. Only the
// raft leader of a normal-type partition accepts these writes. The
// deferred hook rewrites versioned opcodes on success and packs the reply.
func (s *DataNode) handleRandomWritePacket(p *repl.Packet) {
	var (
		err error

		metricPartitionIOLabels map[string]string
		partitionIOMetric       *exporter.TimePointCount
	)

	defer func() {
		log.LogDebugf("action[handleRandomWritePacket opcod %v seq %v dpid %v resultCode %v extid %v err %v",
			p.Opcode, p.VerSeq, p.PartitionID, p.ResultCode, p.ExtentID, err)
		if err != nil {
			p.PackErrorBody(ActionWrite, err.Error())
		} else {
			// avoid rsp pack ver info into package which client need do more work to read buffer
			if p.Opcode == proto.OpRandomWriteVer || p.Opcode == proto.OpSyncRandomWriteVer {
				p.Opcode = proto.OpSyncRandomWriteVerRsp
			}
			// Try-append that lost the race for the extent: tell the client
			// to retry on another extent rather than reporting success.
			if p.Opcode == proto.OpTryWriteAppend && p.ResultCode == proto.OpTryOtherExtent {
				p.PackErrorBody(ActionWrite, storage.SnapshotNeedNewExtentError.Error())
				p.ResultCode = proto.OpTryOtherExtent
				log.LogDebugf("action[handleRandomWritePacket opcod %v seq %v dpid %v resultCode %v extid %v", p.Opcode, p.VerSeq, p.PartitionID, p.ResultCode, p.ExtentID)
				return
			}
			p.PacketOkReply()
		}
	}()

	partition := p.Object.(*DataPartition)
	if partition.IsForbidden() {
		err = ErrForbiddenDataPartition
		return
	}
	log.LogDebugf("action[handleRandomWritePacket opcod %v seq %v dpid %v dpseq %v extid %v", p.Opcode, p.VerSeq, p.PartitionID, partition.verSeq, p.ExtentID)
	// cache or preload partition not support raft and repair.
	if !partition.isNormalType() {
		err = raft.ErrStopped
		return
	}

	_, isLeader := partition.IsRaftLeader()
	if !isLeader {
		err = raft.ErrNotLeader
		return
	}
	shallDegrade := p.ShallDegrade()
	if !shallDegrade {
		metricPartitionIOLabels = GetIoMetricLabels(partition, "randwrite")
		partitionIOMetric = exporter.NewTPCnt(MetricPartitionIOName)
	}

	// Propose the write through raft and wait for it to apply.
	err = partition.RandomWriteSubmit(p)
	if !shallDegrade {
		s.metrics.MetricIOBytes.AddWithLabels(int64(p.Size), metricPartitionIOLabels)
		partitionIOMetric.SetWithLabels(err, metricPartitionIOLabels)
	}

	// Leadership may have been lost mid-submit; normalize the wrapped
	// raft error back to ErrNotLeader for the client.
	if err != nil && strings.Contains(err.Error(), raft.ErrNotLeader.Error()) {
		err = raft.ErrNotLeader
		log.LogErrorf("action[handleRandomWritePacket] opcod %v seq %v dpid %v dpseq %v extid %v err %v", p.Opcode, p.VerSeq, p.PartitionID, partition.verSeq, p.ExtentID, err)
		return
	}

	// Submit reported success but the applied result code disagrees:
	// treat as retryable (OpTryOtherExtent is handled by the defer).
	if err == nil && p.ResultCode != proto.OpOk && p.ResultCode != proto.OpTryOtherExtent {
		log.LogErrorf("action[handleRandomWritePacket] opcod %v seq %v dpid %v dpseq %v extid %v ResultCode %v",
			p.Opcode, p.VerSeq, p.PartitionID, partition.verSeq, p.ExtentID, p.ResultCode)
		err = storage.TryAgainError
		return
	}
	log.LogDebugf("action[handleRandomWritePacket] opcod %v seq %v dpid %v dpseq %v after raft submit err %v resultCode %v",
		p.Opcode, p.VerSeq, p.PartitionID, partition.verSeq, err, p.ResultCode)
}
920

921
// handleStreamReadPacket serves a client stream-read request. It rejects
// cache/preload partitions (which run no raft), verifies this replica may
// serve the read via CheckLeader, and then delegates the actual extent
// read to extentRepairReadPacket.
func (s *DataNode) handleStreamReadPacket(p *repl.Packet, connect net.Conn, isRepairRead bool) {
	var err error
	defer func() {
		// On failure, report the error back to the client over the same connection.
		if err != nil {
			p.PackErrorBody(ActionStreamRead, err.Error())
			p.WriteToConn(connect)
		}
	}()

	dp := p.Object.(*DataPartition)

	// Cache or preload partitions do not support raft or repair reads.
	if !dp.isNormalType() {
		err = raft.ErrStopped
		return
	}

	if err = dp.CheckLeader(p, connect); err != nil {
		return
	}

	s.extentRepairReadPacket(p, connect, isRepairRead)
}
942

943
// handleExtentRepairReadPacket serves an extent repair-read request.
// It first acquires a repair-read slot via requestDoExtentRepair (the
// number of concurrent repair reads is bounded); the slot is released in
// the deferred handler on success. On failure the error is sent back to
// the peer instead.
func (s *DataNode) handleExtentRepairReadPacket(p *repl.Packet, connect net.Conn, isRepairRead bool) {
	var err error
	log.LogDebugf("handleExtentRepairReadPacket %v", p)
	defer func() {
		if err != nil {
			p.PackErrorBody(ActionStreamRead, err.Error())
			p.WriteToConn(connect)
			return
		}
		// Release the repair-read slot acquired below.
		fininshDoExtentRepair()
	}()

	if err = requestDoExtentRepair(); err != nil {
		return
	}

	s.extentRepairReadPacket(p, connect, isRepairRead)
}
962

963
// handleTinyExtentRepairReadPacket handles a tiny-extent repair-read
// request by delegating to tinyExtentRepairRead.
func (s *DataNode) handleTinyExtentRepairReadPacket(p *repl.Packet, connect net.Conn) {
	s.tinyExtentRepairRead(p, connect)
}
966

967
func (s *DataNode) extentRepairReadPacket(p *repl.Packet, connect net.Conn, isRepairRead bool) {
968
	var (
969
		err error
970

971
		metricPartitionIOLabels     map[string]string
972
		partitionIOMetric, tpObject *exporter.TimePointCount
973
	)
974
	defer func() {
975
		if err != nil {
976
			p.PackErrorBody(ActionStreamRead, err.Error())
977
			p.WriteToConn(connect)
978
		}
979
	}()
980
	partition := p.Object.(*DataPartition)
981
	needReplySize := p.Size
982
	offset := p.ExtentOffset
983
	store := partition.ExtentStore()
984
	shallDegrade := p.ShallDegrade()
985
	if !shallDegrade {
986
		metricPartitionIOLabels = GetIoMetricLabels(partition, "read")
987
	}
988
	log.LogDebugf("extentRepairReadPacket dp %v offset %v needSize %v", partition.partitionID, offset, needReplySize)
989
	for {
990
		if needReplySize <= 0 {
991
			break
992
		}
993
		err = nil
994
		reply := repl.NewStreamReadResponsePacket(p.ReqID, p.PartitionID, p.ExtentID)
995
		reply.StartT = p.StartT
996
		currReadSize := uint32(util.Min(int(needReplySize), util.ReadBlockSize))
997
		if currReadSize == util.ReadBlockSize {
998
			reply.Data, _ = proto.Buffers.Get(util.ReadBlockSize)
999
		} else {
1000
			reply.Data = make([]byte, currReadSize)
1001
		}
1002
		if !shallDegrade {
1003
			partitionIOMetric = exporter.NewTPCnt(MetricPartitionIOName)
1004
			tpObject = exporter.NewTPCnt(fmt.Sprintf("Repair_%s", p.GetOpMsg()))
1005
		}
1006
		reply.ExtentOffset = offset
1007
		p.Size = currReadSize
1008
		p.ExtentOffset = offset
1009

1010
		partition.Disk().allocCheckLimit(proto.IopsReadType, 1)
1011
		partition.Disk().allocCheckLimit(proto.FlowReadType, currReadSize)
1012

1013
		partition.disk.limitRead.Run(int(currReadSize), func() {
1014
			reply.CRC, err = store.Read(reply.ExtentID, offset, int64(currReadSize), reply.Data, isRepairRead)
1015
		})
1016
		if !shallDegrade {
1017
			s.metrics.MetricIOBytes.AddWithLabels(int64(p.Size), metricPartitionIOLabels)
1018
			partitionIOMetric.SetWithLabels(err, metricPartitionIOLabels)
1019
			tpObject.Set(err)
1020
		}
1021
		partition.checkIsDiskError(err, ReadFlag)
1022
		p.CRC = reply.CRC
1023
		if err != nil {
1024
			log.LogErrorf("action[operatePacket] err %v", err)
1025
			return
1026
		}
1027
		reply.Size = currReadSize
1028
		reply.ResultCode = proto.OpOk
1029
		reply.Opcode = p.Opcode
1030
		p.ResultCode = proto.OpOk
1031
		if err = reply.WriteToConn(connect); err != nil {
1032
			return
1033
		}
1034
		needReplySize -= currReadSize
1035
		offset += int64(currReadSize)
1036
		if currReadSize == util.ReadBlockSize {
1037
			proto.Buffers.Put(reply.Data)
1038
		}
1039
		logContent := fmt.Sprintf("action[operatePacket] %v.",
1040
			reply.LogMessage(reply.GetOpMsg(), connect.RemoteAddr().String(), reply.StartT, err))
1041
		log.LogReadf(logContent)
1042
	}
1043
	p.PacketOkReply()
1044
}
1045

1046
func (s *DataNode) handlePacketToGetAllWatermarks(p *repl.Packet) {
1047
	var (
1048
		buf       []byte
1049
		fInfoList []*storage.ExtentInfo
1050
		err       error
1051
	)
1052
	partition := p.Object.(*DataPartition)
1053
	store := partition.ExtentStore()
1054
	if proto.IsNormalExtentType(p.ExtentType) {
1055
		fInfoList, _, err = store.GetAllWatermarks(storage.NormalExtentFilter())
1056
	} else {
1057
		extents := make([]uint64, 0)
1058
		err = json.Unmarshal(p.Data, &extents)
1059
		if err == nil {
1060
			fInfoList, _, err = store.GetAllWatermarks(storage.TinyExtentFilter(extents))
1061
		}
1062
	}
1063
	if err != nil {
1064
		p.PackErrorBody(ActionGetAllExtentWatermarks, err.Error())
1065
	} else {
1066
		buf, err = json.Marshal(fInfoList)
1067
		if err != nil {
1068
			p.PackErrorBody(ActionGetAllExtentWatermarks, err.Error())
1069
		} else {
1070
			p.PacketOkWithByte(buf)
1071
		}
1072
	}
1073
}
1074

1075
// writeEmptyPacketOnTinyExtentRepairRead sends a zero-length reply that
// tells the peer to skip a hole of (newOffset - currentOffset) bytes in a
// tiny extent. The hole size is encoded big-endian into reply.Arg[1:9]
// and Arg[0] is flagged EmptyResponse. Returns the skipped size and any
// write error.
func (s *DataNode) writeEmptyPacketOnTinyExtentRepairRead(reply *repl.Packet, newOffset, currentOffset int64, connect net.Conn) (replySize int64, err error) {
	replySize = newOffset - currentOffset
	reply.Data = make([]byte, 0)
	// The packet is sent with Size == 0: the hole carries no data.
	reply.Size = 0
	reply.CRC = crc32.ChecksumIEEE(reply.Data)
	reply.ResultCode = proto.OpOk
	reply.ExtentOffset = currentOffset
	reply.Arg[0] = EmptyResponse
	binary.BigEndian.PutUint64(reply.Arg[1:9], uint64(replySize))
	err = reply.WriteToConn(connect)
	// Set Size to the hole length only after sending, so the log line
	// below records how much was skipped.
	reply.Size = uint32(replySize)
	logContent := fmt.Sprintf("action[operatePacket] %v.",
		reply.LogMessage(reply.GetOpMsg(), connect.RemoteAddr().String(), reply.StartT, err))
	log.LogReadf(logContent)

	return
}
1092

1093
// attachAvaliSizeOnTinyExtentRepairRead encodes the total available reply
// size (big-endian) into reply.Arg[9:17] so the peer knows how much data
// the whole tiny-extent repair read will deliver.
func (s *DataNode) attachAvaliSizeOnTinyExtentRepairRead(reply *repl.Packet, avaliSize uint64) {
	binary.BigEndian.PutUint64(reply.Arg[9:17], avaliSize)
}
1096

1097
// tinyExtentRepairRead handles a tinyExtentRepairRead packet: it streams
// the requested tiny-extent range to the peer, skipping holes. Holes are
// announced with empty packets (writeEmptyPacketOnTinyExtentRepairRead)
// so the receiver can punch matching holes locally; data runs are sent in
// util.ReadBlockSize chunks. On error the deferred handler replies with
// the error instead.
func (s *DataNode) tinyExtentRepairRead(request *repl.Packet, connect net.Conn) {
	var (
		err                 error
		needReplySize       int64
		tinyExtentFinfoSize uint64
	)

	defer func() {
		if err != nil {
			request.PackErrorBody(ActionStreamReadTinyExtentRepair, err.Error())
			request.WriteToConn(connect)
		}
	}()
	// Only tiny extents may be repaired through this path.
	if !storage.IsTinyExtent(request.ExtentID) {
		err = fmt.Errorf("unavali extentID (%v)", request.ExtentID)
		return
	}

	partition := request.Object.(*DataPartition)
	store := partition.ExtentStore()
	tinyExtentFinfoSize, err = store.TinyExtentGetFinfoSize(request.ExtentID)
	if err != nil {
		return
	}
	needReplySize = int64(request.Size)
	offset := request.ExtentOffset
	// Clamp the reply size to the extent's actual on-disk size.
	if uint64(request.ExtentOffset)+uint64(request.Size) > tinyExtentFinfoSize {
		needReplySize = int64(tinyExtentFinfoSize - uint64(request.ExtentOffset))
	}
	avaliReplySize := uint64(needReplySize)

	var newOffset, newEnd int64
	for {
		if needReplySize <= 0 {
			break
		}
		reply := repl.NewTinyExtentStreamReadResponsePacket(request.ReqID, request.PartitionID, request.ExtentID)
		reply.ArgLen = TinyExtentRepairReadResponseArgLen
		reply.Arg = make([]byte, TinyExtentRepairReadResponseArgLen)
		s.attachAvaliSizeOnTinyExtentRepairRead(reply, avaliReplySize)
		// Locate the next data run at or after offset: [newOffset, newEnd).
		newOffset, newEnd, err = store.TinyExtentAvaliOffset(request.ExtentID, offset)
		if err != nil {
			return
		}
		// A gap before the next data run is a hole: announce it with an
		// empty packet and advance past it.
		if newOffset > offset {
			var replySize int64
			if replySize, err = s.writeEmptyPacketOnTinyExtentRepairRead(reply, newOffset, offset, connect); err != nil {
				return
			}
			needReplySize -= replySize
			offset += replySize
			continue
		}
		currNeedReplySize := newEnd - newOffset
		currReadSize := uint32(util.Min(int(currNeedReplySize), util.ReadBlockSize))
		if currReadSize == util.ReadBlockSize {
			reply.Data, _ = proto.Buffers.Get(util.ReadBlockSize)
		} else {
			reply.Data = make([]byte, currReadSize)
		}
		reply.ExtentOffset = offset
		reply.CRC, err = store.Read(reply.ExtentID, offset, int64(currReadSize), reply.Data, false)
		if err != nil {
			return
		}
		reply.Size = uint32(currReadSize)
		reply.ResultCode = proto.OpOk
		if err = reply.WriteToConn(connect); err != nil {
			connect.Close()
			return
		}
		needReplySize -= int64(currReadSize)
		offset += int64(currReadSize)
		// Full-size chunks came from the shared pool; return them.
		if currReadSize == util.ReadBlockSize {
			proto.Buffers.Put(reply.Data)
		}
		logContent := fmt.Sprintf("action[operatePacket] %v.",
			reply.LogMessage(reply.GetOpMsg(), connect.RemoteAddr().String(), reply.StartT, err))
		log.LogReadf(logContent)
	}

	request.PacketOkReply()
}
1181

1182
func (s *DataNode) handlePacketToReadTinyDeleteRecordFile(p *repl.Packet, connect net.Conn) {
1183
	var err error
1184
	defer func() {
1185
		if err != nil {
1186
			p.PackErrorBody(ActionStreamReadTinyDeleteRecord, err.Error())
1187
			p.WriteToConn(connect)
1188
		}
1189
	}()
1190
	partition := p.Object.(*DataPartition)
1191
	store := partition.ExtentStore()
1192
	localTinyDeleteFileSize, err := store.LoadTinyDeleteFileOffset()
1193
	if err != nil {
1194
		return
1195
	}
1196
	needReplySize := localTinyDeleteFileSize - p.ExtentOffset
1197
	offset := p.ExtentOffset
1198
	reply := repl.NewReadTinyDeleteRecordResponsePacket(p.ReqID, p.PartitionID)
1199
	reply.StartT = time.Now().UnixNano()
1200
	for {
1201
		if needReplySize <= 0 {
1202
			break
1203
		}
1204
		err = nil
1205
		currReadSize := uint32(util.Min(int(needReplySize), MaxSyncTinyDeleteBufferSize))
1206
		reply.Data = make([]byte, currReadSize)
1207
		reply.ExtentOffset = offset
1208
		reply.CRC, err = store.ReadTinyDeleteRecords(offset, int64(currReadSize), reply.Data)
1209
		if err != nil {
1210
			err = fmt.Errorf(ActionStreamReadTinyDeleteRecord+" localTinyDeleteRecordSize(%v) offset(%v)"+
1211
				" currReadSize(%v) err(%v)", localTinyDeleteFileSize, offset, currReadSize, err)
1212
			return
1213
		}
1214
		reply.Size = uint32(currReadSize)
1215
		reply.ResultCode = proto.OpOk
1216
		if err = reply.WriteToConn(connect); err != nil {
1217
			return
1218
		}
1219
		needReplySize -= int64(currReadSize)
1220
		offset += int64(currReadSize)
1221
	}
1222
	p.PacketOkReply()
1223
}
1224

1225
// Handle OpNotifyReplicasToRepair packet.
1226
func (s *DataNode) handlePacketToNotifyExtentRepair(p *repl.Packet) {
1227
	var err error
1228
	partition := p.Object.(*DataPartition)
1229
	mf := new(DataPartitionRepairTask)
1230
	err = json.Unmarshal(p.Data, mf)
1231
	if err != nil {
1232
		p.PackErrorBody(ActionRepair, err.Error())
1233
		return
1234
	}
1235
	partition.DoExtentStoreRepair(mf)
1236
	p.PacketOkReply()
1237
}
1238

1239
// Handle OpBroadcastMinAppliedID
1240
func (s *DataNode) handleBroadcastMinAppliedID(p *repl.Packet) {
1241
	partition := p.Object.(*DataPartition)
1242
	minAppliedID := binary.BigEndian.Uint64(p.Data)
1243
	if minAppliedID > 0 {
1244
		partition.SetMinAppliedID(minAppliedID)
1245
	}
1246
	log.LogDebugf("[handleBroadcastMinAppliedID] partition(%v) minAppliedID(%v)", partition.partitionID, minAppliedID)
1247
	p.PacketOkReply()
1248
}
1249

1250
// Handle handlePacketToGetAppliedID packet.
1251
func (s *DataNode) handlePacketToGetAppliedID(p *repl.Packet) {
1252
	partition := p.Object.(*DataPartition)
1253
	appliedID := partition.GetAppliedID()
1254
	buf := make([]byte, 8)
1255
	binary.BigEndian.PutUint64(buf, appliedID)
1256
	p.PacketOkWithBody(buf)
1257
	p.AddMesgLog(fmt.Sprintf("_AppliedID(%v)", appliedID))
1258
}
1259

1260
// handlePacketToGetPartitionSize replies with the used size of the store
// up to p.ExtentID, encoded as 8 big-endian bytes in the packet body.
func (s *DataNode) handlePacketToGetPartitionSize(p *repl.Packet) {
	dp := p.Object.(*DataPartition)
	used := dp.extentStore.StoreSizeExtentID(p.ExtentID)
	body := make([]byte, 8)
	binary.BigEndian.PutUint64(body, uint64(used))
	p.AddMesgLog(fmt.Sprintf("partitionSize_(%v)", used))
	p.PacketOkWithBody(body)
}
1268

1269
// handlePacketToGetMaxExtentIDAndPartitionSize replies with the store's
// maximum extent ID (bytes 0-7) and total partition size (bytes 8-15),
// both big-endian, in a 16-byte packet body.
func (s *DataNode) handlePacketToGetMaxExtentIDAndPartitionSize(p *repl.Packet) {
	dp := p.Object.(*DataPartition)
	maxExtentID, totalSize := dp.extentStore.GetMaxExtentIDAndPartitionSize()

	body := make([]byte, 16)
	binary.BigEndian.PutUint64(body[0:8], uint64(maxExtentID))
	binary.BigEndian.PutUint64(body[8:16], totalSize)
	p.PacketOkWithBody(body)
}
1278

1279
// handlePacketToDecommissionDataPartition handles a master-issued
// decommission task: on the raft leader it adds the new peer (if any)
// and then removes the old peer from the partition's raft group.
// Non-leaders forward the packet to the leader. The deferred handler
// replies OK or with the accumulated error.
func (s *DataNode) handlePacketToDecommissionDataPartition(p *repl.Packet) {
	var (
		err          error
		reqData      []byte
		isRaftLeader bool
		req          = &proto.DataPartitionDecommissionRequest{}
	)

	defer func() {
		if err != nil {
			p.PackErrorBody(ActionDecommissionPartition, err.Error())
		} else {
			p.PacketOkReply()
		}
	}()

	// Decode the admin task envelope; UseNumber keeps numeric fields
	// intact for the re-marshal below.
	adminTask := &proto.AdminTask{}
	decode := json.NewDecoder(bytes.NewBuffer(p.Data))
	decode.UseNumber()
	if err = decode.Decode(adminTask); err != nil {
		return
	}

	// Round-trip the embedded request into its concrete type.
	reqData, err = json.Marshal(adminTask.Request)
	if err != nil {
		return
	}
	if err = json.Unmarshal(reqData, req); err != nil {
		return
	}
	p.AddMesgLog(string(reqData))
	dp := s.space.Partition(req.PartitionId)
	if dp == nil {
		err = fmt.Errorf("partition %v not exsit", req.PartitionId)
		return
	}
	p.PartitionID = req.PartitionId

	// Membership changes must go through the raft leader; forward otherwise.
	isRaftLeader, err = s.forwardToRaftLeader(dp, p, false)
	if !isRaftLeader {
		err = raft.ErrNotLeader
		return
	}
	if req.AddPeer.ID == req.RemovePeer.ID {
		err = errors.NewErrorf("[opOfflineDataPartition]: AddPeer(%v) same withRemovePeer(%v)", req.AddPeer, req.RemovePeer)
		return
	}
	// Add the replacement peer first (ID 0 means "no peer to add"), then
	// remove the decommissioned one; this order keeps quorum intact.
	if req.AddPeer.ID != 0 {
		_, err = dp.ChangeRaftMember(raftProto.ConfAddNode, raftProto.Peer{ID: req.AddPeer.ID}, reqData)
		if err != nil {
			return
		}
	}
	_, err = dp.ChangeRaftMember(raftProto.ConfRemoveNode, raftProto.Peer{ID: req.RemovePeer.ID}, reqData)
	if err != nil {
		return
	}
}
1337

1338
func (s *DataNode) handlePacketToAddDataPartitionRaftMember(p *repl.Packet) {
1339
	var (
1340
		err          error
1341
		reqData      []byte
1342
		isRaftLeader bool
1343
		req          = &proto.AddDataPartitionRaftMemberRequest{}
1344
	)
1345

1346
	defer func() {
1347
		if err != nil {
1348
			p.PackErrorBody(ActionAddDataPartitionRaftMember, err.Error())
1349
		} else {
1350
			p.PacketOkReply()
1351
		}
1352
	}()
1353

1354
	adminTask := &proto.AdminTask{}
1355
	decode := json.NewDecoder(bytes.NewBuffer(p.Data))
1356
	decode.UseNumber()
1357
	if err = decode.Decode(adminTask); err != nil {
1358
		return
1359
	}
1360

1361
	reqData, err = json.Marshal(adminTask.Request)
1362
	if err != nil {
1363
		return
1364
	}
1365
	if err = json.Unmarshal(reqData, req); err != nil {
1366
		return
1367
	}
1368

1369
	log.LogInfof("action[handlePacketToAddDataPartitionRaftMember] %v, partition id %v", req.AddPeer, req.PartitionId)
1370

1371
	p.AddMesgLog(string(reqData))
1372
	dp := s.space.Partition(req.PartitionId)
1373
	if dp == nil {
1374
		err = proto.ErrDataPartitionNotExists
1375
		return
1376
	}
1377
	p.PartitionID = req.PartitionId
1378
	if dp.IsExistReplica(req.AddPeer.Addr) {
1379
		log.LogInfof("handlePacketToAddDataPartitionRaftMember recive MasterCommand: %v "+
1380
			"addRaftAddr(%v) has exsit", string(reqData), req.AddPeer.Addr)
1381
		return
1382
	}
1383
	isRaftLeader, err = s.forwardToRaftLeader(dp, p, false)
1384
	if !isRaftLeader {
1385
		return
1386
	}
1387
	log.LogInfof("action[handlePacketToAddDataPartitionRaftMember] before ChangeRaftMember %v which is sync. partition id %v", req.AddPeer, req.PartitionId)
1388

1389
	if req.AddPeer.ID != 0 {
1390
		_, err = dp.ChangeRaftMember(raftProto.ConfAddNode, raftProto.Peer{ID: req.AddPeer.ID}, reqData)
1391
		if err != nil {
1392
			return
1393
		}
1394
	}
1395
	log.LogInfof("action[handlePacketToAddDataPartitionRaftMember] after ChangeRaftMember %v, partition id %v", req.AddPeer, &req.PartitionId)
1396
}
1397

1398
// handlePacketToRemoveDataPartitionRaftMember handles a master-issued
// task to remove a raft member from a data partition. Regular removals
// go through a raft membership change on the leader (non-leaders forward
// the packet); forced removals bypass raft consensus and apply the
// change locally. The deferred handler replies OK or with the error.
func (s *DataNode) handlePacketToRemoveDataPartitionRaftMember(p *repl.Packet) {
	var (
		err          error
		reqData      []byte
		isRaftLeader bool
		req          = &proto.RemoveDataPartitionRaftMemberRequest{}
	)

	defer func() {
		if err != nil {
			p.PackErrorBody(ActionRemoveDataPartitionRaftMember, err.Error())
		} else {
			p.PacketOkReply()
		}
	}()

	// Decode the admin task envelope; UseNumber keeps numeric fields
	// intact for the re-marshal below.
	adminTask := &proto.AdminTask{}
	decode := json.NewDecoder(bytes.NewBuffer(p.Data))
	decode.UseNumber()
	if err = decode.Decode(adminTask); err != nil {
		return
	}

	reqData, err = json.Marshal(adminTask.Request)
	p.AddMesgLog(string(reqData))
	if err != nil {
		return
	}
	if err = json.Unmarshal(reqData, req); err != nil {
		return
	}

	// Unknown partition: reply OK (nothing to remove on this node).
	dp := s.space.Partition(req.PartitionId)
	if dp == nil {
		return
	}

	log.LogDebugf("action[handlePacketToRemoveDataPartitionRaftMember], req %v (%s) RemoveRaftPeer(%s) dp %v replicaNum %v",
		p.GetReqID(), string(reqData), req.RemovePeer.Addr, dp.partitionID, dp.replicaNum)

	p.PartitionID = req.PartitionId

	// Idempotency: if the peer is not a replica, reply OK without a change.
	if !dp.IsExistReplica(req.RemovePeer.Addr) {
		log.LogWarnf("action[handlePacketToRemoveDataPartitionRaftMember] receive MasterCommand:  req %v[%v] "+
			"RemoveRaftPeer(%v) has not exist", p.GetReqID(), string(reqData), req.RemovePeer.Addr)
		return
	}

	// Membership changes go through the leader; a forced removal may
	// proceed even without one (forwardToRaftLeader honors req.Force).
	isRaftLeader, err = s.forwardToRaftLeader(dp, p, req.Force)
	if !isRaftLeader {
		log.LogWarnf("handlePacketToRemoveDataPartitionRaftMember return no leader")
		return
	}
	if err = dp.CanRemoveRaftMember(req.RemovePeer, req.Force); err != nil {
		log.LogWarnf("action[handlePacketToRemoveDataPartitionRaftMember] CanRemoveRaftMember failed "+
			"req %v dp %v err %v",
			p.GetReqID(), dp.partitionID, err.Error())
		return
	}

	// Forced removal: apply the conf change locally without raft
	// consensus, then persist the new membership.
	if req.Force {
		cc := &raftProto.ConfChange{
			Type: raftProto.ConfRemoveNode,
			Peer: raftProto.Peer{
				ID: req.RemovePeer.ID,
			},
			Context: reqData,
		}
		s.raftStore.RaftServer().RemoveRaftForce(dp.partitionID, cc)
		dp.ApplyMemberChange(cc, 0)
		dp.PersistMetadata()
		return
	}

	// Regular removal via a raft membership change (ID 0 means no peer).
	if req.RemovePeer.ID != 0 {
		log.LogDebugf("action[handlePacketToRemoveDataPartitionRaftMember] ChangeRaftMember "+
			"req %v dp %v RemovePeer.ID %v", p.GetReqID(), dp.partitionID, req.RemovePeer.ID)
		_, err = dp.ChangeRaftMember(raftProto.ConfRemoveNode, raftProto.Peer{ID: req.RemovePeer.ID}, reqData)
		if err != nil {
			return
		}
	}
	log.LogDebugf("action[handlePacketToRemoveDataPartitionRaftMember] CanRemoveRaftMember complete "+
		"req %v dp %v ", p.GetReqID(), dp.partitionID)
}
1483

1484
// handlePacketToDataPartitionTryToLeader asks the partition's raft group
// to transfer leadership to this replica. It is a no-op (OK reply) if
// this replica is already the leader; it fails if the partition is
// unknown or its raft instance is not running.
func (s *DataNode) handlePacketToDataPartitionTryToLeader(p *repl.Packet) {
	var err error

	defer func() {
		if err != nil {
			p.PackErrorBody(ActionDataPartitionTryToLeader, err.Error())
			log.LogWarnf("handlePacketToDataPartitionTryToLeader: %v ", err.Error())
			return
		}
		p.PacketOkReply()
		log.LogDebugf("handlePacketToDataPartitionTryToLeader: partition %v success ", p.PartitionID)
	}()

	log.LogDebugf("handlePacketToDataPartitionTryToLeader: partition %v ", p.PartitionID)
	dp := s.space.Partition(p.PartitionID)
	if dp == nil {
		err = fmt.Errorf("partition %v not exsit", p.PartitionID)
		return
	}
	if dp.raftStatus != RaftStatusRunning {
		err = fmt.Errorf("partition %v raft not running", p.PartitionID)
		return
	}
	// Already the leader: nothing to do, reply OK.
	if dp.raftPartition.IsRaftLeader() {
		log.LogWarnf("handlePacketToDataPartitionTryToLeader: %v is already leader", p.PartitionID)
		return
	}
	err = dp.raftPartition.TryToLeader(dp.partitionID)
}
1514

1515
// forwardToRaftLeader ensures the packet is processed by the partition's
// raft leader. It returns ok == true when this node is the leader (the
// caller should proceed locally) or when force is set and there is no
// leader at all. Otherwise it forwards the packet to the leader over a
// pooled connection and waits for the reply; ok stays false so the
// caller does not also process the request.
func (s *DataNode) forwardToRaftLeader(dp *DataPartition, p *repl.Packet, force bool) (ok bool, err error) {
	var (
		conn       *net.TCPConn
		leaderAddr string
	)

	// Local node is the leader: caller handles the request itself.
	if leaderAddr, ok = dp.IsRaftLeader(); ok {
		return
	}
	// return NoLeaderError if leaderAddr is nil
	if leaderAddr == "" {
		// With force set, proceed without a leader (used by forced removals).
		if force {
			ok = true
			log.LogInfof("action[forwardToRaftLeader] no leader but replica num %v continue", dp.replicaNum)
			return
		}
		err = storage.NoLeaderError
		return
	}

	// forward the packet to the leader if local one is not the leader
	conn, err = gConnPool.GetConnect(leaderAddr)
	if err != nil {
		return
	}
	// Return the connection to the pool; discard it (forceClose) on error.
	// The deferred closure reads the final value of err.
	defer func() {
		gConnPool.PutConnect(conn, err != nil)
	}()
	err = p.WriteToConn(conn)
	if err != nil {
		return
	}
	// Wait for the leader's reply; p now carries the leader's result.
	if err = p.ReadFromConnWithVer(conn, proto.NoReadDeadlineTime); err != nil {
		return
	}

	return
}
1553

1554
// handlePacketToStopDataPartitionRepair handles a master-issued task to
// stop (or resume) decommission repair on a data partition. The deferred
// handler replies OK or with the accumulated error.
func (s *DataNode) handlePacketToStopDataPartitionRepair(p *repl.Packet) {
	task := &proto.AdminTask{}
	err := json.Unmarshal(p.Data, task)
	defer func() {
		if err != nil {
			p.PackErrorBody(ActionStopDataPartitionRepair, err.Error())
		} else {
			p.PacketOkReply()
		}
	}()
	if err != nil {
		return
	}
	request := &proto.StopDataPartitionRepairRequest{}
	if task.OpCode != proto.OpStopDataPartitionRepair {
		err = fmt.Errorf("action[handlePacketToStopDataPartitionRepair] illegal opcode ")
		log.LogWarnf("action[handlePacketToStopDataPartitionRepair] illegal opcode ")
		return
	}

	// Fix: the local variable was named "bytes", shadowing the imported
	// bytes package; it also silently discarded the Marshal error.
	var reqData []byte
	reqData, err = json.Marshal(task.Request)
	if err != nil {
		return
	}
	p.AddMesgLog(string(reqData))
	err = json.Unmarshal(reqData, request)
	if err != nil {
		return
	}
	log.LogDebugf("action[handlePacketToStopDataPartitionRepair] try stop %v", request.PartitionId)
	dp := s.space.Partition(request.PartitionId)
	if dp == nil {
		err = proto.ErrDataPartitionNotExists
		log.LogWarnf("action[handlePacketToStopDataPartitionRepair] cannot find dp %v", request.PartitionId)
		return
	}
	dp.StopDecommissionRecover(request.Stop)
	log.LogInfof("action[handlePacketToStopDataPartitionRepair] %v stop %v success", request.PartitionId, request.Stop)
}
1590

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.