28
"github.com/cubefs/cubefs/proto"
29
"github.com/cubefs/cubefs/repl"
30
"github.com/cubefs/cubefs/storage"
31
"github.com/cubefs/cubefs/util"
32
"github.com/cubefs/cubefs/util/errors"
33
"github.com/cubefs/cubefs/util/log"
37
type DataPartitionRepairTask struct {
40
extents map[uint64]*storage.ExtentInfo
41
ExtentsToBeCreated []*storage.ExtentInfo
42
ExtentsToBeRepaired []*storage.ExtentInfo
43
LeaderTinyDeleteRecordFileSize int64
47
func NewDataPartitionRepairTask(extentFiles []*storage.ExtentInfo, tinyDeleteRecordFileSize int64, source, leaderAddr string) (task *DataPartitionRepairTask) {
48
task = &DataPartitionRepairTask{
49
extents: make(map[uint64]*storage.ExtentInfo),
50
ExtentsToBeCreated: make([]*storage.ExtentInfo, 0),
51
ExtentsToBeRepaired: make([]*storage.ExtentInfo, 0),
52
LeaderTinyDeleteRecordFileSize: tinyDeleteRecordFileSize,
53
LeaderAddr: leaderAddr,
55
for _, extentFile := range extentFiles {
56
extentFile.Source = source
57
task.extents[extentFile.FileID] = extentFile
78
func (dp *DataPartition) repair(extentType uint8) {
79
start := time.Now().UnixNano()
80
log.LogInfof("action[repair] partition(%v) start.", dp.partitionID)
82
var tinyExtents []uint64
83
if proto.IsTinyExtentType(extentType) {
84
tinyExtents = dp.brokenTinyExtents()
85
if len(tinyExtents) == 0 {
91
replica := dp.getReplicaCopy()
92
repairTasks := make([]*DataPartitionRepairTask, len(replica))
93
err := dp.buildDataPartitionRepairTask(repairTasks, extentType, tinyExtents, replica)
95
log.LogErrorf(errors.Stack(err))
96
log.LogErrorf("action[repair] partition(%v) err(%v).",
98
dp.moveToBrokenTinyExtentC(extentType, tinyExtents)
101
log.LogInfof("action[repair] partition(%v) before prepareRepairTasks", dp.partitionID)
103
availableTinyExtents, brokenTinyExtents := dp.prepareRepairTasks(repairTasks)
106
err = dp.NotifyExtentRepair(repairTasks)
108
dp.sendAllTinyExtentsToC(extentType, availableTinyExtents, brokenTinyExtents)
109
log.LogErrorf("action[repair] partition(%v) err(%v).",
111
log.LogError(errors.Stack(err))
114
log.LogDebugf("DoRepair")
116
dp.DoRepair(repairTasks)
117
end := time.Now().UnixNano()
120
dp.sendAllTinyExtentsToC(extentType, availableTinyExtents, brokenTinyExtents)
123
if dp.extentStore.AvailableTinyExtentCnt()+dp.extentStore.BrokenTinyExtentCnt() > storage.TinyExtentCount {
124
log.LogWarnf("action[repair] partition(%v) GoodTinyExtents(%v) "+
125
"BadTinyExtents(%v) finish cost[%vms].", dp.partitionID, dp.extentStore.AvailableTinyExtentCnt(),
126
dp.extentStore.BrokenTinyExtentCnt(), (end-start)/int64(time.Millisecond))
129
log.LogInfof("action[repair] partition(%v) GoodTinyExtents(%v) BadTinyExtents(%v)"+
130
" finish cost[%vms] masterAddr(%v).", dp.partitionID, dp.extentStore.AvailableTinyExtentCnt(),
131
dp.extentStore.BrokenTinyExtentCnt(), (end-start)/int64(time.Millisecond), MasterClient.Nodes())
134
func (dp *DataPartition) buildDataPartitionRepairTask(repairTasks []*DataPartitionRepairTask, extentType uint8, tinyExtents []uint64, replica []string) (err error) {
136
extents, leaderTinyDeleteRecordFileSize, err := dp.getLocalExtentInfo(extentType, tinyExtents)
141
log.LogInfof("buildDataPartitionRepairTask dp %v, extent type %v, len extent %v, replica size %v", dp.partitionID, extentType, len(extents), len(replica))
142
repairTasks[0] = NewDataPartitionRepairTask(extents, leaderTinyDeleteRecordFileSize, replica[0], replica[0])
143
repairTasks[0].addr = replica[0]
146
for index := 1; index < len(replica); index++ {
147
extents, err := dp.getRemoteExtentInfo(extentType, tinyExtents, replica[index])
149
log.LogErrorf("buildDataPartitionRepairTask PartitionID(%v) on (%v) err(%v)", dp.partitionID, replica[index], err)
152
log.LogInfof("buildDataPartitionRepairTask dp %v, add new add %v, extent type %v", dp.partitionID, replica[index], extentType)
153
repairTasks[index] = NewDataPartitionRepairTask(extents, leaderTinyDeleteRecordFileSize, replica[index], replica[0])
154
repairTasks[index].addr = replica[index]
160
func (dp *DataPartition) getLocalExtentInfo(extentType uint8, tinyExtents []uint64) (extents []*storage.ExtentInfo, leaderTinyDeleteRecordFileSize int64, err error) {
161
var localExtents []*storage.ExtentInfo
163
if proto.IsNormalExtentType(extentType) {
164
localExtents, leaderTinyDeleteRecordFileSize, err = dp.extentStore.GetAllWatermarks(storage.NormalExtentFilter())
166
localExtents, leaderTinyDeleteRecordFileSize, err = dp.extentStore.GetAllWatermarks(storage.TinyExtentFilter(tinyExtents))
169
err = errors.Trace(err, "getLocalExtentInfo extent DataPartition(%v) GetAllWaterMark", dp.partitionID)
172
if len(localExtents) <= 0 {
173
extents = make([]*storage.ExtentInfo, 0)
176
extents = make([]*storage.ExtentInfo, 0, len(localExtents))
177
for _, et := range localExtents {
179
extents = append(extents, &newEt)
184
func (dp *DataPartition) getRemoteExtentInfo(extentType uint8, tinyExtents []uint64,
185
target string) (extentFiles []*storage.ExtentInfo, err error) {
186
p := repl.NewPacketToGetAllWatermarks(dp.partitionID, extentType)
187
extentFiles = make([]*storage.ExtentInfo, 0)
188
if proto.IsTinyExtentType(extentType) {
189
p.Data, err = json.Marshal(tinyExtents)
191
err = errors.Trace(err, "getRemoteExtentInfo DataPartition(%v) GetAllWatermarks", dp.partitionID)
194
p.Size = uint32(len(p.Data))
196
var conn *net.TCPConn
197
conn, err = gConnPool.GetConnect(target)
199
err = errors.Trace(err, "getRemoteExtentInfo DataPartition(%v) get host(%v) connect", dp.partitionID, target)
203
gConnPool.PutConnect(conn, err != nil)
205
err = p.WriteToConn(conn)
207
err = errors.Trace(err, "getRemoteExtentInfo DataPartition(%v) write to host(%v)", dp.partitionID, target)
210
reply := new(repl.Packet)
211
err = reply.ReadFromConnWithVer(conn, proto.GetAllWatermarksDeadLineTime)
213
err = errors.Trace(err, "getRemoteExtentInfo DataPartition(%v) read from host(%v)", dp.partitionID, target)
216
err = json.Unmarshal(reply.Data[:reply.Size], &extentFiles)
218
err = errors.Trace(err, "getRemoteExtentInfo DataPartition(%v) unmarshal json(%v) from host(%v)",
219
dp.partitionID, string(reply.Data[:reply.Size]), target)
227
func (dp *DataPartition) DoRepair(repairTasks []*DataPartitionRepairTask) {
228
store := dp.extentStore
229
for _, extentInfo := range repairTasks[0].ExtentsToBeCreated {
230
if !AutoRepairStatus {
231
log.LogWarnf("AutoRepairStatus is False,so cannot Create extent(%v),pid=%d", extentInfo.String(), dp.partitionID)
234
if dp.ExtentStore().IsDeletedNormalExtent(extentInfo.FileID) {
238
dp.disk.allocCheckLimit(proto.IopsWriteType, 1)
240
store.Create(extentInfo.FileID)
242
log.LogDebugf("action[DoRepair] leader to repair len[%v], {%v}", len(repairTasks[0].ExtentsToBeRepaired), repairTasks[0].ExtentsToBeRepaired)
243
for _, extentInfo := range repairTasks[0].ExtentsToBeRepaired {
244
log.LogDebugf("action[DoRepair] leader to repair len[%v], {%v}", len(repairTasks[0].ExtentsToBeRepaired), extentInfo)
245
err := dp.streamRepairExtent(extentInfo)
247
err = errors.Trace(err, "doStreamExtentFixRepair %v", dp.applyRepairKey(int(extentInfo.FileID)))
248
localExtentInfo, opErr := dp.ExtentStore().Watermark(uint64(extentInfo.FileID))
250
err = errors.Trace(err, opErr.Error())
252
err = errors.Trace(err, "partition(%v) remote(%v) local(%v)",
253
dp.partitionID, extentInfo, localExtentInfo)
254
log.LogWarnf("action[doStreamExtentFixRepair] err(%v).", err)
259
func (dp *DataPartition) moveToBrokenTinyExtentC(extentType uint8, extents []uint64) {
260
if proto.IsTinyExtentType(extentType) {
261
dp.extentStore.SendAllToBrokenTinyExtentC(extents)
265
func (dp *DataPartition) sendAllTinyExtentsToC(extentType uint8, availableTinyExtents, brokenTinyExtents []uint64) {
266
if !proto.IsTinyExtentType(extentType) {
269
for _, extentID := range availableTinyExtents {
270
if storage.IsTinyExtent(extentID) {
271
dp.extentStore.SendToAvailableTinyExtentC(extentID)
274
for _, extentID := range brokenTinyExtents {
275
if storage.IsTinyExtent(extentID) {
276
dp.extentStore.SendToBrokenTinyExtentC(extentID)
281
func (dp *DataPartition) brokenTinyExtents() (brokenTinyExtents []uint64) {
282
brokenTinyExtents = make([]uint64, 0)
283
extentsToBeRepaired := MinTinyExtentsToRepair
284
if dp.extentStore.AvailableTinyExtentCnt() <= MinAvaliTinyExtentCnt {
285
extentsToBeRepaired = storage.TinyExtentCount
287
for i := 0; i < extentsToBeRepaired; i++ {
288
extentID, err := dp.extentStore.GetBrokenTinyExtent()
292
brokenTinyExtents = append(brokenTinyExtents, extentID)
297
func (dp *DataPartition) prepareRepairTasks(repairTasks []*DataPartitionRepairTask) (availableTinyExtents []uint64, brokenTinyExtents []uint64) {
298
extentInfoMap := make(map[uint64]*storage.ExtentInfo)
299
deleteExtents := make(map[uint64]bool)
300
log.LogInfof("action[prepareRepairTasks] dp %v task len %v", dp.partitionID, len(repairTasks))
301
for index := 0; index < len(repairTasks); index++ {
302
repairTask := repairTasks[index]
303
if repairTask == nil {
306
for extentID, extentInfo := range repairTask.extents {
307
if extentInfo.IsDeleted {
308
deleteExtents[extentID] = true
311
extentWithMaxSize, ok := extentInfoMap[extentID]
313
extentInfoMap[extentID] = extentInfo
315
if extentInfo.TotalSize() > extentWithMaxSize.TotalSize() {
316
extentInfoMap[extentID] = extentInfo
322
for extentID := range deleteExtents {
323
extentInfo := extentInfoMap[extentID]
324
if extentInfo != nil {
325
extentInfo.IsDeleted = true
326
extentInfoMap[extentID] = extentInfo
329
dp.buildExtentCreationTasks(repairTasks, extentInfoMap)
330
availableTinyExtents, brokenTinyExtents = dp.buildExtentRepairTasks(repairTasks, extentInfoMap)
335
func (dp *DataPartition) buildExtentCreationTasks(repairTasks []*DataPartitionRepairTask, extentInfoMap map[uint64]*storage.ExtentInfo) {
336
for extentID, extentInfo := range extentInfoMap {
337
if storage.IsTinyExtent(extentID) {
340
for index := 0; index < len(repairTasks); index++ {
341
repairTask := repairTasks[index]
342
if repairTask == nil {
345
if _, ok := repairTask.extents[extentID]; !ok && !extentInfo.IsDeleted {
346
if storage.IsTinyExtent(extentID) {
349
if extentInfo.IsDeleted {
352
if dp.ExtentStore().IsDeletedNormalExtent(extentID) {
355
ei := &storage.ExtentInfo{Source: extentInfo.Source, FileID: extentID, Size: extentInfo.Size, SnapshotDataOff: extentInfo.SnapshotDataOff}
356
repairTask.ExtentsToBeCreated = append(repairTask.ExtentsToBeCreated, ei)
357
repairTask.ExtentsToBeRepaired = append(repairTask.ExtentsToBeRepaired, ei)
358
log.LogInfof("action[generatorAddExtentsTasks] addFile(%v_%v) on Index(%v).", dp.partitionID, ei, index)
365
func (dp *DataPartition) buildExtentRepairTasks(repairTasks []*DataPartitionRepairTask, maxSizeExtentMap map[uint64]*storage.ExtentInfo) (availableTinyExtents []uint64, brokenTinyExtents []uint64) {
366
availableTinyExtents = make([]uint64, 0)
367
brokenTinyExtents = make([]uint64, 0)
368
for extentID, maxFileInfo := range maxSizeExtentMap {
370
hasBeenRepaired := true
371
for index := 0; index < len(repairTasks); index++ {
372
if repairTasks[index] == nil {
375
extentInfo, ok := repairTasks[index].extents[extentID]
379
if extentInfo.IsDeleted {
382
if dp.ExtentStore().IsDeletedNormalExtent(extentID) {
385
if extentInfo.TotalSize() < maxFileInfo.TotalSize() {
386
fixExtent := &storage.ExtentInfo{Source: maxFileInfo.Source, FileID: extentID, Size: maxFileInfo.Size, SnapshotDataOff: maxFileInfo.SnapshotDataOff}
387
repairTasks[index].ExtentsToBeRepaired = append(repairTasks[index].ExtentsToBeRepaired, fixExtent)
388
log.LogInfof("action[generatorFixExtentSizeTasks] fixExtent(%v_%v) on Index(%v) on(%v).",
389
dp.partitionID, fixExtent, index, repairTasks[index].addr)
390
hasBeenRepaired = false
394
if storage.IsTinyExtent(extentID) {
396
availableTinyExtents = append(availableTinyExtents, extentID)
398
brokenTinyExtents = append(brokenTinyExtents, extentID)
405
func (dp *DataPartition) notifyFollower(wg *sync.WaitGroup, index int, members []*DataPartitionRepairTask) (err error) {
406
p := repl.NewPacketToNotifyExtentRepair(dp.partitionID)
407
var conn *net.TCPConn
410
target := members[index].addr
412
p.Data, _ = json.Marshal(members[index])
413
p.Size = uint32(len(p.Data))
414
conn, err = gConnPool.GetConnect(target)
418
log.LogInfof(ActionNotifyFollowerToRepair+" to host(%v) Partition(%v) done", target, dp.partitionID)
420
log.LogErrorf(ActionNotifyFollowerToRepair+" to host(%v) Partition(%v) failed, err(%v)", target, dp.partitionID, err)
427
gConnPool.PutConnect(conn, err != nil)
429
if err = p.WriteToConn(conn); err != nil {
432
if err = p.ReadFromConnWithVer(conn, proto.NoReadDeadlineTime); err != nil {
439
func (dp *DataPartition) NotifyExtentRepair(members []*DataPartitionRepairTask) (err error) {
440
wg := new(sync.WaitGroup)
441
for i := 1; i < len(members); i++ {
442
if members[i] == nil || !dp.IsExistReplica(members[i].addr) {
443
if members[i] != nil {
444
log.LogInfof("notify extend repair is change ,index(%v),pid(%v),task_member_add(%v),IsExistReplica(%v)",
445
i, dp.partitionID, members[i].addr, dp.IsExistReplica(members[i].addr))
451
go dp.notifyFollower(wg, i, members)
458
func (dp *DataPartition) doStreamExtentFixRepair(wg *sync.WaitGroup, remoteExtentInfo *storage.ExtentInfo) {
461
err := dp.streamRepairExtent(remoteExtentInfo)
464
if dp.isDecommissionRecovering() {
465
atomic.AddUint64(&dp.recoverErrCnt, 1)
466
if atomic.LoadUint64(&dp.recoverErrCnt) >= dp.dataNode.GetDpMaxRepairErrCnt() {
467
dp.handleDecommissionRecoverFailed()
471
err = errors.Trace(err, "doStreamExtentFixRepair %v", dp.applyRepairKey(int(remoteExtentInfo.FileID)))
472
localExtentInfo, opErr := dp.ExtentStore().Watermark(uint64(remoteExtentInfo.FileID))
474
err = errors.Trace(err, opErr.Error())
476
err = errors.Trace(err, "partition(%v) remote(%v) local(%v)",
477
dp.partitionID, remoteExtentInfo, localExtentInfo)
478
log.LogWarnf("action[doStreamExtentFixRepair] err(%v).", err)
482
func (dp *DataPartition) applyRepairKey(extentID int) (m string) {
483
return fmt.Sprintf("ApplyRepairKey(%v_%v)", dp.partitionID, extentID)
487
func (dp *DataPartition) streamRepairExtent(remoteExtentInfo *storage.ExtentInfo) (err error) {
488
log.LogDebugf("streamRepairExtent dp %v remote info %v", dp.partitionID, remoteExtentInfo)
489
store := dp.ExtentStore()
490
if !store.HasExtent(remoteExtentInfo.FileID) {
491
log.LogDebugf("streamRepairExtent remote info %v not exist", remoteExtentInfo)
494
if !AutoRepairStatus && !storage.IsTinyExtent(remoteExtentInfo.FileID) {
495
log.LogWarnf("AutoRepairStatus is False,so cannot AutoRepair extent(%v)", remoteExtentInfo.String())
498
localExtentInfo, err := store.Watermark(remoteExtentInfo.FileID)
500
log.LogDebugf("streamRepairExtent local %v remote info %v", localExtentInfo, remoteExtentInfo)
501
return errors.Trace(err, "streamRepairExtent Watermark error")
503
log.LogDebugf("streamRepairExtent dp %v remote info %v,local %v", dp.partitionID, remoteExtentInfo, localExtentInfo)
504
if dp.ExtentStore().IsDeletedNormalExtent(remoteExtentInfo.FileID) {
505
log.LogDebugf("streamRepairExtent local %v remote info %v", localExtentInfo, remoteExtentInfo)
509
if localExtentInfo.Size >= remoteExtentInfo.Size && localExtentInfo.SnapshotDataOff >= remoteExtentInfo.SnapshotDataOff {
510
log.LogDebugf("streamRepairExtent local %v remote info %v", localExtentInfo, remoteExtentInfo)
514
doWork := func(wType int, currFixOffset uint64, dstOffset uint64, request *repl.Packet) (err error) {
515
log.LogDebugf("streamRepairExtent. currFixOffset %v dstOffset %v, request %v", currFixOffset, dstOffset, request)
517
conn, err = dp.getRepairConn(remoteExtentInfo.Source)
519
return errors.Trace(err, "streamRepairExtent get conn from host(%v) error", remoteExtentInfo.Source)
522
dp.putRepairConn(conn, err != nil)
525
if err = request.WriteToConn(conn); err != nil {
526
err = errors.Trace(err, "streamRepairExtent send streamRead to host(%v) error", remoteExtentInfo.Source)
527
log.LogWarnf("action[streamRepairExtent] err(%v).", err)
531
var hasRecoverySize uint64
533
for currFixOffset < dstOffset {
534
if currFixOffset >= dstOffset {
537
reply := repl.NewPacket()
540
if err = reply.ReadFromConnWithVer(conn, 60); err != nil {
541
err = errors.Trace(err, "streamRepairExtent receive data error,localExtentSize(%v) remoteExtentSize(%v)", currFixOffset, dstOffset)
545
if reply.ResultCode != proto.OpOk {
546
err = errors.Trace(fmt.Errorf("unknow result code"),
547
"streamRepairExtent receive opcode error(%v) ,localExtentSize(%v) remoteExtentSize(%v)", string(reply.Data[:intMin(len(reply.Data), int(reply.Size))]), currFixOffset, remoteExtentInfo.Size)
551
if reply.ReqID != request.ReqID || reply.PartitionID != request.PartitionID ||
552
reply.ExtentID != request.ExtentID {
553
err = errors.Trace(fmt.Errorf("unavali reply"), "streamRepairExtent receive unavalid "+
554
"request(%v) reply(%v) ,localExtentSize(%v) remoteExtentSize(%v)", request.GetUniqueLogId(), reply.GetUniqueLogId(), currFixOffset, dstOffset)
558
if !storage.IsTinyExtent(reply.ExtentID) && (reply.Size == 0 || reply.ExtentOffset != int64(currFixOffset)) {
559
err = errors.Trace(fmt.Errorf("unavali reply"), "streamRepairExtent receive unavalid "+
560
"request(%v) reply(%v) localExtentSize(%v) remoteExtentSize(%v)", request.GetUniqueLogId(), reply.GetUniqueLogId(), currFixOffset, dstOffset)
563
if loopTimes%100 == 0 {
564
log.LogInfof(fmt.Sprintf("action[streamRepairExtent] fix(%v_%v) start fix from (%v)"+
565
" remoteSize(%v)localSize(%v) reply(%v).", dp.partitionID, localExtentInfo.FileID, remoteExtentInfo.String(),
566
dstOffset, currFixOffset, reply.GetUniqueLogId()))
570
actualCrc := crc32.ChecksumIEEE(reply.Data[:reply.Size])
571
if reply.CRC != actualCrc {
572
err = fmt.Errorf("streamRepairExtent crc mismatch expectCrc(%v) actualCrc(%v) extent(%v_%v) start fix from (%v)"+
573
" remoteSize(%v) localSize(%v) request(%v) reply(%v) ", reply.CRC, actualCrc, dp.partitionID, remoteExtentInfo.String(),
574
remoteExtentInfo.Source, dstOffset, currFixOffset, request.GetUniqueLogId(), reply.GetUniqueLogId())
575
return errors.Trace(err, "streamRepairExtent receive data error")
577
isEmptyResponse := false
579
if storage.IsTinyExtent(uint64(localExtentInfo.FileID)) {
580
currRecoverySize := uint64(reply.Size)
581
var remoteAvaliSize uint64
582
if reply.ArgLen == TinyExtentRepairReadResponseArgLen {
583
remoteAvaliSize = binary.BigEndian.Uint64(reply.Arg[9:TinyExtentRepairReadResponseArgLen])
585
if reply.Arg != nil {
586
isEmptyResponse = reply.Arg[0] == EmptyResponse
589
currRecoverySize = binary.BigEndian.Uint64(reply.Arg[1:9])
590
reply.Size = uint32(currRecoverySize)
592
err = store.TinyExtentRecover(uint64(localExtentInfo.FileID), int64(currFixOffset), int64(currRecoverySize), reply.Data, reply.CRC, isEmptyResponse)
593
if hasRecoverySize+currRecoverySize >= remoteAvaliSize {
594
log.LogInfof("streamRepairTinyExtent(%v) recover fininsh,remoteAvaliSize(%v) "+
595
"hasRecoverySize(%v) currRecoverySize(%v)", dp.applyRepairKey(int(localExtentInfo.FileID)),
596
remoteAvaliSize, hasRecoverySize+currRecoverySize, currRecoverySize)
600
log.LogDebugf("streamRepairExtent reply size %v, currFixoffset %v, reply %v ", reply.Size, currFixOffset, reply)
601
_, err = store.Write(uint64(localExtentInfo.FileID), int64(currFixOffset), int64(reply.Size), reply.Data, reply.CRC, wType, BufferWrite)
606
err = errors.Trace(err, "streamRepairExtent repair data error ")
609
hasRecoverySize += uint64(reply.Size)
610
currFixOffset += uint64(reply.Size)
611
if currFixOffset >= dstOffset {
612
log.LogWarnf(fmt.Sprintf("action[streamRepairExtent] fix(%v_%v) start fix from (%v)"+
613
" remoteSize(%v)localSize(%v) reply(%v).", dp.partitionID, localExtentInfo.FileID, remoteExtentInfo.String(),
614
dstOffset, currFixOffset, reply.GetUniqueLogId()))
622
var request *repl.Packet
623
sizeDiff := remoteExtentInfo.Size - localExtentInfo.Size
625
if storage.IsTinyExtent(remoteExtentInfo.FileID) {
626
if sizeDiff >= math.MaxUint32 {
627
sizeDiff = math.MaxUint32 - util.MB
629
request = repl.NewTinyExtentRepairReadPacket(dp.partitionID, remoteExtentInfo.FileID, int(localExtentInfo.Size), int(sizeDiff))
630
currFixOffset := localExtentInfo.Size
631
return doWork(0, currFixOffset, remoteExtentInfo.Size, request)
634
log.LogDebugf("streamRepairExtent. local info %v, remote %v", localExtentInfo, remoteExtentInfo)
635
request = repl.NewExtentRepairReadPacket(dp.partitionID, remoteExtentInfo.FileID, int(localExtentInfo.Size), int(sizeDiff))
636
currFixOffset := localExtentInfo.Size
637
if err = doWork(storage.AppendWriteType, currFixOffset, remoteExtentInfo.Size, request); err != nil {
641
sizeDiffVerAppend := remoteExtentInfo.SnapshotDataOff - localExtentInfo.SnapshotDataOff
642
if sizeDiffVerAppend > 0 {
643
request = repl.NewExtentRepairReadPacket(dp.partitionID, remoteExtentInfo.FileID, int(localExtentInfo.SnapshotDataOff), int(sizeDiffVerAppend))
644
currFixOffset := localExtentInfo.SnapshotDataOff
645
return doWork(storage.AppendRandomWriteType, currFixOffset, remoteExtentInfo.SnapshotDataOff, request)
652
func intMin(a, b int) int {