// Copyright 2022 The CubeFS Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
"github.com/afex/hystrix-go/hystrix"
29
"github.com/cubefs/cubefs/blobstore/api/access"
30
"github.com/cubefs/cubefs/blobstore/api/blobnode"
31
"github.com/cubefs/cubefs/blobstore/common/ec"
32
errcode "github.com/cubefs/cubefs/blobstore/common/errors"
33
"github.com/cubefs/cubefs/blobstore/common/rpc"
34
"github.com/cubefs/cubefs/blobstore/common/trace"
35
"github.com/cubefs/cubefs/blobstore/util/errors"
36
"github.com/cubefs/cubefs/blobstore/util/retry"
39
// TODO: To Be Continued
// put an empty shard to blobnode if the file has been aligned.
// required: size, the file size
// optional: hasherMap, hashers used to calculate hash.Hash checksums of the data
46
func (h *Handler) Put(ctx context.Context, rc io.Reader, size int64,
47
hasherMap access.HasherMap) (*access.Location, error) {
48
span := trace.SpanFromContextSafe(ctx)
49
span.Debugf("put request size:%d hashes:b(%b)", size, hasherMap.ToHashAlgorithm())
52
return nil, errcode.ErrIllegalArguments
54
if size > h.maxObjectSize {
55
span.Info("exceed max object size", h.maxObjectSize)
56
return nil, errcode.ErrAccessExceedSize
60
if len(hasherMap) > 0 {
61
rc = io.TeeReader(rc, hasherMap.ToWriter())
64
// 2.choose cluster and alloc volume from allocator
65
selectedCodeMode := h.allCodeModes.SelectCodeMode(size)
66
span.Debugf("select codemode %d", selectedCodeMode)
68
blobSize := atomic.LoadUint32(&h.MaxBlobSize)
69
clusterID, blobs, err := h.allocFromAllocatorWithHystrix(ctx, selectedCodeMode, uint64(size), blobSize, 0)
71
span.Error("alloc failed", errors.Detail(err))
74
span.Debugf("allocated from %d %+v", clusterID, blobs)
76
// 3.read body and split, alloc from mem pool;ec encode and put into data node
77
limitReader := io.LimitReader(rc, int64(size))
78
location := &access.Location{
80
CodeMode: selectedCodeMode,
89
span.Infof("put failed clean location %+v", location)
90
if err := h.clearGarbage(ctx, location); err != nil {
91
span.Warn(errors.Detail(err))
97
putTime := new(timeReadWrite)
99
// release ec buffer which have not takeover
101
span.AppendRPCTrackLog([]string{putTime.String()})
104
// concurrent buffer in per request
105
const concurrence = 4
106
ready := make(chan struct{}, concurrence)
107
for range [concurrence]struct{}{} {
111
encoder := h.encoder[selectedCodeMode]
112
tactic := selectedCodeMode.Tactic()
113
for _, blob := range location.Spread() {
114
vid, bid, bsize := blob.Vid, blob.Bid, int(blob.Size)
116
// new an empty ec buffer for per blob
119
buffer, err = ec.NewBuffer(bsize, tactic, h.memPool)
120
putTime.IncA(time.Since(st))
125
readBuff := buffer.DataBuf[:bsize]
126
shards, err := encoder.Split(buffer.ECDataBuf)
131
startRead := time.Now()
132
n, err := io.ReadFull(limitReader, readBuff)
133
putTime.IncR(time.Since(startRead))
134
if err != nil && err != io.EOF {
135
span.Infof("read blob data failed want:%d read:%d %s", bsize, n, err.Error())
136
return nil, errcode.ErrAccessReadRequestBody
139
span.Infof("read blob less data want:%d but:%d", bsize, n)
140
return nil, errcode.ErrAccessReadRequestBody
144
if err = encoder.Encode(shards); err != nil {
148
blobident := blobIdent{clusterID, vid, bid}
149
span.Debug("to write", blobident)
151
// takeover the buffer, release to pool in function writeToBlobnodes
152
takeoverBuffer := buffer
155
startWrite := time.Now()
156
err = h.writeToBlobnodesWithHystrix(ctx, blobident, shards, func() {
157
takeoverBuffer.Release()
160
putTime.IncW(time.Since(startWrite))
162
return nil, errors.Info(err, "write to blobnode failed")
170
func (h *Handler) writeToBlobnodesWithHystrix(ctx context.Context,
171
blob blobIdent, shards [][]byte, callback func()) error {
172
safe := make(chan struct{}, 1)
173
err := hystrix.Do(rwCommand, func() error {
175
return h.writeToBlobnodes(ctx, blob, shards, callback)
181
callback() // callback if fused by hystrix
186
type shardPutStatus struct {
191
// writeToBlobnodes writes shards to blobnodes.
// The taken-over ec buffer is released by the callback.
// Returns once a quorum of shards has been written successfully, then waits
// for the remaining shards in the background.
func (h *Handler) writeToBlobnodes(ctx context.Context,
195
blob blobIdent, shards [][]byte, callback func()) (err error) {
196
span := trace.SpanFromContextSafe(ctx)
197
clusterID, vid, bid := blob.cid, blob.vid, blob.bid
199
wg := &sync.WaitGroup{}
201
// waiting all shards done in background
208
volume, err := h.getVolume(ctx, clusterID, vid, true)
212
serviceController, err := h.clusterController.GetServiceController(clusterID)
217
statusCh := make(chan shardPutStatus, len(volume.Units))
218
tactic := volume.CodeMode.Tactic()
219
putQuorum := uint32(tactic.PutQuorum)
220
if num, ok := h.CodeModesPutQuorums[volume.CodeMode]; ok && num <= tactic.N+tactic.M {
221
putQuorum = uint32(num)
224
// writtenNum ONLY apply on data and partiy shards
225
// TODO: count N and M in each AZ,
226
// decision ec data is recoverable or not.
227
maxWrittenIndex := tactic.N + tactic.M
228
writtenNum := uint32(0)
230
wg.Add(len(volume.Units))
231
for i, unitI := range volume.Units {
232
index, unit := i, unitI
235
status := shardPutStatus{index: index}
241
diskID := unit.DiskID
242
args := &blobnode.PutShardArgs{
246
Size: int64(len(shards[index])),
247
Type: blobnode.NormalIO,
250
crcDisabled := h.ShardCrcDisabled
253
crcOrigin = crc32.ChecksumIEEE(shards[index])
256
// new child span to write to blobnode, we should finish it here.
257
spanChild, ctxChild := trace.StartSpanFromContextWithTraceID(
258
context.Background(), "WriteToBlobnode", span.TraceID())
259
defer spanChild.Finish()
262
hostInfo, err := serviceController.GetDiskHost(ctxChild, diskID)
264
span.Error("get disk host failed", errors.Detail(err))
267
// punished disk, ignore and return
268
if hostInfo.Punished {
269
span.Warnf("ignore punished disk(%d %s) uvid(%d) ecidx(%02d) in idc(%s)",
270
diskID, hostInfo.Host, unit.Vuid, index, hostInfo.IDC)
273
host := hostInfo.Host
280
writeErr = retry.ExponentialBackoff(3, 200).RuptOn(func() (bool, error) {
281
args.Body = bytes.NewReader(shards[index])
283
crc, err = h.blobnodeClient.PutShard(ctxChild, host, args)
285
if !crcDisabled && crc != crcOrigin {
286
return false, fmt.Errorf("crc mismatch 0x%x != 0x%x", crc, crcOrigin)
293
code := rpc.DetectStatusCode(err)
295
case errcode.CodeDiskBroken, errcode.CodeDiskNotFound,
296
errcode.CodeChunkNoSpace, errcode.CodeVUIDReadonly:
297
h.discardVidChan <- discardVid{
299
codeMode: volume.CodeMode,
306
// EIO and Readonly error, then we need to punish disk in local and no necessary to retry
307
case errcode.CodeDiskBroken, errcode.CodeVUIDReadonly:
308
h.punishVolume(ctx, clusterID, vid, host, "BrokenOrRO")
309
h.punishDisk(ctx, clusterID, diskID, host, "BrokenOrRO")
310
span.Warnf("punish disk:%d volume:%d cos:blobnode/%d", diskID, vid, code)
313
// chunk no space, we should punish this volume
314
case errcode.CodeChunkNoSpace:
315
h.punishVolume(ctx, clusterID, vid, host, "NoSpace")
316
span.Warnf("punish volume:%d cos:blobnode/%d", vid, code)
319
// vuid not found means the reflection between vuid and diskID has change, we should refresh the volume
320
// disk not found means disk has been repaired or offline
321
case errcode.CodeDiskNotFound, errcode.CodeVuidNotFound:
322
latestVolume, e := h.getVolume(ctx, clusterID, vid, false)
324
return true, errors.Base(err, "get volume with no cache failed").Detail(e)
327
newUnit := latestVolume.Units[index]
328
if diskID != newUnit.DiskID {
329
diskID = newUnit.DiskID
331
args.DiskID = newUnit.DiskID
332
args.Vuid = newUnit.Vuid
338
h.punishVolume(ctx, clusterID, vid, host, "NotFound")
339
h.punishDisk(ctx, clusterID, diskID, host, "NotFound")
340
span.Warnf("punish disk:%d volume:%d cos:blobnode/%d", diskID, vid, code)
345
// in timeout case and writtenNum is not satisfied with putQuorum, then should retry
346
if errorTimeout(err) && atomic.LoadUint32(&writtenNum) < putQuorum {
347
h.punishDiskWith(ctx, clusterID, diskID, host, "Timeout")
348
span.Warn("connect timeout, need to punish threshold disk", diskID, host)
352
// others, do not retry this round
360
span.Warnf("write %s on blobnode(vuid:%d disk:%d host:%s) ecidx(%02d): %s",
361
blob.String(), args.Vuid, args.DiskID, hostInfo.Host, index, errors.Detail(writeErr))
365
if index < maxWrittenIndex {
366
atomic.AddUint32(&writtenNum, 1)
372
received := make(map[int]shardPutStatus, len(volume.Units))
373
for len(received) < len(volume.Units) && atomic.LoadUint32(&writtenNum) < putQuorum {
375
received[st.index] = st
378
writeDone := make(chan struct{}, 1)
379
// write unaccomplished shard to repair queue
380
go func(writeDone <-chan struct{}) {
381
for len(received) < len(volume.Units) {
383
received[st.index] = st
386
if _, ok := <-writeDone; !ok {
390
badIdxes := make([]uint8, 0)
391
for idx := range volume.Units {
392
if st, ok := received[idx]; ok && st.status {
395
badIdxes = append(badIdxes, uint8(idx))
397
if len(badIdxes) > 0 {
398
h.sendRepairMsgBg(ctx, blob, badIdxes)
402
// return if had quorum successful shards
403
if atomic.LoadUint32(&writtenNum) >= putQuorum {
404
writeDone <- struct{}{}
408
// It tolerate one az was down when we have 3 or more azs.
409
// But MUST make sure others azs data is all completed,
410
// And all data in the down az are failed.
411
if tactic.AZCount >= 3 {
415
for _, azIndexes := range tactic.GetECLayoutByAZ() {
418
for _, idx := range azIndexes {
419
if st, ok := received[idx]; !ok || !st.status {
433
span.Debugf("tolerate-multi-az-write (az-fine:%d az-down:%d az-all:%d)", allFine, allDown, tactic.AZCount)
434
if allFine == tactic.AZCount-1 && allDown == 1 {
435
span.Warnf("tolerate-multi-az-write (az-fine:%d az-down:%d az-all:%d) of %s",
436
allFine, allDown, tactic.AZCount, blob.String())
437
writeDone <- struct{}{}
443
err = fmt.Errorf("quorum write failed (%d < %d) of %s", writtenNum, putQuorum, blob.String())