cubefs

Форк
0
/
data_partition_repair.go 
658 строк · 25.7 Кб
1
// Copyright 2018 The CubeFS Authors.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
14

15
package datanode
16

17
import (
18
	"encoding/binary"
19
	"encoding/json"
20
	"fmt"
21
	"hash/crc32"
22
	"math"
23
	"net"
24
	"sync"
25
	"sync/atomic"
26
	"time"
27

28
	"github.com/cubefs/cubefs/proto"
29
	"github.com/cubefs/cubefs/repl"
30
	"github.com/cubefs/cubefs/storage"
31
	"github.com/cubefs/cubefs/util"
32
	"github.com/cubefs/cubefs/util/errors"
33
	"github.com/cubefs/cubefs/util/log"
34
)
35

36
// DataPartitionRepairTask defines the repair task for the data partition.
37
type DataPartitionRepairTask struct {
38
	TaskType                       uint8
39
	addr                           string
40
	extents                        map[uint64]*storage.ExtentInfo
41
	ExtentsToBeCreated             []*storage.ExtentInfo
42
	ExtentsToBeRepaired            []*storage.ExtentInfo
43
	LeaderTinyDeleteRecordFileSize int64
44
	LeaderAddr                     string
45
}
46

47
func NewDataPartitionRepairTask(extentFiles []*storage.ExtentInfo, tinyDeleteRecordFileSize int64, source, leaderAddr string) (task *DataPartitionRepairTask) {
48
	task = &DataPartitionRepairTask{
49
		extents:                        make(map[uint64]*storage.ExtentInfo),
50
		ExtentsToBeCreated:             make([]*storage.ExtentInfo, 0),
51
		ExtentsToBeRepaired:            make([]*storage.ExtentInfo, 0),
52
		LeaderTinyDeleteRecordFileSize: tinyDeleteRecordFileSize,
53
		LeaderAddr:                     leaderAddr,
54
	}
55
	for _, extentFile := range extentFiles {
56
		extentFile.Source = source
57
		task.extents[extentFile.FileID] = extentFile
58
	}
59
	return
60
}
61

62
// Main function to perform the repair.
63
// The repair process can be described as follows:
64
// There are two types of repairs.
65
// The first one is called the normal extent repair, and the second one is called the tiny extent repair.
66
//  1. normal extent repair:
67
//     - the leader collects all the extent information from the followers.
68
//     - for each extent, we compare all the replicas to find the one with the largest size.
69
//     - periodically check the size of the local extent, and if it is smaller than the largest size,
70
//     add it to the tobeRepaired list, and generate the corresponding tasks.
71
//  2. tiny extent repair:
72
//     - when creating the new partition, add all tiny extents to the toBeRepaired list,
73
//     and the repair task will create all the tiny extents first.
74
//     - The leader of the replicas periodically collects the extent information of each follower
75
//     - for each extent, we compare all the replicas to find the one with the largest size.
76
//     - periodically check the size of the local extent, and if it is smaller than the largest size,
77
//     add it to the tobeRepaired list, and generate the corresponding tasks.
78
func (dp *DataPartition) repair(extentType uint8) {
79
	start := time.Now().UnixNano()
80
	log.LogInfof("action[repair] partition(%v) start.", dp.partitionID)
81

82
	var tinyExtents []uint64 // unavailable extents
83
	if proto.IsTinyExtentType(extentType) {
84
		tinyExtents = dp.brokenTinyExtents()
85
		if len(tinyExtents) == 0 {
86
			return
87
		}
88
	}
89

90
	// fix dp replica index panic , using replica copy
91
	replica := dp.getReplicaCopy()
92
	repairTasks := make([]*DataPartitionRepairTask, len(replica))
93
	err := dp.buildDataPartitionRepairTask(repairTasks, extentType, tinyExtents, replica)
94
	if err != nil {
95
		log.LogErrorf(errors.Stack(err))
96
		log.LogErrorf("action[repair] partition(%v) err(%v).",
97
			dp.partitionID, err)
98
		dp.moveToBrokenTinyExtentC(extentType, tinyExtents)
99
		return
100
	}
101
	log.LogInfof("action[repair] partition(%v) before prepareRepairTasks", dp.partitionID)
102
	// compare all the extents in the replicas to compute the good and bad ones
103
	availableTinyExtents, brokenTinyExtents := dp.prepareRepairTasks(repairTasks)
104

105
	// notify the replicas to repair the extent
106
	err = dp.NotifyExtentRepair(repairTasks)
107
	if err != nil {
108
		dp.sendAllTinyExtentsToC(extentType, availableTinyExtents, brokenTinyExtents)
109
		log.LogErrorf("action[repair] partition(%v) err(%v).",
110
			dp.partitionID, err)
111
		log.LogError(errors.Stack(err))
112
		return
113
	}
114
	log.LogDebugf("DoRepair")
115
	// ask the leader to do the repair
116
	dp.DoRepair(repairTasks)
117
	end := time.Now().UnixNano()
118

119
	// every time we need to figure out which extents need to be repaired and which ones do not.
120
	dp.sendAllTinyExtentsToC(extentType, availableTinyExtents, brokenTinyExtents)
121

122
	// error check
123
	if dp.extentStore.AvailableTinyExtentCnt()+dp.extentStore.BrokenTinyExtentCnt() > storage.TinyExtentCount {
124
		log.LogWarnf("action[repair] partition(%v) GoodTinyExtents(%v) "+
125
			"BadTinyExtents(%v) finish cost[%vms].", dp.partitionID, dp.extentStore.AvailableTinyExtentCnt(),
126
			dp.extentStore.BrokenTinyExtentCnt(), (end-start)/int64(time.Millisecond))
127
	}
128

129
	log.LogInfof("action[repair] partition(%v) GoodTinyExtents(%v) BadTinyExtents(%v)"+
130
		" finish cost[%vms] masterAddr(%v).", dp.partitionID, dp.extentStore.AvailableTinyExtentCnt(),
131
		dp.extentStore.BrokenTinyExtentCnt(), (end-start)/int64(time.Millisecond), MasterClient.Nodes())
132
}
133

