cubefs

Форк
0
/
stream_put.go 
445 строк · 12.1 Кб
1
// Copyright 2022 The CubeFS Authors.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
14

15
package access
16

17
import (
18
	"bytes"
19
	"context"
20
	"fmt"
21
	"hash/crc32"
22
	"io"
23
	"sync"
24
	"sync/atomic"
25
	"time"
26

27
	"github.com/afex/hystrix-go/hystrix"
28

29
	"github.com/cubefs/cubefs/blobstore/api/access"
30
	"github.com/cubefs/cubefs/blobstore/api/blobnode"
31
	"github.com/cubefs/cubefs/blobstore/common/ec"
32
	errcode "github.com/cubefs/cubefs/blobstore/common/errors"
33
	"github.com/cubefs/cubefs/blobstore/common/rpc"
34
	"github.com/cubefs/cubefs/blobstore/common/trace"
35
	"github.com/cubefs/cubefs/blobstore/util/errors"
36
	"github.com/cubefs/cubefs/blobstore/util/retry"
37
)
38

39
// TODO: to be continued:
//  put empty shard to blobnode if file has been aligned.
41

42
// Put put one object
43
//
44
//	required: size, file size
45
//	optional: hasher map to calculate hash.Hash
46
func (h *Handler) Put(ctx context.Context, rc io.Reader, size int64,
47
	hasherMap access.HasherMap) (*access.Location, error) {
48
	span := trace.SpanFromContextSafe(ctx)
49
	span.Debugf("put request size:%d hashes:b(%b)", size, hasherMap.ToHashAlgorithm())
50

51
	if size <= 0 {
52
		return nil, errcode.ErrIllegalArguments
53
	}
54
	if size > h.maxObjectSize {
55
		span.Info("exceed max object size", h.maxObjectSize)
56
		return nil, errcode.ErrAccessExceedSize
57
	}
58

59
	// 1.make hasher
60
	if len(hasherMap) > 0 {
61
		rc = io.TeeReader(rc, hasherMap.ToWriter())
62
	}
63

64
	// 2.choose cluster and alloc volume from allocator
65
	selectedCodeMode := h.allCodeModes.SelectCodeMode(size)
66
	span.Debugf("select codemode %d", selectedCodeMode)
67

68
	blobSize := atomic.LoadUint32(&h.MaxBlobSize)
69
	clusterID, blobs, err := h.allocFromAllocatorWithHystrix(ctx, selectedCodeMode, uint64(size), blobSize, 0)
70
	if err != nil {
71
		span.Error("alloc failed", errors.Detail(err))
72
		return nil, err
73
	}
74
	span.Debugf("allocated from %d %+v", clusterID, blobs)
75

76
	// 3.read body and split, alloc from mem pool;ec encode and put into data node
77
	limitReader := io.LimitReader(rc, int64(size))
78
	location := &access.Location{
79
		ClusterID: clusterID,
80
		CodeMode:  selectedCodeMode,
81
		Size:      uint64(size),
82
		BlobSize:  blobSize,
83
		Blobs:     blobs,
84
	}
85

86
	uploadSucc := false
87
	defer func() {
88
		if !uploadSucc {
89
			span.Infof("put failed clean location %+v", location)
90
			if err := h.clearGarbage(ctx, location); err != nil {
91
				span.Warn(errors.Detail(err))
92
			}
93
		}
94
	}()
95

96
	var buffer *ec.Buffer
97
	putTime := new(timeReadWrite)
98
	defer func() {
99
		// release ec buffer which have not takeover
100
		buffer.Release()
101
		span.AppendRPCTrackLog([]string{putTime.String()})
102
	}()
103

104
	// concurrent buffer in per request
105
	const concurrence = 4
106
	ready := make(chan struct{}, concurrence)
107
	for range [concurrence]struct{}{} {
108
		ready <- struct{}{}
109
	}
110

111
	encoder := h.encoder[selectedCodeMode]
112
	tactic := selectedCodeMode.Tactic()
113
	for _, blob := range location.Spread() {
114
		vid, bid, bsize := blob.Vid, blob.Bid, int(blob.Size)
115

116
		// new an empty ec buffer for per blob
117
		var err error
118
		st := time.Now()
119
		buffer, err = ec.NewBuffer(bsize, tactic, h.memPool)
120
		putTime.IncA(time.Since(st))
121
		if err != nil {
122
			return nil, err
123
		}
124

125
		readBuff := buffer.DataBuf[:bsize]
126
		shards, err := encoder.Split(buffer.ECDataBuf)
127
		if err != nil {
128
			return nil, err
129
		}
130

131
		startRead := time.Now()
132
		n, err := io.ReadFull(limitReader, readBuff)
133
		putTime.IncR(time.Since(startRead))
134
		if err != nil && err != io.EOF {
135
			span.Infof("read blob data failed want:%d read:%d %s", bsize, n, err.Error())
136
			return nil, errcode.ErrAccessReadRequestBody
137
		}
138
		if n != bsize {
139
			span.Infof("read blob less data want:%d but:%d", bsize, n)
140
			return nil, errcode.ErrAccessReadRequestBody
141
		}
142

143
		// ec encode
144
		if err = encoder.Encode(shards); err != nil {
145
			return nil, err
146
		}
147

148
		blobident := blobIdent{clusterID, vid, bid}
149
		span.Debug("to write", blobident)
150

151
		// takeover the buffer, release to pool in function writeToBlobnodes
152
		takeoverBuffer := buffer
153
		buffer = nil
154
		<-ready
155
		startWrite := time.Now()
156
		err = h.writeToBlobnodesWithHystrix(ctx, blobident, shards, func() {
157
			takeoverBuffer.Release()
158
			ready <- struct{}{}
159
		})
160
		putTime.IncW(time.Since(startWrite))
161
		if err != nil {
162
			return nil, errors.Info(err, "write to blobnode failed")
163
		}
164
	}
165

166
	uploadSucc = true
167
	return location, nil
168
}
169

