// Copyright 2022 The CubeFS Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.

package access

import (
	"context"
	"fmt"
	"io"
	"math/rand"
	"sort"
	"sync"
	"time"

	"github.com/afex/hystrix-go/hystrix"

	"github.com/cubefs/cubefs/blobstore/access/controller"
	"github.com/cubefs/cubefs/blobstore/api/access"
	"github.com/cubefs/cubefs/blobstore/api/blobnode"
	"github.com/cubefs/cubefs/blobstore/common/codemode"
	"github.com/cubefs/cubefs/blobstore/common/ec"
	errcode "github.com/cubefs/cubefs/blobstore/common/errors"
	"github.com/cubefs/cubefs/blobstore/common/proto"
	"github.com/cubefs/cubefs/blobstore/common/rpc"
	"github.com/cubefs/cubefs/blobstore/common/trace"
	"github.com/cubefs/cubefs/blobstore/util/errors"
	"github.com/cubefs/cubefs/blobstore/util/retry"
)

var (
	errNeedReconstructRead = errors.New("need to reconstruct read")
	errCanceledReadShard   = errors.New("canceled read shard")
	errPunishedDisk        = errors.New("punished disk")
)

type blobGetArgs struct {
	Cid      proto.ClusterID
	Vid      proto.Vid
	Bid      proto.BlobID
	CodeMode codemode.CodeMode
	BlobSize uint64
	Offset   uint64
	ReadSize uint64

	ShardSize     int
	ShardOffset   int
	ShardReadSize int
}

func (blob *blobGetArgs) ID() string {
	return fmt.Sprintf("blob(cid:%d vid:%d bid:%d)", blob.Cid, blob.Vid, blob.Bid)
}

type shardData struct {
	index  int
	status bool
	buffer []byte
}

type sortedVuid struct {
	index  int
	vuid   proto.Vuid
	diskID proto.DiskID
	host   string
}

func (vuid *sortedVuid) ID() string {
	return fmt.Sprintf("blobnode(vuid:%d disk:%d host:%s) ecidx(%02d)",
		vuid.vuid, vuid.diskID, vuid.host, vuid.index)
}

type pipeBuffer struct {
	err    error
	blob   blobGetArgs
	shards [][]byte
}

// Get reads file data.
//
//	required: location, readSize
//	optional: offset (default is 0)
//
//	The first return value is a transfer function that copies the data;
//	it is returned after the arguments have been checked.
//
//	Data shards are read directly when the blob is small or only a few
//	bytes are requested; otherwise (or when that fails) an EC
//	reconstruct-read is used, trying from N+X up to N+M shards.
//	Only the essential bytes of each shard are read during reconstruct-read.
//
//	The sorted N+X order: for example with mode EC6P10L2, X=2 and reading
//	from idc=2, the shards are laid out like this
//	               data N 6        |    parity M 10     | local L 2
//	        d1  d2  d3  d4  d5  d6  p1 .. p5  p6 .. p10  l1  l2
//	  idc   1   1   1   2   2   2     1         2        1   2
//
// sorted  d4  d5  d6  p6 .. p10  d1  d2  d3  p1 .. p5
// read-1 [d4                p10]
// read-2 [d4                p10  d1]
// read-3 [d4                p10  d1  d2]
// ...
// read-9 [d4                                       p5]
// failed
func (h *Handler) Get(ctx context.Context, w io.Writer, location access.Location, readSize, offset uint64) (func() error, error) {
	span := trace.SpanFromContextSafe(ctx)
	span.Debugf("get request cluster:%d size:%d offset:%d", location.ClusterID, readSize, offset)

	blobs, err := genLocationBlobs(&location, readSize, offset)
	if err != nil {
		span.Info("illegal argument", err)
		return func() error { return nil }, errcode.ErrIllegalArguments
	}
	if len(blobs) == 0 {
		return func() error { return nil }, nil
	}

	clusterID := location.ClusterID
	var serviceController controller.ServiceController
	if err = retry.Timed(3, 200).On(func() error {
		sc, err := h.clusterController.GetServiceController(clusterID)
		if err != nil {
			return err
		}
		serviceController = sc
		return nil
	}); err != nil {
		span.Error("get service", errors.Detail(err))
		return func() error { return nil }, err
	}

	return func() error {
		getTime := new(timeReadWrite)
		defer func() {
			span.AppendRPCTrackLog([]string{getTime.String()})
		}()

		// try to read data shards only,
		//   if blobsize is small: all data is in the first shard, because shards are aligned to MinShardSize.
		//   if reading few bytes: fewer than a quarter of the blobsize is requested, like Range:[0-1].
		if len(blobs) == 1 {
			blob := blobs[0]
			if int(blob.BlobSize) <= blob.ShardSize || blob.ReadSize < blob.BlobSize/4 {
				span.Debugf("read data shard only %s readsize:%d blobsize:%d shardsize:%d",
					blob.ID(), blob.ReadSize, blob.BlobSize, blob.ShardSize)

				err := h.getDataShardOnly(ctx, getTime, w, serviceController, blob)
				if err != errNeedReconstructRead {
					if err != nil {
						span.Error("read data shard only", err)
						reportDownload(clusterID, "Direct", "error")
					} else {
						reportDownload(clusterID, "Direct", "-")
					}
					return err
				}
				span.Info("read data shard only failed", err)
			}
		}

		// data stream flow:
		// client <--copy-- pipeline <--swap-- readBlob <--copy-- blobnode
		//
		// Alloc N+M shard buffers here, and release them after they are written to the client.
		// readBlob swaps in non-empty buffers; the old buffers must be released in that function.
		closeCh := make(chan struct{})
		pipeline := func() <-chan pipeBuffer {
			ch := make(chan pipeBuffer, 1)
			go func() {
				defer close(ch)

				var blobVolume *controller.VolumePhy
				var sortedVuids []sortedVuid
				tactic := location.CodeMode.Tactic()
				for _, blob := range blobs {
					var err error
					if blobVolume == nil || blobVolume.Vid != blob.Vid {
						blobVolume, err = h.getVolume(ctx, clusterID, blob.Vid, true)
						if err != nil {
							span.Error("get volume", err)
							ch <- pipeBuffer{err: err}
							return
						}

						// do not use local shards
						sortedVuids = genSortedVuidByIDC(ctx, serviceController, h.IDC, blobVolume.Units[:tactic.N+tactic.M])
						span.Debugf("to read %s with read-shard-x:%d active-shard-n:%d of data-n:%d parity-n:%d",
							blob.ID(), h.MinReadShardsX, len(sortedVuids), tactic.N, tactic.M)
						if len(sortedVuids) < tactic.N {
							err = fmt.Errorf("broken %s", blob.ID())
							span.Error(err)
							ch <- pipeBuffer{err: err}
							return
						}
					}

					st := time.Now()
					shards := make([][]byte, tactic.N+tactic.M)
					for ii := range shards {
						buf, _ := h.memPool.Alloc(blob.ShardSize)
						shards[ii] = buf
					}
					getTime.IncA(time.Since(st))

					err = h.readOneBlob(ctx, getTime, serviceController, blob, sortedVuids, shards)
					if err != nil {
						span.Error("read one blob", blob.ID(), err)
						for _, buf := range shards {
							h.memPool.Put(buf)
						}
						ch <- pipeBuffer{err: err}
						return
					}

					select {
					case <-closeCh:
						for _, buf := range shards {
							h.memPool.Put(buf)
						}
						return
					case ch <- pipeBuffer{blob: blob, shards: shards}:
					}
				}
			}()

			return ch
		}()

		var err error
		for line := range pipeline {
			if line.err != nil {
				err = line.err
				break
			}

			startWrite := time.Now()

			idx := 0
			off := line.blob.Offset
			toReadSize := line.blob.ReadSize
			for toReadSize > 0 {
				buf := line.shards[idx]
				l := uint64(len(buf))
				if off >= l {
					idx++
					off -= l
					continue
				}

				toRead := minU64(toReadSize, l-off)
				if _, e := w.Write(buf[off : off+toRead]); e != nil {
					err = errors.Info(e, "write to response")
					break
				}
				idx++
				off = 0
				toReadSize -= toRead
			}

			getTime.IncW(time.Since(startWrite))

			for _, buf := range line.shards {
				h.memPool.Put(buf)
			}
			if err != nil {
				close(closeCh)
				break
			}
		}

		// release buffers left in the pipeline if writing to the client failed
		go func() {
			for line := range pipeline {
				for _, buf := range line.shards {
					h.memPool.Put(buf)
				}
			}
		}()

		if err != nil {
			reportDownload(clusterID, "EC", "error")
			span.Error("get request error", err)
			return err
		}
		reportDownload(clusterID, "EC", "-")
		return nil
	}, nil
}
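
// Illustrative usage sketch (not from the original file; "handler" is an
// assumed *Handler value): Get only validates the arguments and resolves the
// service controller up front; the actual copy into w happens when the
// returned transfer function is called.
//
//	transfer, err := handler.Get(ctx, w, location, readSize, offset)
//	if err != nil {
//		return err // illegal arguments or service lookup failure
//	}
//	return transfer() // streams the requested blobs into w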

// 1. try to read the minimum set of shard bytes
// 2. if that fails, read the next shard and try to reconstruct
// 3. write the bytes at the right offset to the writer
// 4. just read the essential bytes if the data is a segment of one shard
func (h *Handler) readOneBlob(ctx context.Context, getTime *timeReadWrite,
	serviceController controller.ServiceController,
	blob blobGetArgs, sortedVuids []sortedVuid, shards [][]byte) error {
	span := trace.SpanFromContextSafe(ctx)

	tactic := blob.CodeMode.Tactic()
	sizes, err := ec.GetBufferSizes(int(blob.BlobSize), tactic)
	if err != nil {
		return err
	}
	empties := emptyDataShardIndexes(sizes)

	dataN, dataParityN := tactic.N, tactic.N+tactic.M
	minShardsRead := dataN + h.MinReadShardsX
	if minShardsRead > len(sortedVuids) {
		minShardsRead = len(sortedVuids)
	}
	shardSize, shardOffset, shardReadSize := blob.ShardSize, blob.ShardOffset, blob.ShardReadSize

	stopChan := make(chan struct{})
	nextChan := make(chan struct{}, len(sortedVuids))
	shardPipe := func() <-chan shardData {
		ch := make(chan shardData)
		go func() {
			wg := new(sync.WaitGroup)
			defer func() {
				wg.Wait()
				close(ch)
			}()

			for _, vuid := range sortedVuids[:minShardsRead] {
				if _, ok := empties[vuid.index]; !ok {
					wg.Add(1)
					go func(vuid sortedVuid) {
						ch <- h.readOneShard(ctx, serviceController, blob, vuid, stopChan)
						wg.Done()
					}(vuid)
				}
			}

			for _, vuid := range sortedVuids[minShardsRead:] {
				if _, ok := empties[vuid.index]; ok {
					continue
				}

				select {
				case <-stopChan:
					return
				case <-nextChan:
				}

				wg.Add(1)
				go func(vuid sortedVuid) {
					ch <- h.readOneShard(ctx, serviceController, blob, vuid, stopChan)
					wg.Done()
				}(vuid)
			}
		}()

		return ch
	}()

	received := make(map[int]bool, minShardsRead)
	for idx := range empties {
		received[idx] = true
		h.memPool.Zero(shards[idx])
	}

	startRead := time.Now()
	reconstructed := false
	for shard := range shardPipe {
		// swap shard buffer
		if shard.status {
			buf := shards[shard.index]
			shards[shard.index] = shard.buffer
			h.memPool.Put(buf)
		}

		received[shard.index] = shard.status
		if len(received) < dataN {
			continue
		}

		// collect bad data shard indexes
		badIdx := make([]int, 0, 8)
		for i := 0; i < dataN; i++ {
			if succ, ok := received[i]; !ok || !succ {
				badIdx = append(badIdx, i)
			}
		}
		if len(badIdx) == 0 {
			reconstructed = true
			close(stopChan)
			break
		}

		// update bad parity indexes
		for i := dataN; i < dataParityN; i++ {
			if succ, ok := received[i]; !ok || !succ {
				badIdx = append(badIdx, i)
			}
		}

		badShards := 0
		for _, succ := range received {
			if !succ {
				badShards++
			}
		}
		// do not wait for all the shards: there are not enough left to reconstruct
		if badShards > dataParityN-dataN {
			span.Infof("%s bad(%d) not enough shards to reconstruct", blob.ID(), badShards)
			close(stopChan)
			break
		}

		// there are bad shards, but enough shards to reconstruct
		if len(received) >= dataN+badShards {
			var err error
			if shardReadSize < shardSize {
				span.Debugf("bid(%d) ready to segment ec reconstruct data", blob.Bid)
				reportDownload(blob.Cid, "EC", "segment")
				segments := make([][]byte, len(shards))
				for idx := range shards {
					segments[idx] = shards[idx][shardOffset : shardOffset+shardReadSize]
				}
				err = h.encoder[blob.CodeMode].ReconstructData(segments, badIdx)
			} else {
				span.Debugf("bid(%d) ready to ec reconstruct data", blob.Bid)
				err = h.encoder[blob.CodeMode].ReconstructData(shards, badIdx)
			}
			if err == nil {
				reconstructed = true
				close(stopChan)
				break
			}
			span.Errorf("%s ec reconstruct data error:%s", blob.ID(), err.Error())
		}

		if len(received) >= len(sortedVuids) {
			close(stopChan)
			break
		}
		nextChan <- struct{}{}
	}
	getTime.IncR(time.Since(startRead))

	// release buffers of delayed shards
	go func() {
		for shard := range shardPipe {
			if shard.status {
				h.memPool.Put(shard.buffer)
			}
		}
	}()

	if reconstructed {
		return nil
	}
	return fmt.Errorf("broken %s", blob.ID())
}
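
// For illustration (X value assumed, matching the X=2 example in the Get doc
// comment above): with codemode EC6P10L2 (N=6, M=10) and MinReadShardsX=2,
// readOneBlob first launches reads for N+X = 8 of the sorted vuids (skipping
// empty data shards). Each time the shards received so far are still not
// decodable, one token is sent on nextChan and the producer starts one more
// read from the remaining vuids, up to N+M = 16 in total.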

func (h *Handler) readOneShard(ctx context.Context, serviceController controller.ServiceController,
	blob blobGetArgs, vuid sortedVuid, stopChan <-chan struct{}) shardData {
	clusterID, vid := blob.Cid, blob.Vid
	shardOffset, shardReadSize := blob.ShardOffset, blob.ShardReadSize
	span := trace.SpanFromContextSafe(ctx)
	shardResult := shardData{
		index:  vuid.index,
		status: false,
	}

	args := blobnode.RangeGetShardArgs{
		GetShardArgs: blobnode.GetShardArgs{
			DiskID: vuid.diskID,
			Vuid:   vuid.vuid,
			Bid:    blob.Bid,
		},
		Offset: int64(shardOffset),
		Size:   int64(shardReadSize),
	}

	var (
		err  error
		body io.ReadCloser
	)
	if hErr := hystrix.Do(rwCommand, func() error {
		body, err = h.getOneShardFromHost(ctx, serviceController, vuid.host, vuid.diskID, args,
			vuid.index, clusterID, vid, 3, stopChan)
		if err != nil && (errorTimeout(err) || rpc.DetectStatusCode(err) == errcode.CodeOverload) {
			return err
		}
		return nil
	}, nil); hErr != nil {
		span.Warnf("hystrix: read %s on %s: %s", blob.ID(), vuid.ID(), hErr.Error())
		return shardResult
	}

	if err != nil {
		if err == errPunishedDisk || err == errCanceledReadShard {
			span.Warnf("read %s on %s: %s", blob.ID(), vuid.ID(), err.Error())
			return shardResult
		}
		span.Warnf("rpc read %s on %s: %s", blob.ID(), vuid.ID(), errors.Detail(err))
		return shardResult
	}
	defer body.Close()

	buf, err := h.memPool.Alloc(blob.ShardSize)
	if err != nil {
		span.Warn(err)
		return shardResult
	}

	_, err = io.ReadFull(body, buf[shardOffset:shardOffset+shardReadSize])
	if err != nil {
		h.memPool.Put(buf)
		span.Warnf("io read %s on %s: %s", blob.ID(), vuid.ID(), err.Error())
		return shardResult
	}

	shardResult.status = true
	shardResult.buffer = buf
	return shardResult
}
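
// Buffer ownership note (added comment): on success readOneShard hands the
// memPool buffer back inside shardData and the caller (readOneBlob) takes
// ownership, swapping it into its shards slice and returning the replaced
// buffer to the pool. On failure no buffer is handed back: it is either never
// allocated or returned to the pool before readOneShard returns.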

func (h *Handler) getDataShardOnly(ctx context.Context, getTime *timeReadWrite,
	w io.Writer, serviceController controller.ServiceController, blob blobGetArgs) error {
	span := trace.SpanFromContextSafe(ctx)
	if blob.ReadSize == 0 {
		return nil
	}

	blobVolume, err := h.getVolume(ctx, blob.Cid, blob.Vid, true)
	if err != nil {
		return err
	}
	tactic := blobVolume.CodeMode.Tactic()

	from, to := int(blob.Offset), int(blob.Offset+blob.ReadSize)
	buffer, err := ec.NewRangeBuffer(int(blob.BlobSize), from, to, tactic, h.memPool)
	if err != nil {
		return err
	}
	defer buffer.Release()

	shardSize := buffer.ShardSize
	firstShardIdx := int(blob.Offset) / shardSize
	shardOffset := int(blob.Offset) % shardSize

	startRead := time.Now()
	remainSize := blob.ReadSize
	bufOffset := 0
	for i, shard := range blobVolume.Units[firstShardIdx:tactic.N] {
		if remainSize <= 0 {
			break
		}

		toReadSize := minU64(remainSize, uint64(shardSize-shardOffset))
		args := blobnode.RangeGetShardArgs{
			GetShardArgs: blobnode.GetShardArgs{
				DiskID: shard.DiskID,
				Vuid:   shard.Vuid,
				Bid:    blob.Bid,
			},
			Offset: int64(shardOffset),
			Size:   int64(toReadSize),
		}

		body, err := h.getOneShardFromHost(ctx, serviceController, shard.Host, shard.DiskID, args,
			firstShardIdx+i, blob.Cid, blob.Vid, 1, nil)
		if err != nil {
			span.Warnf("read %s on blobnode(vuid:%d disk:%d host:%s) ecidx(%02d): %s", blob.ID(),
				shard.Vuid, shard.DiskID, shard.Host, firstShardIdx+i, errors.Detail(err))
			return errNeedReconstructRead
		}
		defer body.Close()

		buf := buffer.DataBuf[bufOffset : bufOffset+int(toReadSize)]
		_, err = io.ReadFull(body, buf)
		if err != nil {
			span.Warn(err)
			return errNeedReconstructRead
		}

		// reset offset for the next shard
		shardOffset = 0
		remainSize -= toReadSize
		bufOffset += int(toReadSize)
	}
	getTime.IncR(time.Since(startRead))

	if remainSize > 0 {
		return fmt.Errorf("not enough data to read %d", remainSize)
	}

	startWrite := time.Now()
	if _, err := w.Write(buffer.DataBuf[:int(blob.ReadSize)]); err != nil {
		getTime.IncW(time.Since(startWrite))
		return errors.Info(err, "write to response")
	}
	getTime.IncW(time.Since(startWrite))

	return nil
}
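
// Worked example (numbers assumed for illustration): if buffer.ShardSize is
// 4096 and blob.Offset is 10000, then firstShardIdx = 10000/4096 = 2 and
// shardOffset = 10000%4096 = 1808, so reading starts 1808 bytes into data
// shard 2 and continues through the following data shards until ReadSize
// bytes have been copied into buffer.DataBuf.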

// getOneShardFromHost gets the body of one shard
func (h *Handler) getOneShardFromHost(ctx context.Context, serviceController controller.ServiceController,
	host string, diskID proto.DiskID, args blobnode.RangeGetShardArgs, // get-shard params with host and diskID
	index int, clusterID proto.ClusterID, vid proto.Vid, // params to update the volume cache
	attempts int, cancelChan <-chan struct{}, // do not retry again if cancelChan was closed
) (io.ReadCloser, error) {
	span := trace.SpanFromContextSafe(ctx)

	// skip punished disk
	if diskHost, err := serviceController.GetDiskHost(ctx, diskID); err != nil {
		return nil, err
	} else if diskHost.Punished {
		return nil, errPunishedDisk
	}

	var (
		rbody io.ReadCloser
		rerr  error
	)
	rerr = retry.ExponentialBackoff(attempts, 200).RuptOn(func() (bool, error) {
		if cancelChan != nil {
			select {
			case <-cancelChan:
				return true, errCanceledReadShard
			default:
			}
		}

		// new child span to get from blobnode; it must be finished here.
		spanChild, ctxChild := trace.StartSpanFromContextWithTraceID(
			context.Background(), "GetFromBlobnode", span.TraceID())
		defer spanChild.Finish()

		body, _, err := h.blobnodeClient.RangeGetShard(ctxChild, host, &args)
		if err == nil {
			rbody = body
			return true, nil
		}

		code := rpc.DetectStatusCode(err)
		switch code {
		case errcode.CodeOverload:
			return true, err

		// EIO or Readonly error: punish the disk locally and do not retry
		case errcode.CodeDiskBroken, errcode.CodeVUIDReadonly:
			h.punishDisk(ctx, clusterID, diskID, host, "BrokenOrRO")
			span.Warnf("punish disk:%d on:%s cos:blobnode/%d", diskID, host, code)
			return true, fmt.Errorf("punished disk (%d %s)", diskID, host)

		// vuid not found means the mapping between vuid and diskID has changed;
		// the blob volume cache should be refreshed
		case errcode.CodeDiskNotFound, errcode.CodeVuidNotFound:
			span.Infof("volume info outdated disk %d on host %s", diskID, host)

			latestVolume, e := h.getVolume(ctx, clusterID, vid, false)
			if e != nil {
				span.Warnf("update volume info with no cache %d %d err: %s", clusterID, vid, e)
				return false, err
			}
			newUnit := latestVolume.Units[index]

			newDiskID := newUnit.DiskID
			if newDiskID != diskID {
				hi, e := serviceController.GetDiskHost(ctx, newDiskID)
				if e == nil && !hi.Punished {
					span.Infof("update disk %d %d %d -> %d", clusterID, vid, diskID, newDiskID)

					host = hi.Host
					diskID = newDiskID
					args.GetShardArgs.DiskID = diskID
					args.GetShardArgs.Vuid = newUnit.Vuid
					return false, err
				}
			}

			h.punishDiskWith(ctx, clusterID, diskID, host, "NotFound")
			span.Warnf("punish threshold disk:%d cos:blobnode/%d", diskID, code)
		default:
		}

		// do not retry on timeout; punish the disk with threshold
		if errorTimeout(err) {
			h.punishDiskWith(ctx, clusterID, diskID, host, "Timeout")
			return true, err
		}
		if errorConnectionRefused(err) {
			return true, err
		}
		span.Debugf("read from disk:%d blobnode/%s", diskID, err.Error())

		err = errors.Base(err, fmt.Sprintf("get shard on (disk:%d host:%s)", diskID, host))
		return false, err
	})

	return rbody, rerr
}
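
// Summary of the per-error handling above (added comment):
//
//	CodeOverload                        -> stop retrying, return the error
//	CodeDiskBroken / CodeVUIDReadonly   -> punish the disk locally, stop retrying
//	CodeDiskNotFound / CodeVuidNotFound -> refresh the volume cache and retry on the
//	                                       new unit, otherwise punish with threshold
//	timeout                             -> punish with threshold, stop retrying
//	connection refused                  -> stop retrying
//	anything else                       -> retry with exponential backoff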

func genLocationBlobs(location *access.Location, readSize uint64, offset uint64) ([]blobGetArgs, error) {
	if readSize > location.Size || offset > location.Size || offset+readSize > location.Size {
		return nil, fmt.Errorf("FileSize:%d ReadSize:%d Offset:%d", location.Size, readSize, offset)
	}

	blobSize := uint64(location.BlobSize)
	if blobSize <= 0 {
		return nil, fmt.Errorf("BlobSize:%d", blobSize)
	}

	remainSize := readSize
	firstBlobIdx := offset / blobSize
	blobOffset := offset % blobSize

	tactic := location.CodeMode.Tactic()

	idx := uint64(0)
	blobs := make([]blobGetArgs, 0, 1+(readSize+blobOffset)/blobSize)
	for _, blob := range location.Blobs {
		currBlobID := blob.MinBid

		for ii := uint32(0); ii < blob.Count; ii++ {
			if remainSize <= 0 {
				return blobs, nil
			}

			if idx >= firstBlobIdx {
				toReadSize := minU64(remainSize, blobSize-blobOffset)
				if toReadSize > 0 {
					// update the last blob size
					fixedBlobSize := minU64(location.Size-idx*blobSize, blobSize)

					sizes, _ := ec.GetBufferSizes(int(fixedBlobSize), tactic)
					shardSize := sizes.ShardSize
					shardOffset, shardReadSize := shardSegment(shardSize, int(blobOffset), int(toReadSize))

					blobs = append(blobs, blobGetArgs{
						Cid:      location.ClusterID,
						Vid:      blob.Vid,
						Bid:      currBlobID,
						CodeMode: location.CodeMode,
						BlobSize: fixedBlobSize,
						Offset:   blobOffset,
						ReadSize: toReadSize,

						ShardSize:     shardSize,
						ShardOffset:   shardOffset,
						ShardReadSize: shardReadSize,
					})
				}

				// reset offset for the next blob
				blobOffset = 0
				remainSize -= toReadSize
			}

			currBlobID++
			idx++
		}
	}

	if remainSize > 0 {
		return nil, fmt.Errorf("not enough data to read %d", remainSize)
	}

	return blobs, nil
}
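
// Worked example (sizes assumed for illustration): with location.Size = 10 MiB,
// BlobSize = 4 MiB, offset = 3 MiB and readSize = 2 MiB, the read spans two
// blobs: the first blobGetArgs has Offset = 3 MiB and ReadSize = 1 MiB, the
// second has Offset = 0 and ReadSize = 1 MiB. The last blob of a location may
// be shorter than BlobSize, which is what fixedBlobSize accounts for.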

func genSortedVuidByIDC(ctx context.Context, serviceController controller.ServiceController, idc string,
	vuidPhys []controller.Unit) []sortedVuid {
	span := trace.SpanFromContextSafe(ctx)

	vuids := make([]sortedVuid, 0, len(vuidPhys))
	sortMap := make(map[int][]sortedVuid)

	for idx, phy := range vuidPhys {
		var hostIDC *controller.HostIDC
		if err := retry.ExponentialBackoff(2, 100).On(func() error {
			hi, e := serviceController.GetDiskHost(context.Background(), phy.DiskID)
			if e != nil {
				return e
			}
			hostIDC = hi
			return nil
		}); err != nil {
			span.Warnf("no host of disk(%d %d) %s", phy.Vuid, phy.DiskID, err.Error())
			continue
		}

		dis := distance(idc, hostIDC.IDC, hostIDC.Punished)
		if _, ok := sortMap[dis]; !ok {
			sortMap[dis] = make([]sortedVuid, 0, 8)
		}
		sortMap[dis] = append(sortMap[dis], sortedVuid{
			index:  idx,
			vuid:   phy.Vuid,
			diskID: phy.DiskID,
			host:   phy.Host,
		})
	}

	keys := make([]int, 0, len(sortMap))
	for dis := range sortMap {
		keys = append(keys, dis)
	}
	sort.Ints(keys)

	for _, dis := range keys {
		ids := sortMap[dis]
		rand.Shuffle(len(ids), func(i, j int) {
			ids[i], ids[j] = ids[j], ids[i]
		})
		vuids = append(vuids, ids...)
		if dis > 1 {
			span.Debugf("distance: %d punished vuids: %+v", dis, ids)
		}
	}

	return vuids
}
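
// For illustration (layout assumed): with idc = "z1" and units spread over z1
// and z2, unpunished z1 units come first (distance 0), then unpunished z2
// units (1), then punished z1 (2) and punished z2 (3); units within the same
// distance bucket are shuffled so reads spread across replicas.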

func distance(idc1, idc2 string, punished bool) int {
	if punished {
		if idc1 == idc2 {
			return 2
		}
		return 3
	}
	if idc1 == idc2 {
		return 0
	}
	return 1
}
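
// distance in table form:
//
//	same IDC,  not punished -> 0
//	other IDC, not punished -> 1
//	same IDC,  punished     -> 2
//	other IDC, punished     -> 3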

func emptyDataShardIndexes(sizes ec.BufferSizes) map[int]struct{} {
	firstEmptyIdx := (sizes.DataSize + sizes.ShardSize - 1) / sizes.ShardSize
	n := sizes.ECDataSize / sizes.ShardSize
	if firstEmptyIdx >= n {
		return make(map[int]struct{})
	}

	set := make(map[int]struct{}, n-firstEmptyIdx)
	for i := firstEmptyIdx; i < n; i++ {
		set[i] = struct{}{}
	}

	return set
}
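
// Worked example (buffer sizes assumed for illustration): if sizes.DataSize is
// 1000, sizes.ShardSize is 2048 and sizes.ECDataSize is 6*2048, then
// firstEmptyIdx = ceil(1000/2048) = 1 and n = 6, so indexes 1..5 are returned:
// those data shards hold only padding beyond the real data, and readOneBlob
// zeroes them instead of reading them from blobnode.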

func shardSegment(shardSize, blobOffset, blobReadSize int) (shardOffset, shardReadSize int) {
	shardOffset = blobOffset % shardSize
	if lastOffset := shardOffset + blobReadSize; lastOffset > shardSize {
		shardOffset, shardReadSize = 0, shardSize
	} else {
		shardReadSize = blobReadSize
	}
	return
}
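
// Worked examples (shardSize assumed to be 4096 for illustration):
//
//	shardSegment(4096, 5000, 100) -> offset 904, readSize 100  (segment fits inside one shard)
//	shardSegment(4096, 4000, 200) -> offset 0,   readSize 4096 (segment crosses a shard
//	                                 boundary, so the whole shard is read)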