134
// buildDataPartitionRepairTask fills repairTasks with one task per replica.
//
// Index 0 is the leader and is built from the local extent store; a failure
// there is returned. Indexes 1..n-1 are built from the extent information
// fetched from each follower; a follower that cannot be reached is logged and
// its slot is left nil, so callers must tolerate nil entries.
//
// An error is also returned when replica is empty or repairTasks is shorter
// than replica, since both would otherwise cause an out-of-range index.
func (dp *DataPartition) buildDataPartitionRepairTask(repairTasks []*DataPartitionRepairTask, extentType uint8, tinyExtents []uint64, replica []string) (err error) {
	if len(replica) == 0 {
		return fmt.Errorf("buildDataPartitionRepairTask partition(%v) has no replica", dp.partitionID)
	}
	if len(repairTasks) < len(replica) {
		return fmt.Errorf("buildDataPartitionRepairTask partition(%v) task slots(%v) less than replicas(%v)",
			dp.partitionID, len(repairTasks), len(replica))
	}

	// get the local extent info
	extents, leaderTinyDeleteRecordFileSize, err := dp.getLocalExtentInfo(extentType, tinyExtents)
	if err != nil {
		return err
	}
	// new repair task for the leader
	leaderAddr := replica[0]
	log.LogInfof("buildDataPartitionRepairTask dp %v, extent type %v, len extent %v, replica size %v", dp.partitionID, extentType, len(extents), len(replica))
	repairTasks[0] = NewDataPartitionRepairTask(extents, leaderTinyDeleteRecordFileSize, leaderAddr, leaderAddr)
	repairTasks[0].addr = leaderAddr

	// new repair tasks for the followers
	for index := 1; index < len(replica); index++ {
		followerExtents, remoteErr := dp.getRemoteExtentInfo(extentType, tinyExtents, replica[index])
		if remoteErr != nil {
			// Best effort: skip this follower and leave its slot nil.
			log.LogErrorf("buildDataPartitionRepairTask PartitionID(%v) on (%v) err(%v)", dp.partitionID, replica[index], remoteErr)
			continue
		}
		log.LogInfof("buildDataPartitionRepairTask dp %v,  add new add %v,  extent type %v", dp.partitionID, replica[index], extentType)
		repairTasks[index] = NewDataPartitionRepairTask(followerExtents, leaderTinyDeleteRecordFileSize, replica[index], leaderAddr)
		repairTasks[index].addr = replica[index]
	}

	return nil
}
159

