1
// Copyright 2022 The CubeFS Authors.
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
7
// http://www.apache.org/licenses/LICENSE-2.0
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
26
"github.com/afex/hystrix-go/hystrix"
28
"github.com/cubefs/cubefs/blobstore/access/controller"
29
"github.com/cubefs/cubefs/blobstore/api/access"
30
"github.com/cubefs/cubefs/blobstore/api/blobnode"
31
"github.com/cubefs/cubefs/blobstore/common/codemode"
32
"github.com/cubefs/cubefs/blobstore/common/ec"
33
errcode "github.com/cubefs/cubefs/blobstore/common/errors"
34
"github.com/cubefs/cubefs/blobstore/common/proto"
35
"github.com/cubefs/cubefs/blobstore/common/rpc"
36
"github.com/cubefs/cubefs/blobstore/common/trace"
37
"github.com/cubefs/cubefs/blobstore/util/errors"
38
"github.com/cubefs/cubefs/blobstore/util/retry"
42
errNeedReconstructRead = errors.New("need to reconstruct read")
43
errCanceledReadShard = errors.New("canceled read shard")
44
errPunishedDisk = errors.New("punished disk")
47
type blobGetArgs struct {
51
CodeMode codemode.CodeMode
61
func (blob *blobGetArgs) ID() string {
62
return fmt.Sprintf("blob(cid:%d vid:%d bid:%d)", blob.Cid, blob.Vid, blob.Bid)
65
type shardData struct {
71
type sortedVuid struct {
78
func (vuid *sortedVuid) ID() string {
79
return fmt.Sprintf("blobnode(vuid:%d disk:%d host:%s) ecidx(%02d)",
80
vuid.vuid, vuid.diskID, vuid.host, vuid.index)
83
type pipeBuffer struct {
91
// required: location, readSize
92
// optional: offset (default is 0)
94
// The first return value is a data-transfer function that copies the data; it is returned after argument checking.
96
// Read data shards first if the blob size is small or only a few bytes are requested,
97
// then fall back to EC reconstruct-read, trying to reconstruct from N+X up to N+M shards.
98
// During reconstruct-read, only the essential bytes of each shard are read.
100
// The sorted N+X looks like this, e.g. with mode EC6P10L2, X=2 and reading from idc=2:
102
// data N 6 | parity M 10 | local L 2
103
// d1 d2 d3 d4 d5 d6 p1 .. p5 p6 .. p10 l1 l2
104
// idc 1 1 1 2 2 2 1 2 1 2
106
// sorted d4 d5 d6 p6 .. p10 d1 d2 d3 p1 .. p5
109
// read-3 [d4 p10 d1 d2]
113
func (h *Handler) Get(ctx context.Context, w io.Writer, location access.Location, readSize, offset uint64) (func() error, error) {
114
span := trace.SpanFromContextSafe(ctx)
115
span.Debugf("get request cluster:%d size:%d offset:%d", location.ClusterID, readSize, offset)
117
blobs, err := genLocationBlobs(&location, readSize, offset)
119
span.Info("illegal argument", err)
120
return func() error { return nil }, errcode.ErrIllegalArguments
123
return func() error { return nil }, nil
126
clusterID := location.ClusterID
127
var serviceController controller.ServiceController
128
if err = retry.Timed(3, 200).On(func() error {
129
sc, err := h.clusterController.GetServiceController(clusterID)
133
serviceController = sc
136
span.Error("get service", errors.Detail(err))
137
return func() error { return nil }, err
140
return func() error {
141
getTime := new(timeReadWrite)
143
span.AppendRPCTrackLog([]string{getTime.String()})
146
// try to read data shard only,
147
// if blobsize is small: all data is in the first shard, cos shards aligned by MinShardSize.
148
// read few bytes: read bytes less than quarter of blobsize, like Range:[0-1].
151
if int(blob.BlobSize) <= blob.ShardSize || blob.ReadSize < blob.BlobSize/4 {
152
span.Debugf("read data shard only %s readsize:%d blobsize:%d shardsize:%d",
153
blob.ID(), blob.ReadSize, blob.BlobSize, blob.ShardSize)
155
err := h.getDataShardOnly(ctx, getTime, w, serviceController, blob)
156
if err != errNeedReconstructRead {
158
span.Error("read data shard only", err)
159
reportDownload(clusterID, "Direct", "error")
161
reportDownload(clusterID, "Direct", "-")
165
span.Info("read data shard only failed", err)
170
// client <--copy-- pipeline <--swap-- readBlob <--copy-- blobnode
172
// Alloc N+M shard buffers here, and release after written to client.
173
// Replace not-empty buffers in readBlob, need release old-buffers in that function.
174
closeCh := make(chan struct{})
175
pipeline := func() <-chan pipeBuffer {
176
ch := make(chan pipeBuffer, 1)
180
var blobVolume *controller.VolumePhy
181
var sortedVuids []sortedVuid
182
tactic := location.CodeMode.Tactic()
183
for _, blob := range blobs {
185
if blobVolume == nil || blobVolume.Vid != blob.Vid {
186
blobVolume, err = h.getVolume(ctx, clusterID, blob.Vid, true)
188
span.Error("get volume", err)
189
ch <- pipeBuffer{err: err}
193
// do not use local shards
194
sortedVuids = genSortedVuidByIDC(ctx, serviceController, h.IDC, blobVolume.Units[:tactic.N+tactic.M])
195
span.Debugf("to read %s with read-shard-x:%d active-shard-n:%d of data-n:%d party-n:%d",
196
blob.ID(), h.MinReadShardsX, len(sortedVuids), tactic.N, tactic.M)
197
if len(sortedVuids) < tactic.N {
198
err = fmt.Errorf("broken %s", blob.ID())
200
ch <- pipeBuffer{err: err}
206
shards := make([][]byte, tactic.N+tactic.M)
207
for ii := range shards {
208
buf, _ := h.memPool.Alloc(blob.ShardSize)
211
getTime.IncA(time.Since(st))
213
err = h.readOneBlob(ctx, getTime, serviceController, blob, sortedVuids, shards)
215
span.Error("read one blob", blob.ID(), err)
216
for _, buf := range shards {
219
ch <- pipeBuffer{err: err}
225
for _, buf := range shards {
229
case ch <- pipeBuffer{blob: blob, shards: shards}:
238
for line := range pipeline {
244
startWrite := time.Now()
247
off := line.blob.Offset
248
toReadSize := line.blob.ReadSize
250
buf := line.shards[idx]
251
l := uint64(len(buf))
258
toRead := minU64(toReadSize, l-off)
259
if _, e := w.Write(buf[off : off+toRead]); e != nil {
260
err = errors.Info(e, "write to response")
268
getTime.IncW(time.Since(startWrite))
270
for _, buf := range line.shards {
279
// release buffer in pipeline if fail to write client
281
for line := range pipeline {
282
for _, buf := range line.shards {
289
reportDownload(clusterID, "EC", "error")
290
span.Error("get request error", err)
293
reportDownload(clusterID, "EC", "-")
298
// 1. try to read the minimal set of shards (N+X)
299
// 2. on failure, read the next shard to reconstruct
300
// 3. write the bytes at the right offset to the writer
301
// 4. Just read essential bytes if the data is a segment of one shard.
302
func (h *Handler) readOneBlob(ctx context.Context, getTime *timeReadWrite,
303
serviceController controller.ServiceController,
304
blob blobGetArgs, sortedVuids []sortedVuid, shards [][]byte) error {
305
span := trace.SpanFromContextSafe(ctx)
307
tactic := blob.CodeMode.Tactic()
308
sizes, err := ec.GetBufferSizes(int(blob.BlobSize), tactic)
312
empties := emptyDataShardIndexes(sizes)
314
dataN, dataParityN := tactic.N, tactic.N+tactic.M
315
minShardsRead := dataN + h.MinReadShardsX
316
if minShardsRead > len(sortedVuids) {
317
minShardsRead = len(sortedVuids)
319
shardSize, shardOffset, shardReadSize := blob.ShardSize, blob.ShardOffset, blob.ShardReadSize
321
stopChan := make(chan struct{})
322
nextChan := make(chan struct{}, len(sortedVuids))
323
shardPipe := func() <-chan shardData {
324
ch := make(chan shardData)
326
wg := new(sync.WaitGroup)
332
for _, vuid := range sortedVuids[:minShardsRead] {
333
if _, ok := empties[vuid.index]; !ok {
335
go func(vuid sortedVuid) {
336
ch <- h.readOneShard(ctx, serviceController, blob, vuid, stopChan)
342
for _, vuid := range sortedVuids[minShardsRead:] {
343
if _, ok := empties[vuid.index]; ok {
354
go func(vuid sortedVuid) {
355
ch <- h.readOneShard(ctx, serviceController, blob, vuid, stopChan)
364
received := make(map[int]bool, minShardsRead)
365
for idx := range empties {
367
h.memPool.Zero(shards[idx])
370
startRead := time.Now()
371
reconstructed := false
372
for shard := range shardPipe {
375
buf := shards[shard.index]
376
shards[shard.index] = shard.buffer
380
received[shard.index] = shard.status
381
if len(received) < dataN {
386
badIdx := make([]int, 0, 8)
387
for i := 0; i < dataN; i++ {
388
if succ, ok := received[i]; !ok || !succ {
389
badIdx = append(badIdx, i)
392
if len(badIdx) == 0 {
398
// update bad parity index
399
for i := dataN; i < dataParityN; i++ {
400
if succ, ok := received[i]; !ok || !succ {
401
badIdx = append(badIdx, i)
406
for _, succ := range received {
411
// it will not wait all the shards, cos has no enough shards to reconstruct
412
if badShards > dataParityN-dataN {
413
span.Infof("%s bad(%d) has no enough to reconstruct", blob.ID(), badShards)
418
// has bad shards, but have enough shards to reconstruct
419
if len(received) >= dataN+badShards {
421
if shardReadSize < shardSize {
422
span.Debugf("bid(%d) ready to segment ec reconstruct data", blob.Bid)
423
reportDownload(blob.Cid, "EC", "segment")
424
segments := make([][]byte, len(shards))
425
for idx := range shards {
426
segments[idx] = shards[idx][shardOffset : shardOffset+shardReadSize]
428
err = h.encoder[blob.CodeMode].ReconstructData(segments, badIdx)
430
span.Debugf("bid(%d) ready to ec reconstruct data", blob.Bid)
431
err = h.encoder[blob.CodeMode].ReconstructData(shards, badIdx)
438
span.Errorf("%s ec reconstruct data error:%s", blob.ID(), err.Error())
441
if len(received) >= len(sortedVuids) {
445
nextChan <- struct{}{}
447
getTime.IncR(time.Since(startRead))
449
// release buffer of delayed shards
451
for shard := range shardPipe {
453
h.memPool.Put(shard.buffer)
461
return fmt.Errorf("broken %s", blob.ID())
464
func (h *Handler) readOneShard(ctx context.Context, serviceController controller.ServiceController,
465
blob blobGetArgs, vuid sortedVuid, stopChan <-chan struct{}) shardData {
466
clusterID, vid := blob.Cid, blob.Vid
467
shardOffset, shardReadSize := blob.ShardOffset, blob.ShardReadSize
468
span := trace.SpanFromContextSafe(ctx)
469
shardResult := shardData{
474
args := blobnode.RangeGetShardArgs{
475
GetShardArgs: blobnode.GetShardArgs{
480
Offset: int64(shardOffset),
481
Size: int64(shardReadSize),
488
if hErr := hystrix.Do(rwCommand, func() error {
489
body, err = h.getOneShardFromHost(ctx, serviceController, vuid.host, vuid.diskID, args,
490
vuid.index, clusterID, vid, 3, stopChan)
491
if err != nil && (errorTimeout(err) || rpc.DetectStatusCode(err) == errcode.CodeOverload) {
495
}, nil); hErr != nil {
496
span.Warnf("hystrix: read %s on %s: %s", blob.ID(), vuid.ID(), hErr.Error())
501
if err == errPunishedDisk || err == errCanceledReadShard {
502
span.Warnf("read %s on %s: %s", blob.ID(), vuid.ID(), err.Error())
505
span.Warnf("rpc read %s on %s: %s", blob.ID(), vuid.ID(), errors.Detail(err))
510
buf, err := h.memPool.Alloc(blob.ShardSize)
516
_, err = io.ReadFull(body, buf[shardOffset:shardOffset+shardReadSize])
519
span.Warnf("io read %s on %s: %s", blob.ID(), vuid.ID(), err.Error())
523
shardResult.status = true
524
shardResult.buffer = buf
528
func (h *Handler) getDataShardOnly(ctx context.Context, getTime *timeReadWrite,
529
w io.Writer, serviceController controller.ServiceController, blob blobGetArgs) error {
530
span := trace.SpanFromContextSafe(ctx)
531
if blob.ReadSize == 0 {
535
blobVolume, err := h.getVolume(ctx, blob.Cid, blob.Vid, true)
539
tactic := blobVolume.CodeMode.Tactic()
541
from, to := int(blob.Offset), int(blob.Offset+blob.ReadSize)
542
buffer, err := ec.NewRangeBuffer(int(blob.BlobSize), from, to, tactic, h.memPool)
546
defer buffer.Release()
548
shardSize := buffer.ShardSize
549
firstShardIdx := int(blob.Offset) / shardSize
550
shardOffset := int(blob.Offset) % shardSize
552
startRead := time.Now()
553
remainSize := blob.ReadSize
555
for i, shard := range blobVolume.Units[firstShardIdx:tactic.N] {
560
toReadSize := minU64(remainSize, uint64(shardSize-shardOffset))
561
args := blobnode.RangeGetShardArgs{
562
GetShardArgs: blobnode.GetShardArgs{
563
DiskID: shard.DiskID,
567
Offset: int64(shardOffset),
568
Size: int64(toReadSize),
571
body, err := h.getOneShardFromHost(ctx, serviceController, shard.Host, shard.DiskID, args,
572
firstShardIdx+i, blob.Cid, blob.Vid, 1, nil)
574
span.Warnf("read %s on blobnode(vuid:%d disk:%d host:%s) ecidx(%02d): %s", blob.ID(),
575
shard.Vuid, shard.DiskID, shard.Host, firstShardIdx+i, errors.Detail(err))
576
return errNeedReconstructRead
580
buf := buffer.DataBuf[bufOffset : bufOffset+int(toReadSize)]
581
_, err = io.ReadFull(body, buf)
584
return errNeedReconstructRead
587
// reset next shard offset
589
remainSize -= toReadSize
590
bufOffset += int(toReadSize)
592
getTime.IncR(time.Since(startRead))
595
return fmt.Errorf("no enough data to read %d", remainSize)
598
startWrite := time.Now()
599
if _, err := w.Write(buffer.DataBuf[:int(blob.ReadSize)]); err != nil {
600
getTime.IncW(time.Since(startWrite))
601
return errors.Info(err, "write to response")
603
getTime.IncW(time.Since(startWrite))
608
// getOneShardFromHost gets the body of one shard.
609
func (h *Handler) getOneShardFromHost(ctx context.Context, serviceController controller.ServiceController,
610
host string, diskID proto.DiskID, args blobnode.RangeGetShardArgs, // get shard param with host diskid
611
index int, clusterID proto.ClusterID, vid proto.Vid, // param to update volume cache
612
attempts int, cancelChan <-chan struct{}, // do not retry again if cancelChan was closed
613
) (io.ReadCloser, error) {
614
span := trace.SpanFromContextSafe(ctx)
616
// skip punished disk
617
if diskHost, err := serviceController.GetDiskHost(ctx, diskID); err != nil {
619
} else if diskHost.Punished {
620
return nil, errPunishedDisk
627
rerr = retry.ExponentialBackoff(attempts, 200).RuptOn(func() (bool, error) {
628
if cancelChan != nil {
631
return true, errCanceledReadShard
636
// new child span to get from blobnode, we should finish it here.
637
spanChild, ctxChild := trace.StartSpanFromContextWithTraceID(
638
context.Background(), "GetFromBlobnode", span.TraceID())
639
defer spanChild.Finish()
641
body, _, err := h.blobnodeClient.RangeGetShard(ctxChild, host, &args)
647
code := rpc.DetectStatusCode(err)
649
case errcode.CodeOverload:
652
// EIO and Readonly error, then we need to punish disk in local and no need to retry
653
case errcode.CodeDiskBroken, errcode.CodeVUIDReadonly:
654
h.punishDisk(ctx, clusterID, diskID, host, "BrokenOrRO")
655
span.Warnf("punish disk:%d on:%s cos:blobnode/%d", diskID, host, code)
656
return true, fmt.Errorf("punished disk (%d %s)", diskID, host)
658
// vuid not found means the reflection between vuid and diskID has change,
659
// should refresh the blob volume cache
660
case errcode.CodeDiskNotFound, errcode.CodeVuidNotFound:
661
span.Infof("volume info outdated disk %d on host %s", diskID, host)
663
latestVolume, e := h.getVolume(ctx, clusterID, vid, false)
665
span.Warnf("update volume info with no cache %d %d err: %s", clusterID, vid, e)
668
newUnit := latestVolume.Units[index]
670
newDiskID := newUnit.DiskID
671
if newDiskID != diskID {
672
hi, e := serviceController.GetDiskHost(ctx, newDiskID)
673
if e == nil && !hi.Punished {
674
span.Infof("update disk %d %d %d -> %d", clusterID, vid, diskID, newDiskID)
678
args.GetShardArgs.DiskID = diskID
679
args.GetShardArgs.Vuid = newUnit.Vuid
684
h.punishDiskWith(ctx, clusterID, diskID, host, "NotFound")
685
span.Warnf("punish threshold disk:%d cos:blobnode/%d", diskID, code)
689
// do not retry on timeout then punish threshold this disk
690
if errorTimeout(err) {
691
h.punishDiskWith(ctx, clusterID, diskID, host, "Timeout")
694
if errorConnectionRefused(err) {
697
span.Debugf("read from disk:%d blobnode/%s", diskID, err.Error())
699
err = errors.Base(err, fmt.Sprintf("get shard on (disk:%d host:%s)", diskID, host))
706
func genLocationBlobs(location *access.Location, readSize uint64, offset uint64) ([]blobGetArgs, error) {
707
if readSize > location.Size || offset > location.Size || offset+readSize > location.Size {
708
return nil, fmt.Errorf("FileSize:%d ReadSize:%d Offset:%d", location.Size, readSize, offset)
711
blobSize := uint64(location.BlobSize)
713
return nil, fmt.Errorf("BlobSize:%d", blobSize)
716
remainSize := readSize
717
firstBlobIdx := offset / blobSize
718
blobOffset := offset % blobSize
720
tactic := location.CodeMode.Tactic()
723
blobs := make([]blobGetArgs, 0, 1+(readSize+blobOffset)/blobSize)
724
for _, blob := range location.Blobs {
725
currBlobID := blob.MinBid
727
for ii := uint32(0); ii < blob.Count; ii++ {
732
if idx >= firstBlobIdx {
733
toReadSize := minU64(remainSize, blobSize-blobOffset)
735
// update the last blob size
736
fixedBlobSize := minU64(location.Size-idx*blobSize, blobSize)
738
sizes, _ := ec.GetBufferSizes(int(fixedBlobSize), tactic)
739
shardSize := sizes.ShardSize
740
shardOffset, shardReadSize := shardSegment(shardSize, int(blobOffset), int(toReadSize))
742
blobs = append(blobs, blobGetArgs{
743
Cid: location.ClusterID,
746
CodeMode: location.CodeMode,
747
BlobSize: fixedBlobSize,
749
ReadSize: toReadSize,
751
ShardSize: shardSize,
752
ShardOffset: shardOffset,
753
ShardReadSize: shardReadSize,
757
// reset next blob offset
759
remainSize -= toReadSize
768
return nil, fmt.Errorf("no enough data to read %d", remainSize)
774
func genSortedVuidByIDC(ctx context.Context, serviceController controller.ServiceController, idc string,
775
vuidPhys []controller.Unit) []sortedVuid {
776
span := trace.SpanFromContextSafe(ctx)
778
vuids := make([]sortedVuid, 0, len(vuidPhys))
779
sortMap := make(map[int][]sortedVuid)
781
for idx, phy := range vuidPhys {
782
var hostIDC *controller.HostIDC
783
if err := retry.ExponentialBackoff(2, 100).On(func() error {
784
hi, e := serviceController.GetDiskHost(context.Background(), phy.DiskID)
791
span.Warnf("no host of disk(%d %d) %s", phy.Vuid, phy.DiskID, err.Error())
795
dis := distance(idc, hostIDC.IDC, hostIDC.Punished)
796
if _, ok := sortMap[dis]; !ok {
797
sortMap[dis] = make([]sortedVuid, 0, 8)
799
sortMap[dis] = append(sortMap[dis], sortedVuid{
807
keys := make([]int, 0, len(sortMap))
808
for dis := range sortMap {
809
keys = append(keys, dis)
813
for _, dis := range keys {
815
rand.Shuffle(len(ids), func(i, j int) {
816
ids[i], ids[j] = ids[j], ids[i]
818
vuids = append(vuids, ids...)
820
span.Debugf("distance: %d punished vuids: %+v", dis, ids)
827
func distance(idc1, idc2 string, punished bool) int {
840
func emptyDataShardIndexes(sizes ec.BufferSizes) map[int]struct{} {
841
firstEmptyIdx := (sizes.DataSize + sizes.ShardSize - 1) / sizes.ShardSize
842
n := sizes.ECDataSize / sizes.ShardSize
843
if firstEmptyIdx >= n {
844
return make(map[int]struct{})
847
set := make(map[int]struct{}, n-firstEmptyIdx)
848
for i := firstEmptyIdx; i < n; i++ {
855
func shardSegment(shardSize, blobOffset, blobReadSize int) (shardOffset, shardReadSize int) {
856
shardOffset = blobOffset % shardSize
857
if lastOffset := shardOffset + blobReadSize; lastOffset > shardSize {
858
shardOffset, shardReadSize = 0, shardSize
860
shardReadSize = blobReadSize