170
func (h *Handler) writeToBlobnodesWithHystrix(ctx context.Context,
171
	blob blobIdent, shards [][]byte, callback func()) error {
172
	safe := make(chan struct{}, 1)
173
	err := hystrix.Do(rwCommand, func() error {
174
		safe <- struct{}{}
175
		return h.writeToBlobnodes(ctx, blob, shards, callback)
176
	}, nil)
177

178
	select {
179
	case <-safe:
180
	default:
181
		callback() // callback if fused by hystrix
182
	}
183
	return err
184
}
185

186
// shardPutStatus is the result of writing one shard to a blobnode.
type shardPutStatus struct {
	// index is the position of the shard's unit in the volume units.
	index int
	// status is true only if the shard has been written successfully.
	status bool
}
190

191
// writeToBlobnodes write shards to blobnodes.
192
// takeover ec buffer release by callback.
193
// return if had quorum successful shards, then wait all shards in background.
194
func (h *Handler) writeToBlobnodes(ctx context.Context,
195
	blob blobIdent, shards [][]byte, callback func()) (err error) {
196
	span := trace.SpanFromContextSafe(ctx)
197
	clusterID, vid, bid := blob.cid, blob.vid, blob.bid
198

199
	wg := &sync.WaitGroup{}
200
	defer func() {
201
		// waiting all shards done in background
202
		go func() {
203
			wg.Wait()
204
			callback()
205
		}()
206
	}()
207

208
	volume, err := h.getVolume(ctx, clusterID, vid, true)
209
	if err != nil {
210
		return
211
	}
212
	serviceController, err := h.clusterController.GetServiceController(clusterID)
213
	if err != nil {
214
		return
215
	}
216

217
	statusCh := make(chan shardPutStatus, len(volume.Units))
218
	tactic := volume.CodeMode.Tactic()
219
	putQuorum := uint32(tactic.PutQuorum)
220
	if num, ok := h.CodeModesPutQuorums[volume.CodeMode]; ok && num <= tactic.N+tactic.M {
221
		putQuorum = uint32(num)
222
	}
223

224
	// writtenNum ONLY apply on data and partiy shards
225
	// TODO: count N and M in each AZ,
226
	//    decision ec data is recoverable or not.
227
	maxWrittenIndex := tactic.N + tactic.M
228
	writtenNum := uint32(0)
229

230
	wg.Add(len(volume.Units))
231
	for i, unitI := range volume.Units {
232
		index, unit := i, unitI
233

234
		go func() {
235
			status := shardPutStatus{index: index}
236
			defer func() {
237
				statusCh <- status
238
				wg.Done()
239
			}()
240

241
			diskID := unit.DiskID
242
			args := &blobnode.PutShardArgs{
243
				DiskID: diskID,
244
				Vuid:   unit.Vuid,
245
				Bid:    bid,
246
				Size:   int64(len(shards[index])),
247
				Type:   blobnode.NormalIO,
248
			}
249

250
			crcDisabled := h.ShardCrcDisabled
251
			var crcOrigin uint32
252
			if !crcDisabled {
253
				crcOrigin = crc32.ChecksumIEEE(shards[index])
254
			}
255

256
			// new child span to write to blobnode, we should finish it here.
257
			spanChild, ctxChild := trace.StartSpanFromContextWithTraceID(
258
				context.Background(), "WriteToBlobnode", span.TraceID())
259
			defer spanChild.Finish()
260

261
		RETRY:
262
			hostInfo, err := serviceController.GetDiskHost(ctxChild, diskID)
263
			if err != nil {
264
				span.Error("get disk host failed", errors.Detail(err))
265
				return
266
			}
267
			// punished disk, ignore and return
268
			if hostInfo.Punished {
269
				span.Warnf("ignore punished disk(%d %s) uvid(%d) ecidx(%02d) in idc(%s)",
270
					diskID, hostInfo.Host, unit.Vuid, index, hostInfo.IDC)
271
				return
272
			}
273
			host := hostInfo.Host
274

275
			var (
276
				writeErr  error
277
				needRetry bool
278
				crc       uint32
279
			)
280
			writeErr = retry.ExponentialBackoff(3, 200).RuptOn(func() (bool, error) {
281
				args.Body = bytes.NewReader(shards[index])
282

283
				crc, err = h.blobnodeClient.PutShard(ctxChild, host, args)
284
				if err == nil {
285
					if !crcDisabled && crc != crcOrigin {
286
						return false, fmt.Errorf("crc mismatch 0x%x != 0x%x", crc, crcOrigin)
287
					}
288

289
					needRetry = false
290
					return true, nil
291
				}
292

293
				code := rpc.DetectStatusCode(err)
294
				switch code {
295
				case errcode.CodeDiskBroken, errcode.CodeDiskNotFound,
296
					errcode.CodeChunkNoSpace, errcode.CodeVUIDReadonly:
297
					h.discardVidChan <- discardVid{
298
						cid:      clusterID,
299
						codeMode: volume.CodeMode,
300
						vid:      vid,
301
					}
302
				default:
303
				}
304

305
				switch code {
306
				// EIO and Readonly error, then we need to punish disk in local and no necessary to retry
307
				case errcode.CodeDiskBroken, errcode.CodeVUIDReadonly:
308
					h.punishVolume(ctx, clusterID, vid, host, "BrokenOrRO")
309
					h.punishDisk(ctx, clusterID, diskID, host, "BrokenOrRO")
310
					span.Warnf("punish disk:%d volume:%d cos:blobnode/%d", diskID, vid, code)
311
					return true, err
312

313
				// chunk no space, we should punish this volume
314
				case errcode.CodeChunkNoSpace:
315
					h.punishVolume(ctx, clusterID, vid, host, "NoSpace")
316
					span.Warnf("punish volume:%d cos:blobnode/%d", vid, code)
317
					return true, err
318

319
				// vuid not found means the reflection between vuid and diskID has change, we should refresh the volume
320
				// disk not found means disk has been repaired or offline
321
				case errcode.CodeDiskNotFound, errcode.CodeVuidNotFound:
322
					latestVolume, e := h.getVolume(ctx, clusterID, vid, false)
323
					if e != nil {
324
						return true, errors.Base(err, "get volume with no cache failed").Detail(e)
325
					}
326

327
					newUnit := latestVolume.Units[index]
328
					if diskID != newUnit.DiskID {
329
						diskID = newUnit.DiskID
330
						unit = newUnit
331
						args.DiskID = newUnit.DiskID
332
						args.Vuid = newUnit.Vuid
333

334
						needRetry = true
335
						return true, err
336
					}
337

338
					h.punishVolume(ctx, clusterID, vid, host, "NotFound")
339
					h.punishDisk(ctx, clusterID, diskID, host, "NotFound")
340
					span.Warnf("punish disk:%d volume:%d cos:blobnode/%d", diskID, vid, code)
341
					return true, err
342
				default:
343
				}
344

345
				// in timeout case and writtenNum is not satisfied with putQuorum, then should retry
346
				if errorTimeout(err) && atomic.LoadUint32(&writtenNum) < putQuorum {
347
					h.punishDiskWith(ctx, clusterID, diskID, host, "Timeout")
348
					span.Warn("connect timeout, need to punish threshold disk", diskID, host)
349
					return false, err
350
				}
351

352
				// others, do not retry this round
353
				return true, err
354
			})
355

356
			if needRetry {
357
				goto RETRY
358
			}
359
			if writeErr != nil {
360
				span.Warnf("write %s on blobnode(vuid:%d disk:%d host:%s) ecidx(%02d): %s",
361
					blob.String(), args.Vuid, args.DiskID, hostInfo.Host, index, errors.Detail(writeErr))
362
				return
363
			}
364

365
			if index < maxWrittenIndex {
366
				atomic.AddUint32(&writtenNum, 1)
367
			}
368
			status.status = true
369
		}()
370
	}
371

372
	received := make(map[int]shardPutStatus, len(volume.Units))
373
	for len(received) < len(volume.Units) && atomic.LoadUint32(&writtenNum) < putQuorum {
374
		st := <-statusCh
375
		received[st.index] = st
376
	}
377

378
	writeDone := make(chan struct{}, 1)
379
	// write unaccomplished shard to repair queue
380
	go func(writeDone <-chan struct{}) {
381
		for len(received) < len(volume.Units) {
382
			st := <-statusCh
383
			received[st.index] = st
384
		}
385

386
		if _, ok := <-writeDone; !ok {
387
			return
388
		}
389

390
		badIdxes := make([]uint8, 0)
391
		for idx := range volume.Units {
392
			if st, ok := received[idx]; ok && st.status {
393
				continue
394
			}
395
			badIdxes = append(badIdxes, uint8(idx))
396
		}
397
		if len(badIdxes) > 0 {
398
			h.sendRepairMsgBg(ctx, blob, badIdxes)
399
		}
400
	}(writeDone)
401

402
	// return if had quorum successful shards
403
	if atomic.LoadUint32(&writtenNum) >= putQuorum {
404
		writeDone <- struct{}{}
405
		return
406
	}
407

408
	// It tolerate one az was down when we have 3 or more azs.
409
	// But MUST make sure others azs data is all completed,
410
	// And all data in the down az are failed.
411
	if tactic.AZCount >= 3 {
412
		allFine := 0
413
		allDown := 0
414

415
		for _, azIndexes := range tactic.GetECLayoutByAZ() {
416
			azFine := true
417
			azDown := true
418
			for _, idx := range azIndexes {
419
				if st, ok := received[idx]; !ok || !st.status {
420
					azFine = false
421
				} else {
422
					azDown = false
423
				}
424
			}
425
			if azFine {
426
				allFine++
427
			}
428
			if azDown {
429
				allDown++
430
			}
431
		}
432

433
		span.Debugf("tolerate-multi-az-write (az-fine:%d az-down:%d az-all:%d)", allFine, allDown, tactic.AZCount)
434
		if allFine == tactic.AZCount-1 && allDown == 1 {
435
			span.Warnf("tolerate-multi-az-write (az-fine:%d az-down:%d az-all:%d) of %s",
436
				allFine, allDown, tactic.AZCount, blob.String())
437
			writeDone <- struct{}{}
438
			return
439
		}
440
	}
441

442
	close(writeDone)
443
	err = fmt.Errorf("quorum write failed (%d < %d) of %s", writtenNum, putQuorum, blob.String())
444
	return
445
}
446

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.