160
// getLocalExtentInfo reads the watermarks of the local extents of the given
// type from the extent store and returns a copy of each entry, together with
// the tiny-delete record file size the store reports. For tiny extents only
// the IDs listed in tinyExtents are queried. The returned slice is non-nil on
// success, even when no extent matched.
func (dp *DataPartition) getLocalExtentInfo(extentType uint8, tinyExtents []uint64) (extents []*storage.ExtentInfo, leaderTinyDeleteRecordFileSize int64, err error) {
	var watermarks []*storage.ExtentInfo

	switch {
	case proto.IsNormalExtentType(extentType):
		watermarks, leaderTinyDeleteRecordFileSize, err = dp.extentStore.GetAllWatermarks(storage.NormalExtentFilter())
	default:
		watermarks, leaderTinyDeleteRecordFileSize, err = dp.extentStore.GetAllWatermarks(storage.TinyExtentFilter(tinyExtents))
	}
	if err != nil {
		err = errors.Trace(err, "getLocalExtentInfo extent DataPartition(%v) GetAllWaterMark", dp.partitionID)
		return
	}

	// Copy every entry so later mutation of the returned values does not
	// touch the objects handed out by the store.
	extents = make([]*storage.ExtentInfo, 0, len(watermarks))
	for i := range watermarks {
		snapshot := *watermarks[i]
		extents = append(extents, &snapshot)
	}
	return
}
183

184
func (dp *DataPartition) getRemoteExtentInfo(extentType uint8, tinyExtents []uint64,
185
	target string) (extentFiles []*storage.ExtentInfo, err error) {
186
	p := repl.NewPacketToGetAllWatermarks(dp.partitionID, extentType)
187
	extentFiles = make([]*storage.ExtentInfo, 0)
188
	if proto.IsTinyExtentType(extentType) {
189
		p.Data, err = json.Marshal(tinyExtents)
190
		if err != nil {
191
			err = errors.Trace(err, "getRemoteExtentInfo DataPartition(%v) GetAllWatermarks", dp.partitionID)
192
			return
193
		}
194
		p.Size = uint32(len(p.Data))
195
	}
196
	var conn *net.TCPConn
197
	conn, err = gConnPool.GetConnect(target) // get remote connection
198
	if err != nil {
199
		err = errors.Trace(err, "getRemoteExtentInfo DataPartition(%v) get host(%v) connect", dp.partitionID, target)
200
		return
201
	}
202
	defer func() {
203
		gConnPool.PutConnect(conn, err != nil)
204
	}()
205
	err = p.WriteToConn(conn) // write command to the remote host
206
	if err != nil {
207
		err = errors.Trace(err, "getRemoteExtentInfo DataPartition(%v) write to host(%v)", dp.partitionID, target)
208
		return
209
	}
210
	reply := new(repl.Packet)
211
	err = reply.ReadFromConnWithVer(conn, proto.GetAllWatermarksDeadLineTime) // read the response
212
	if err != nil {
213
		err = errors.Trace(err, "getRemoteExtentInfo DataPartition(%v) read from host(%v)", dp.partitionID, target)
214
		return
215
	}
216
	err = json.Unmarshal(reply.Data[:reply.Size], &extentFiles)
217
	if err != nil {
218
		err = errors.Trace(err, "getRemoteExtentInfo DataPartition(%v) unmarshal json(%v) from host(%v)",
219
			dp.partitionID, string(reply.Data[:reply.Size]), target)
220
		return
221
	}
222

223
	return
224
}
225

226
// DoRepair asks the leader to perform the repair tasks.
227
func (dp *DataPartition) DoRepair(repairTasks []*DataPartitionRepairTask) {
228
	store := dp.extentStore
229
	for _, extentInfo := range repairTasks[0].ExtentsToBeCreated {
230
		if !AutoRepairStatus {
231
			log.LogWarnf("AutoRepairStatus is False,so cannot Create extent(%v),pid=%d", extentInfo.String(), dp.partitionID)
232
			continue
233
		}
234
		if dp.ExtentStore().IsDeletedNormalExtent(extentInfo.FileID) {
235
			continue
236
		}
237

238
		dp.disk.allocCheckLimit(proto.IopsWriteType, 1)
239

240
		store.Create(extentInfo.FileID)
241
	}
242
	log.LogDebugf("action[DoRepair] leader to repair len[%v], {%v}", len(repairTasks[0].ExtentsToBeRepaired), repairTasks[0].ExtentsToBeRepaired)
243
	for _, extentInfo := range repairTasks[0].ExtentsToBeRepaired {
244
		log.LogDebugf("action[DoRepair] leader to repair len[%v], {%v}", len(repairTasks[0].ExtentsToBeRepaired), extentInfo)
245
		err := dp.streamRepairExtent(extentInfo)
246
		if err != nil {
247
			err = errors.Trace(err, "doStreamExtentFixRepair %v", dp.applyRepairKey(int(extentInfo.FileID)))
248
			localExtentInfo, opErr := dp.ExtentStore().Watermark(uint64(extentInfo.FileID))
249
			if opErr != nil {
250
				err = errors.Trace(err, opErr.Error())
251
			}
252
			err = errors.Trace(err, "partition(%v) remote(%v) local(%v)",
253
				dp.partitionID, extentInfo, localExtentInfo)
254
			log.LogWarnf("action[doStreamExtentFixRepair] err(%v).", err)
255
		}
256
	}
257
}
258

