// Copyright 2022 The CubeFS Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.

package blobstore

import (
	"context"
	"fmt"
	"io"
	"os"
	"sync"
	"syscall"
	"time"

	"github.com/cubefs/cubefs/blockcache/bcache"
	"github.com/cubefs/cubefs/proto"
	"github.com/cubefs/cubefs/sdk/data/manager"
	"github.com/cubefs/cubefs/sdk/data/stream"
	"github.com/cubefs/cubefs/sdk/meta"
	"github.com/cubefs/cubefs/util"
	"github.com/cubefs/cubefs/util/exporter"
	"github.com/cubefs/cubefs/util/log"
	"github.com/cubefs/cubefs/util/stat"
)

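// rwSlice describes one slice of a read request: the object extent's
// position in the file (fileOffset, size), the requested sub-range
// within it (rOffset, rSize), the destination buffer (Data), and the
// keys locating the data in the cache tier (extentKey) and in the
// blob store (objExtentKey).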
type rwSlice struct {
	index        int
	fileOffset   uint64
	size         uint32
	rOffset      uint64
	rSize        uint32
	read         int
	Data         []byte
	extentKey    proto.ExtentKey
	objExtentKey proto.ObjExtentKey
}

func (s rwSlice) String() string {
	return fmt.Sprintf("rwSlice{fileOffset(%v),size(%v),rOffset(%v),rSize(%v),read(%v),extentKey(%v),objExtentKey(%v)}", s.fileOffset, s.size, s.rOffset, s.rSize, s.read, s.extentKey, s.objExtentKey)
}

func (reader *Reader) String() string {
	// %p prints the Reader's address without re-invoking String on the receiver.
	return fmt.Sprintf("Reader{address(%p),volName(%v),volType(%v),ino(%v),fileSize(%v),enableBcache(%v),cacheAction(%v),fileCache(%v),cacheThreshold(%v),readConcurrency(%v)}",
		reader, reader.volName, reader.volType, reader.ino, reader.fileLength, reader.enableBcache, reader.cacheAction, reader.fileCache, reader.cacheThreshold, reader.readConcurrency)
}

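// Reader serves reads for an inode on a blob-store backed (cold)
// volume. It resolves the inode's extents once via the meta wrapper,
// splits each request into per-extent slices, and fetches them
// concurrently, consulting the local block cache (L1) and the CubeFS
// cache extents (L2) before falling back to the blob store.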
type Reader struct {
	volName         string
	volType         int
	ino             uint64
	offset          uint64
	data            []byte
	err             chan error
	bc              *bcache.BcacheClient
	mw              *meta.MetaWrapper
	ec              *stream.ExtentClient
	ebs             *BlobStoreClient
	readConcurrency int
	cacheTimeout    time.Duration
	wg              sync.WaitGroup
	once            sync.Once
	sync.Mutex
	close           bool
	extentKeys      []proto.ExtentKey
	missExtentKeys  []proto.ExtentKey
	objExtentKeys   []proto.ObjExtentKey
	enableBcache    bool
	cacheAction     int
	fileCache       bool
	cacheThreshold  int
	fileLength      uint64
	valid           bool
	inflightL2cache sync.Map
	limitManager    *manager.LimitManager
}

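// ClientConfig bundles everything needed to construct a Reader:
// volume identity, the target inode, the cache/meta/extent/blob-store
// clients, and the caching policy. A few fields (BlockSize,
// WConcurrency, FileSize) are unused by NewReader and appear to be
// shared with the corresponding writer.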
type ClientConfig struct {
	VolName         string
	VolType         int
	BlockSize       int
	Ino             uint64
	Bc              *bcache.BcacheClient
	Mw              *meta.MetaWrapper
	Ec              *stream.ExtentClient
	Ebsc            *BlobStoreClient
	EnableBcache    bool
	WConcurrency    int
	ReadConcurrency int
	CacheAction     int
	FileCache       bool
	FileSize        uint64
	CacheThreshold  int
}

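// NewReader builds a Reader from config; for cold volumes it also
// refreshes the extent client's view of the cache data partitions.
// A minimal usage sketch (hypothetical caller code, assuming ctx,
// config and buf already exist; not part of this package):
//
//	reader := NewReader(config)
//	defer reader.Close(ctx)
//	n, err := reader.Read(ctx, buf, 0, len(buf))
//	if err != nil && err != io.EOF {
//		// handle the failure; io.EOF means the offset was at end of file
//	}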
func NewReader(config ClientConfig) (reader *Reader) {
	reader = new(Reader)

	reader.volName = config.VolName
	reader.volType = config.VolType
	reader.ino = config.Ino
	reader.bc = config.Bc
	reader.ebs = config.Ebsc
	reader.mw = config.Mw
	reader.ec = config.Ec
	reader.enableBcache = config.EnableBcache
	reader.readConcurrency = config.ReadConcurrency
	reader.cacheAction = config.CacheAction
	reader.fileCache = config.FileCache
	reader.cacheThreshold = config.CacheThreshold

	if proto.IsCold(reader.volType) {
		reader.ec.UpdateDataPartitionForColdVolume()
	}

	reader.limitManager = reader.ec.LimitManager
	return
}

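// Read copies up to len(buf) bytes starting at offset into buf. The
// request is sliced per object extent and the slices are fetched by a
// pool of readConcurrency workers; results are then copied into buf
// in order. It returns the byte count, io.EOF when offset is at or
// past the end of the file, and os.ErrInvalid after Close. Note that
// size is effectively ignored: it is always reset to len(buf).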
func (reader *Reader) Read(ctx context.Context, buf []byte, offset int, size int) (int, error) {
	if reader == nil {
		return 0, fmt.Errorf("reader is not opened yet")
	}
	log.LogDebugf("TRACE reader Read Enter. ino(%v) offset(%v) len(%v)", reader.ino, offset, size)
	var (
		read = 0
		err  error
	)
	if reader.close {
		return 0, os.ErrInvalid
	}

	reader.Lock()
	defer reader.Unlock()
	// Cold volume: split the request into per-extent slices.
	var rSlices []*rwSlice
	// The requested size never exceeds the buffer it is copied into.
	if size != len(buf) {
		size = len(buf)
	}

	rSlices, err = reader.prepareEbsSlice(offset, uint32(size))
	log.LogDebugf("TRACE reader Read. ino(%v) rSlices-length(%v)", reader.ino, len(rSlices))

	if err != nil {
		return 0, err
	}
	sliceSize := len(rSlices)
	if sliceSize > 0 {
		reader.wg.Add(sliceSize)
		pool := New(reader.readConcurrency, sliceSize)
		defer pool.Close()
		// Buffered so workers never block on send, even on early return.
		reader.err = make(chan error, sliceSize)
		for _, rs := range rSlices {
			pool.Execute(rs, func(param *rwSlice) {
				reader.readSliceRange(ctx, param)
			})
		}

		reader.wg.Wait()
		for i := 0; i < sliceSize; i++ {
			if err, ok := <-reader.err; !ok || err != nil {
				return 0, err
			}
		}
		close(reader.err)
	}
	for i := 0; i < sliceSize; i++ {
		read += copy(buf[read:], rSlices[i].Data)
	}
	log.LogDebugf("TRACE reader Read Exit. ino(%v) readN(%v) buf-len(%v)", reader.ino, read, len(buf))
	return read, nil
}

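// Close marks the reader closed; any later Read returns os.ErrInvalid.
// It does not tear down the underlying clients.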
func (reader *Reader) Close(ctx context.Context) {
	reader.Lock()
	reader.close = true
	reader.Unlock()
}

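// prepareEbsSlice maps the requested (offset, size) range onto the
// inode's object extents, producing one rwSlice per extent touched.
// On first use it fetches the extent lists from the meta server; the
// size is clamped to the file length, and io.EOF is returned when the
// offset is at or beyond the end of the file.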
func (reader *Reader) prepareEbsSlice(offset int, size uint32) ([]*rwSlice, error) {
	if offset < 0 {
		return nil, syscall.EIO
	}
	chunks := make([]*rwSlice, 0)
	endflag := false
	selected := false

	reader.once.Do(func() {
		reader.refreshEbsExtents()
	})
	fileSize, valid := reader.fileSize()
	reader.fileLength = fileSize
	log.LogDebugf("TRACE blobStore prepareEbsSlice Enter. ino(%v) fileSize(%v)", reader.ino, fileSize)
	if !valid {
		log.LogErrorf("Reader: invoke fileSize fail. ino(%v) offset(%v) size(%v)", reader.ino, offset, size)
		return nil, syscall.EIO
	}
	log.LogDebugf("TRACE blobStore prepareEbsSlice. ino(%v) offset(%v) size(%v)", reader.ino, offset, size)
	if uint64(offset) >= fileSize {
		return nil, io.EOF
	}

	start := uint64(offset)
	if uint64(offset)+uint64(size) > fileSize {
		size = uint32(fileSize - uint64(offset))
	}
	end := uint64(offset + int(size))
	for index, oek := range reader.objExtentKeys {
		rs := &rwSlice{}
		// This extent contains the current start of the request.
		if oek.FileOffset <= start && start < oek.FileOffset+oek.Size {
			rs.index = index
			rs.fileOffset = oek.FileOffset
			rs.size = uint32(oek.Size)
			rs.rOffset = start - oek.FileOffset
			rs.rSize = uint32(oek.FileOffset + oek.Size - start)
			selected = true
		}
		// This extent contains the end of the request: trim the slice.
		if end <= oek.FileOffset+oek.Size {
			rs.rSize = uint32(end - start)
			selected = true
			endflag = true
		}
		if selected {
			rs.objExtentKey = oek
			reader.buildExtentKey(rs)
			rs.Data = make([]byte, rs.rSize)
			start = oek.FileOffset + oek.Size
			chunks = append(chunks, rs)
			log.LogDebugf("TRACE blobStore prepareEbsSlice. ino(%v) offset(%v) size(%v) rwSlice(%v)", reader.ino, offset, size, rs)
		}
		if endflag {
			break
		}
	}
	log.LogDebugf("TRACE blobStore prepareEbsSlice Exit. ino(%v) offset(%v) size(%v) rwSlices(%v)", reader.ino, offset, size, chunks)
	return chunks, nil
}

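// buildExtentKey binary-searches reader.extentKeys (assumed sorted by
// FileOffset) for a cache extent whose FileOffset matches the slice's
// object extent; if none is found the slice keeps a zero ExtentKey,
// which readSliceRange treats as "no L2 copy".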
func (reader *Reader) buildExtentKey(rs *rwSlice) {
	if len(reader.extentKeys) == 0 {
		rs.extentKey = proto.ExtentKey{}
	} else {
		low := 0
		high := len(reader.extentKeys) - 1
		for low <= high {
			mid := (high + low) / 2
			target := reader.extentKeys[mid]
			if target.FileOffset == rs.objExtentKey.FileOffset {
				rs.extentKey = target
				return
			} else if target.FileOffset > rs.objExtentKey.FileOffset {
				high = mid - 1
			} else {
				low = mid + 1
			}
		}
		rs.extentKey = proto.ExtentKey{}
	}
}

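// readSliceRange fetches a single slice, trying caches from fastest
// to slowest: the local block cache (L1), then the matching CubeFS
// cache extent (L2), then the blob store itself. After a blob-store
// read it may kick off asyncCache to populate the caches. The result
// is reported to Read through reader.err, and the wait group is
// released on every path.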
func (reader *Reader) readSliceRange(ctx context.Context, rs *rwSlice) (err error) {
	defer reader.wg.Done()
	log.LogDebugf("TRACE blobStore readSliceRange Enter. ino(%v) rs.fileOffset(%v),rs.rOffset(%v),rs.rSize(%v)", reader.ino, rs.fileOffset, rs.rOffset, rs.rSize)
	cacheKey := util.GenerateKey(reader.volName, reader.ino, rs.fileOffset)
	log.LogDebugf("TRACE blobStore readSliceRange. ino(%v) cacheKey(%v)", reader.ino, cacheKey)
	buf := make([]byte, rs.rSize)
	var readN int

	bgTime := stat.BeginStat()
	stat.EndStat("CacheGet", nil, bgTime, 1)
	// One request per block.
	metric := exporter.NewTPCnt("CacheGet")
	defer func() {
		metric.SetWithLabels(err, map[string]string{exporter.Vol: reader.volName})
	}()

	// Try the local block cache (L1) first.
	if reader.enableBcache {
		readN, err = reader.bc.Get(cacheKey, buf, rs.rOffset, rs.rSize)
		if err == nil {
			reader.ec.BcacheHealth = true
			if readN == int(rs.rSize) {

				// L1 cache hit.
				metric := exporter.NewTPCnt("L1CacheGetHit")
				stat.EndStat("CacheHit-L1", nil, bgTime, 1)
				defer func() {
					metric.SetWithLabels(err, map[string]string{exporter.Vol: reader.volName})
				}()

				copy(rs.Data, buf)
				reader.err <- nil
				return
			}
		}
	}

	readLimitOn := false
	// Try the CubeFS cache extent (L2); fall back to the blob store below.
	if rs.extentKey != (proto.ExtentKey{}) {

		// Check whether the data partition still exists (preload scenario).
		err = reader.ec.CheckDataPartitionExsit(rs.extentKey.PartitionId)
		if err == nil || ctx.Value("objectnode") != nil {
			readN, err, readLimitOn = reader.ec.ReadExtent(reader.ino, &rs.extentKey, buf, int(rs.rOffset), int(rs.rSize))
			if err == nil && readN == int(rs.rSize) {

				// L2 cache hit.
				metric := exporter.NewTPCnt("L2CacheGetHit")
				stat.EndStat("CacheHit-L2", nil, bgTime, 1)
				defer func() {
					metric.SetWithLabels(err, map[string]string{exporter.Vol: reader.volName})
				}()

				copy(rs.Data, buf)
				reader.err <- nil
				return
			}
		} else {
			log.LogDebugf("CheckDataPartitionExsit failed (%v)", err)
		}
		log.LogDebugf("TRACE blobStore readSliceRange. cfs block miss.extentKey=%v,err=%v", rs.extentKey, err)
	}
	if !readLimitOn {
		reader.limitManager.ReadAlloc(ctx, int(rs.rSize))
	}

	readN, err = reader.ebs.Read(ctx, reader.volName, buf, rs.rOffset, uint64(rs.rSize), rs.objExtentKey)
	if err != nil {
		reader.err <- err
		return
	}
	read := copy(rs.Data, buf)
	reader.err <- nil

	// Cache the full block unless caching is disabled or in preload mode.
	if (!reader.needCacheL1() && !reader.needCacheL2()) || reader.ec.IsPreloadMode() {
		log.LogDebugf("TRACE blobStore readSliceRange exit without cache. read counter=%v", read)
		return nil
	}

	asyncCtx := context.Background()
	go reader.asyncCache(asyncCtx, cacheKey, rs.objExtentKey)

	log.LogDebugf("TRACE blobStore readSliceRange exit with cache. read counter=%v", read)
	return nil
}

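// asyncCache re-reads the whole object extent from the blob store and
// writes it to the preferred cache tier: cache extents (L2) when the
// volume policy calls for it, otherwise the local block cache (L1).
// inflightL2cache deduplicates concurrent attempts on the same key.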
func (reader *Reader) asyncCache(ctx context.Context, cacheKey string, objExtentKey proto.ObjExtentKey) {
	var err error
	bgTime := stat.BeginStat()
	defer func() {
		stat.EndStat("read-async-cache", err, bgTime, 1)
	}()

	log.LogDebugf("TRACE blobStore asyncCache Enter. cacheKey=%v", cacheKey)

	// The block is already being loaded by another reader.
	if _, ok := reader.inflightL2cache.Load(cacheKey); ok {
		return
	}

	reader.inflightL2cache.Store(cacheKey, true)
	defer reader.inflightL2cache.Delete(cacheKey)

	buf := make([]byte, objExtentKey.Size)
	read, err := reader.ebs.Read(ctx, reader.volName, buf, 0, uint64(len(buf)), objExtentKey)
	if err != nil || read != len(buf) {
		log.LogErrorf("ERROR blobStore asyncCache fail, read error or size mismatch. cacheKey=%v, objExtentKey.size=%v, read=%v, err=%v",
			cacheKey, len(buf), read, err)
		return
	}

	if reader.needCacheL2() {
		reader.ec.Write(reader.ino, int(objExtentKey.FileOffset), buf, proto.FlagsCache, nil)
		log.LogDebugf("TRACE blobStore asyncCache(L2) Exit. cacheKey=%v", cacheKey)
		return
	}

	if reader.needCacheL1() {
		reader.bc.Put(cacheKey, buf)
	}

	log.LogDebugf("TRACE blobStore asyncCache(L1) Exit. cacheKey=%v", cacheKey)
}

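// needCacheL2 reports whether blocks should be written back as cache
// extents: either the volume's cache action is enabled and the file
// is under the cache threshold, or the whole file is flagged for
// caching.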
func (reader *Reader) needCacheL2() bool {
	return (reader.cacheAction > proto.NoCache && reader.fileLength < uint64(reader.cacheThreshold)) || reader.fileCache
}

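// needCacheL1 reports whether the local block cache (bcache) is enabled.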
func (reader *Reader) needCacheL1() bool {
	return reader.enableBcache
}

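// refreshEbsExtents fetches the inode's extent keys and object extent
// keys from the meta server and records whether that view is valid;
// on error, fileSize subsequently reports the reader as invalid.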
func (reader *Reader) refreshEbsExtents() {
	_, _, eks, oeks, err := reader.mw.GetObjExtents(reader.ino)
	if err != nil {
		reader.valid = false
		log.LogErrorf("TRACE blobStore refreshEbsExtents error. ino(%v) err(%v)", reader.ino, err)
		return
	}
	reader.valid = true
	reader.extentKeys = eks
	reader.objExtentKeys = oeks
	log.LogDebugf("TRACE blobStore refreshEbsExtents ok. extentKeys(%v) objExtentKeys(%v)", reader.extentKeys, reader.objExtentKeys)
}

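// fileSize computes the file length as FileOffset+Size of the last
// object extent key (0 for an empty file), returning false when the
// extent view could not be loaded.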
func (reader *Reader) fileSize() (uint64, bool) {
	objKeys := reader.objExtentKeys
	if !reader.valid {
		return 0, false
	}
	if len(objKeys) > 0 {
		lastIndex := len(objKeys) - 1
		return objKeys[lastIndex].FileOffset + objKeys[lastIndex].Size, true
	}
	return 0, true
}