259
// moveToBrokenTinyExtentC hands all given extents to the extent store's
// broken-tiny-extent queue. It does nothing for non-tiny extent types.
func (dp *DataPartition) moveToBrokenTinyExtentC(extentType uint8, extents []uint64) {
	if !proto.IsTinyExtentType(extentType) {
		return
	}
	dp.extentStore.SendAllToBrokenTinyExtentC(extents)
}
264

265
// sendAllTinyExtentsToC returns tiny extents to the extent store's queues:
// IDs in availableTinyExtents go to the available queue, IDs in
// brokenTinyExtents go to the broken queue. IDs that are not tiny extents are
// ignored, and nothing happens for non-tiny extent types.
func (dp *DataPartition) sendAllTinyExtentsToC(extentType uint8, availableTinyExtents, brokenTinyExtents []uint64) {
	if proto.IsTinyExtentType(extentType) {
		for i := range availableTinyExtents {
			if id := availableTinyExtents[i]; storage.IsTinyExtent(id) {
				dp.extentStore.SendToAvailableTinyExtentC(id)
			}
		}
		for i := range brokenTinyExtents {
			if id := brokenTinyExtents[i]; storage.IsTinyExtent(id) {
				dp.extentStore.SendToBrokenTinyExtentC(id)
			}
		}
	}
}
280

281
// brokenTinyExtents takes a batch of extent IDs from the extent store's
// broken-tiny-extent queue. The batch size is MinTinyExtentsToRepair, raised
// to storage.TinyExtentCount when the number of available tiny extents is at
// or below MinAvaliTinyExtentCnt. Collection stops early at the first error
// from the store; the result is never nil.
func (dp *DataPartition) brokenTinyExtents() (brokenTinyExtents []uint64) {
	brokenTinyExtents = []uint64{}

	limit := MinTinyExtentsToRepair
	if dp.extentStore.AvailableTinyExtentCnt() <= MinAvaliTinyExtentCnt {
		limit = storage.TinyExtentCount
	}

	for taken := 0; taken < limit; taken++ {
		id, err := dp.extentStore.GetBrokenTinyExtent()
		if err != nil {
			break
		}
		brokenTinyExtents = append(brokenTinyExtents, id)
	}
	return brokenTinyExtents
}
296

297
// prepareRepairTasks compares the extent information of all replicas and
// fills in each task's create and repair lists.
//
// For every extent ID it remembers the non-deleted copy with the largest
// TotalSize. If any replica reports the extent as deleted, that largest copy
// is marked deleted as well (the shared ExtentInfo is modified in place).
// The resulting map drives buildExtentCreationTasks and
// buildExtentRepairTasks; the tiny extent classification of the latter is
// returned. Nil task slots are skipped.
func (dp *DataPartition) prepareRepairTasks(repairTasks []*DataPartitionRepairTask) (availableTinyExtents []uint64, brokenTinyExtents []uint64) {
	largest := make(map[uint64]*storage.ExtentInfo)
	deleted := make(map[uint64]bool)
	log.LogInfof("action[prepareRepairTasks] dp %v task len %v", dp.partitionID, len(repairTasks))

	for _, task := range repairTasks {
		if task == nil {
			continue
		}
		for id, info := range task.extents {
			if info.IsDeleted {
				deleted[id] = true
				continue
			}
			if current, seen := largest[id]; !seen || info.TotalSize() > current.TotalSize() {
				largest[id] = info
			}
		}
	}

	// A deletion seen on any replica wins over live copies elsewhere.
	for id := range deleted {
		if info := largest[id]; info != nil {
			info.IsDeleted = true
		}
	}

	dp.buildExtentCreationTasks(repairTasks, largest)
	return dp.buildExtentRepairTasks(repairTasks, largest)
}
333

334
// Create a new extent if one of the replica is missing.
335
func (dp *DataPartition) buildExtentCreationTasks(repairTasks []*DataPartitionRepairTask, extentInfoMap map[uint64]*storage.ExtentInfo) {
336
	for extentID, extentInfo := range extentInfoMap {
337
		if storage.IsTinyExtent(extentID) {
338
			continue
339
		}
340
		for index := 0; index < len(repairTasks); index++ {
341
			repairTask := repairTasks[index]
342
			if repairTask == nil {
343
				continue
344
			}
345
			if _, ok := repairTask.extents[extentID]; !ok && !extentInfo.IsDeleted {
346
				if storage.IsTinyExtent(extentID) {
347
					continue
348
				}
349
				if extentInfo.IsDeleted {
350
					continue
351
				}
352
				if dp.ExtentStore().IsDeletedNormalExtent(extentID) {
353
					continue
354
				}
355
				ei := &storage.ExtentInfo{Source: extentInfo.Source, FileID: extentID, Size: extentInfo.Size, SnapshotDataOff: extentInfo.SnapshotDataOff}
356
				repairTask.ExtentsToBeCreated = append(repairTask.ExtentsToBeCreated, ei)
357
				repairTask.ExtentsToBeRepaired = append(repairTask.ExtentsToBeRepaired, ei)
358
				log.LogInfof("action[generatorAddExtentsTasks] addFile(%v_%v) on Index(%v).", dp.partitionID, ei, index)
359
			}
360
		}
361
	}
362
}
363

364
// Repair an extent if the replicas do not have the same length.
365
func (dp *DataPartition) buildExtentRepairTasks(repairTasks []*DataPartitionRepairTask, maxSizeExtentMap map[uint64]*storage.ExtentInfo) (availableTinyExtents []uint64, brokenTinyExtents []uint64) {
366
	availableTinyExtents = make([]uint64, 0)
367
	brokenTinyExtents = make([]uint64, 0)
368
	for extentID, maxFileInfo := range maxSizeExtentMap {
369

370
		hasBeenRepaired := true
371
		for index := 0; index < len(repairTasks); index++ {
372
			if repairTasks[index] == nil {
373
				continue
374
			}
375
			extentInfo, ok := repairTasks[index].extents[extentID]
376
			if !ok {
377
				continue
378
			}
379
			if extentInfo.IsDeleted {
380
				continue
381
			}
382
			if dp.ExtentStore().IsDeletedNormalExtent(extentID) {
383
				continue
384
			}
385
			if extentInfo.TotalSize() < maxFileInfo.TotalSize() {
386
				fixExtent := &storage.ExtentInfo{Source: maxFileInfo.Source, FileID: extentID, Size: maxFileInfo.Size, SnapshotDataOff: maxFileInfo.SnapshotDataOff}
387
				repairTasks[index].ExtentsToBeRepaired = append(repairTasks[index].ExtentsToBeRepaired, fixExtent)
388
				log.LogInfof("action[generatorFixExtentSizeTasks] fixExtent(%v_%v) on Index(%v) on(%v).",
389
					dp.partitionID, fixExtent, index, repairTasks[index].addr)
390
				hasBeenRepaired = false
391
			}
392

393
		}
394
		if storage.IsTinyExtent(extentID) {
395
			if hasBeenRepaired {
396
				availableTinyExtents = append(availableTinyExtents, extentID)
397
			} else {
398
				brokenTinyExtents = append(brokenTinyExtents, extentID)
399
			}
400
		}
401
	}
402
	return
403
}
404

405
// notifyFollower sends the repair task members[index] to the follower it was
// built for and waits for the follower's reply.
//
// The target address is taken from the task itself rather than from the
// partition's current replica list, because the replica list may have changed
// since the tasks were built. wg.Done is called exactly once on every return
// path, and the outcome is logged. The pooled connection is force-closed when
// the exchange fails.
func (dp *DataPartition) notifyFollower(wg *sync.WaitGroup, index int, members []*DataPartitionRepairTask) (err error) {
	target := members[index].addr

	// Registered first so it runs last and on every return path, including
	// failures that happen before a connection exists.
	defer func() {
		wg.Done()
		if err == nil {
			log.LogInfof(ActionNotifyFollowerToRepair+" to host(%v) Partition(%v) done", target, dp.partitionID)
		} else {
			log.LogErrorf(ActionNotifyFollowerToRepair+" to host(%v) Partition(%v) failed, err(%v)", target, dp.partitionID, err)
		}
	}()

	p := repl.NewPacketToNotifyExtentRepair(dp.partitionID) // notify all the followers to repair
	if p.Data, err = json.Marshal(members[index]); err != nil {
		// Do not send a request with an empty or partial body.
		err = errors.Trace(err, "notifyFollower partition(%v) marshal repair task for host(%v)", dp.partitionID, target)
		return err
	}
	p.Size = uint32(len(p.Data))

	var conn *net.TCPConn
	if conn, err = gConnPool.GetConnect(target); err != nil {
		return err
	}
	defer func() {
		gConnPool.PutConnect(conn, err != nil)
	}()

	if err = p.WriteToConn(conn); err != nil {
		return err
	}
	if err = p.ReadFromConnWithVer(conn, proto.NoReadDeadlineTime); err != nil {
		return err
	}
	return nil
}
437

438
// NotifyExtentRepair notifies the followers to repair.
439
func (dp *DataPartition) NotifyExtentRepair(members []*DataPartitionRepairTask) (err error) {
440
	wg := new(sync.WaitGroup)
441
	for i := 1; i < len(members); i++ {
442
		if members[i] == nil || !dp.IsExistReplica(members[i].addr) {
443
			if members[i] != nil {
444
				log.LogInfof("notify extend repair is change ,index(%v),pid(%v),task_member_add(%v),IsExistReplica(%v)",
445
					i, dp.partitionID, members[i].addr, dp.IsExistReplica(members[i].addr))
446
			}
447
			continue
448
		}
449

450
		wg.Add(1)
451
		go dp.notifyFollower(wg, i, members)
452
	}
453
	wg.Wait()
454
	return
455
}
456

457
// DoStreamExtentFixRepair executes the repair on the followers.
458
func (dp *DataPartition) doStreamExtentFixRepair(wg *sync.WaitGroup, remoteExtentInfo *storage.ExtentInfo) {
459
	defer wg.Done()
460

461
	err := dp.streamRepairExtent(remoteExtentInfo)
462
	if err != nil {
463
		// only decommission repair need to check err cnt
464
		if dp.isDecommissionRecovering() {
465
			atomic.AddUint64(&dp.recoverErrCnt, 1)
466
			if atomic.LoadUint64(&dp.recoverErrCnt) >= dp.dataNode.GetDpMaxRepairErrCnt() {
467
				dp.handleDecommissionRecoverFailed()
468
				return
469
			}
470
		}
471
		err = errors.Trace(err, "doStreamExtentFixRepair %v", dp.applyRepairKey(int(remoteExtentInfo.FileID)))
472
		localExtentInfo, opErr := dp.ExtentStore().Watermark(uint64(remoteExtentInfo.FileID))
473
		if opErr != nil {
474
			err = errors.Trace(err, opErr.Error())
475
		}
476
		err = errors.Trace(err, "partition(%v) remote(%v) local(%v)",
477
			dp.partitionID, remoteExtentInfo, localExtentInfo)
478
		log.LogWarnf("action[doStreamExtentFixRepair] err(%v).", err)
479
	}
480
}
481

482
// applyRepairKey formats the label "ApplyRepairKey(<partitionID>_<extentID>)"
// used in repair log and error messages.
func (dp *DataPartition) applyRepairKey(extentID int) (m string) {
	m = fmt.Sprintf("ApplyRepairKey(%v_%v)", dp.partitionID, extentID)
	return m
}
485

486
// The actual repair of an extent happens here.
487
func (dp *DataPartition) streamRepairExtent(remoteExtentInfo *storage.ExtentInfo) (err error) {
488
	log.LogDebugf("streamRepairExtent dp %v remote info %v", dp.partitionID, remoteExtentInfo)
489
	store := dp.ExtentStore()
490
	if !store.HasExtent(remoteExtentInfo.FileID) {
491
		log.LogDebugf("streamRepairExtent remote info %v not exist", remoteExtentInfo)
492
		return
493
	}
494
	if !AutoRepairStatus && !storage.IsTinyExtent(remoteExtentInfo.FileID) {
495
		log.LogWarnf("AutoRepairStatus is False,so cannot AutoRepair extent(%v)", remoteExtentInfo.String())
496
		return
497
	}
498
	localExtentInfo, err := store.Watermark(remoteExtentInfo.FileID)
499
	if err != nil {
500
		log.LogDebugf("streamRepairExtent local %v remote info %v", localExtentInfo, remoteExtentInfo)
501
		return errors.Trace(err, "streamRepairExtent Watermark error")
502
	}
503
	log.LogDebugf("streamRepairExtent dp %v remote info %v,local %v", dp.partitionID, remoteExtentInfo, localExtentInfo)
504
	if dp.ExtentStore().IsDeletedNormalExtent(remoteExtentInfo.FileID) {
505
		log.LogDebugf("streamRepairExtent local %v remote info %v", localExtentInfo, remoteExtentInfo)
506
		return nil
507
	}
508

509
	if localExtentInfo.Size >= remoteExtentInfo.Size && localExtentInfo.SnapshotDataOff >= remoteExtentInfo.SnapshotDataOff {
510
		log.LogDebugf("streamRepairExtent local %v remote info %v", localExtentInfo, remoteExtentInfo)
511
		return nil
512
	}
513

514
	doWork := func(wType int, currFixOffset uint64, dstOffset uint64, request *repl.Packet) (err error) {
515
		log.LogDebugf("streamRepairExtent. currFixOffset %v dstOffset %v, request %v", currFixOffset, dstOffset, request)
516
		var conn net.Conn
517
		conn, err = dp.getRepairConn(remoteExtentInfo.Source)
518
		if err != nil {
519
			return errors.Trace(err, "streamRepairExtent get conn from host(%v) error", remoteExtentInfo.Source)
520
		}
521
		defer func() {
522
			dp.putRepairConn(conn, err != nil)
523
		}()
524

525
		if err = request.WriteToConn(conn); err != nil {
526
			err = errors.Trace(err, "streamRepairExtent send streamRead to host(%v) error", remoteExtentInfo.Source)
527
			log.LogWarnf("action[streamRepairExtent] err(%v).", err)
528
			return
529
		}
530

531
		var hasRecoverySize uint64
532
		var loopTimes uint64
533
		for currFixOffset < dstOffset {
534
			if currFixOffset >= dstOffset {
535
				break
536
			}
537
			reply := repl.NewPacket()
538

539
			// read 64k streaming repair packet
540
			if err = reply.ReadFromConnWithVer(conn, 60); err != nil {
541
				err = errors.Trace(err, "streamRepairExtent receive data error,localExtentSize(%v) remoteExtentSize(%v)", currFixOffset, dstOffset)
542
				return
543
			}
544

545
			if reply.ResultCode != proto.OpOk {
546
				err = errors.Trace(fmt.Errorf("unknow result code"),
547
					"streamRepairExtent receive opcode error(%v) ,localExtentSize(%v) remoteExtentSize(%v)", string(reply.Data[:intMin(len(reply.Data), int(reply.Size))]), currFixOffset, remoteExtentInfo.Size)
548
				return
549
			}
550

551
			if reply.ReqID != request.ReqID || reply.PartitionID != request.PartitionID ||
552
				reply.ExtentID != request.ExtentID {
553
				err = errors.Trace(fmt.Errorf("unavali reply"), "streamRepairExtent receive unavalid "+
554
					"request(%v) reply(%v) ,localExtentSize(%v) remoteExtentSize(%v)", request.GetUniqueLogId(), reply.GetUniqueLogId(), currFixOffset, dstOffset)
555
				return
556
			}
557

558
			if !storage.IsTinyExtent(reply.ExtentID) && (reply.Size == 0 || reply.ExtentOffset != int64(currFixOffset)) {
559
				err = errors.Trace(fmt.Errorf("unavali reply"), "streamRepairExtent receive unavalid "+
560
					"request(%v) reply(%v) localExtentSize(%v) remoteExtentSize(%v)", request.GetUniqueLogId(), reply.GetUniqueLogId(), currFixOffset, dstOffset)
561
				return
562
			}
563
			if loopTimes%100 == 0 {
564
				log.LogInfof(fmt.Sprintf("action[streamRepairExtent] fix(%v_%v) start fix from (%v)"+
565
					" remoteSize(%v)localSize(%v) reply(%v).", dp.partitionID, localExtentInfo.FileID, remoteExtentInfo.String(),
566
					dstOffset, currFixOffset, reply.GetUniqueLogId()))
567
			}
568
			loopTimes++
569

570
			actualCrc := crc32.ChecksumIEEE(reply.Data[:reply.Size])
571
			if reply.CRC != actualCrc {
572
				err = fmt.Errorf("streamRepairExtent crc mismatch expectCrc(%v) actualCrc(%v) extent(%v_%v) start fix from (%v)"+
573
					" remoteSize(%v) localSize(%v) request(%v) reply(%v) ", reply.CRC, actualCrc, dp.partitionID, remoteExtentInfo.String(),
574
					remoteExtentInfo.Source, dstOffset, currFixOffset, request.GetUniqueLogId(), reply.GetUniqueLogId())
575
				return errors.Trace(err, "streamRepairExtent receive data error")
576
			}
577
			isEmptyResponse := false
578
			// Write it to local extent file
579
			if storage.IsTinyExtent(uint64(localExtentInfo.FileID)) {
580
				currRecoverySize := uint64(reply.Size)
581
				var remoteAvaliSize uint64
582
				if reply.ArgLen == TinyExtentRepairReadResponseArgLen {
583
					remoteAvaliSize = binary.BigEndian.Uint64(reply.Arg[9:TinyExtentRepairReadResponseArgLen])
584
				}
585
				if reply.Arg != nil { // compact v1.2.0 recovery
586
					isEmptyResponse = reply.Arg[0] == EmptyResponse
587
				}
588
				if isEmptyResponse {
589
					currRecoverySize = binary.BigEndian.Uint64(reply.Arg[1:9])
590
					reply.Size = uint32(currRecoverySize)
591
				}
592
				err = store.TinyExtentRecover(uint64(localExtentInfo.FileID), int64(currFixOffset), int64(currRecoverySize), reply.Data, reply.CRC, isEmptyResponse)
593
				if hasRecoverySize+currRecoverySize >= remoteAvaliSize {
594
					log.LogInfof("streamRepairTinyExtent(%v) recover fininsh,remoteAvaliSize(%v) "+
595
						"hasRecoverySize(%v) currRecoverySize(%v)", dp.applyRepairKey(int(localExtentInfo.FileID)),
596
						remoteAvaliSize, hasRecoverySize+currRecoverySize, currRecoverySize)
597
					break
598
				}
599
			} else {
600
				log.LogDebugf("streamRepairExtent reply size %v, currFixoffset %v, reply %v ", reply.Size, currFixOffset, reply)
601
				_, err = store.Write(uint64(localExtentInfo.FileID), int64(currFixOffset), int64(reply.Size), reply.Data, reply.CRC, wType, BufferWrite)
602
			}
603
			// log.LogDebugf("streamRepairExtent reply size %v, currFixoffset %v, reply %v err %v", reply.Size, currFixOffset, reply, err)
604
			// write to the local extent file
605
			if err != nil {
606
				err = errors.Trace(err, "streamRepairExtent repair data error ")
607
				return
608
			}
609
			hasRecoverySize += uint64(reply.Size)
610
			currFixOffset += uint64(reply.Size)
611
			if currFixOffset >= dstOffset {
612
				log.LogWarnf(fmt.Sprintf("action[streamRepairExtent] fix(%v_%v) start fix from (%v)"+
613
					" remoteSize(%v)localSize(%v) reply(%v).", dp.partitionID, localExtentInfo.FileID, remoteExtentInfo.String(),
614
					dstOffset, currFixOffset, reply.GetUniqueLogId()))
615
				break
616
			}
617
		}
618
		return
619
	}
620

621
	// size difference between the local extent and the remote extent
622
	var request *repl.Packet
623
	sizeDiff := remoteExtentInfo.Size - localExtentInfo.Size
624

625
	if storage.IsTinyExtent(remoteExtentInfo.FileID) {
626
		if sizeDiff >= math.MaxUint32 {
627
			sizeDiff = math.MaxUint32 - util.MB
628
		}
629
		request = repl.NewTinyExtentRepairReadPacket(dp.partitionID, remoteExtentInfo.FileID, int(localExtentInfo.Size), int(sizeDiff))
630
		currFixOffset := localExtentInfo.Size
631
		return doWork(0, currFixOffset, remoteExtentInfo.Size, request)
632
	} else {
633
		if sizeDiff > 0 {
634
			log.LogDebugf("streamRepairExtent. local info %v, remote %v", localExtentInfo, remoteExtentInfo)
635
			request = repl.NewExtentRepairReadPacket(dp.partitionID, remoteExtentInfo.FileID, int(localExtentInfo.Size), int(sizeDiff))
636
			currFixOffset := localExtentInfo.Size
637
			if err = doWork(storage.AppendWriteType, currFixOffset, remoteExtentInfo.Size, request); err != nil {
638
				return
639
			}
640
		}
641
		sizeDiffVerAppend := remoteExtentInfo.SnapshotDataOff - localExtentInfo.SnapshotDataOff
642
		if sizeDiffVerAppend > 0 {
643
			request = repl.NewExtentRepairReadPacket(dp.partitionID, remoteExtentInfo.FileID, int(localExtentInfo.SnapshotDataOff), int(sizeDiffVerAppend))
644
			currFixOffset := localExtentInfo.SnapshotDataOff
645
			return doWork(storage.AppendRandomWriteType, currFixOffset, remoteExtentInfo.SnapshotDataOff, request)
646
		}
647
	}
648

649
	return
650
}
651

652
// intMin returns the smaller of a and b; when they are equal, that value is
// returned.
func intMin(a, b int) int {
	smaller := b
	if a < b {
		smaller = a
	}
	return smaller
}
659

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.