26
"github.com/cubefs/cubefs/blockcache/bcache"
27
"github.com/cubefs/cubefs/proto"
28
"github.com/cubefs/cubefs/sdk/data/manager"
29
"github.com/cubefs/cubefs/sdk/data/stream"
30
"github.com/cubefs/cubefs/sdk/meta"
31
"github.com/cubefs/cubefs/util"
32
"github.com/cubefs/cubefs/util/exporter"
33
"github.com/cubefs/cubefs/util/log"
34
"github.com/cubefs/cubefs/util/stat"
45
// extentKey is the replica-subsystem extent key paired with this slice;
// populated by buildExtentKey and used for L2 (cache data partition) reads.
extentKey proto.ExtentKey
46
// objExtentKey is the blob-store object extent key backing this slice's data.
objExtentKey proto.ObjExtentKey
49
// String implements fmt.Stringer for rwSlice so slices render readably in
// debug logs, including both the replica extent key and the object extent key.
func (s rwSlice) String() string {
50
return fmt.Sprintf("rwSlice{fileOffset(%v),size(%v),rOffset(%v),rSize(%v),read(%v),extentKey(%v),objExtentKey(%v)}", s.fileOffset, s.size, s.rOffset, s.rSize, s.read, s.extentKey, s.objExtentKey)
53
// String implements fmt.Stringer for Reader, dumping its configuration state
// for debug logging.
// NOTE(review): "&reader" takes the address of the local *Reader parameter,
// so address(%v) prints a per-call stack address rather than the Reader's
// own address — probably meant to be just "reader"; confirm before changing.
func (reader *Reader) String() string {
54
return fmt.Sprintf("Reader{address(%v),volName(%v),volType(%v),ino(%v),fileSize(%v),enableBcache(%v),cacheAction(%v),fileCache(%v),cacheThreshold(%v)},readConcurrency(%v)",
55
&reader, reader.volName, reader.volType, reader.ino, reader.fileLength, reader.enableBcache, reader.cacheAction, reader.fileCache, reader.cacheThreshold, reader.readConcurrency)
65
// bc is the local block-cache (L1) client; consulted first in readSliceRange
// when enableBcache is set.
bc *bcache.BcacheClient
67
// ec is the extent client used for replica-subsystem I/O (the L2 cache path).
ec *stream.ExtentClient
70
// cacheTimeout — not referenced in the visible portion of this file;
// presumably bounds cache operations. TODO confirm against the full source.
cacheTimeout time.Duration
75
// extentKeys are the replica extent keys returned by GetObjExtents; searched
// by buildExtentKey to locate an L2 copy of each object extent.
extentKeys []proto.ExtentKey
76
// missExtentKeys — not referenced in the visible portion of this file;
// presumably tracks extents absent from the L2 cache. TODO confirm.
missExtentKeys []proto.ExtentKey
77
// objExtentKeys are the blob-store object extent keys for the inode,
// refreshed by refreshEbsExtents and sliced by prepareEbsSlice.
objExtentKeys []proto.ObjExtentKey
84
// inflightL2cache deduplicates concurrent asyncCache runs per cache key.
inflightL2cache sync.Map
85
// limitManager throttles blob-store reads; shared from ec.LimitManager.
limitManager *manager.LimitManager
88
// ClientConfig bundles the dependencies and options consumed by NewReader
// (only part of the struct is visible here).
type ClientConfig struct {
93
// Bc is the local block-cache (L1) client.
Bc *bcache.BcacheClient
95
// Ec is the extent client used for replica-subsystem (L2 cache) I/O.
Ec *stream.ExtentClient
106
// NewReader constructs a Reader from the supplied ClientConfig, copying the
// clients and cache policy settings onto the new instance.
// NOTE(review): reader is presumably allocated on the elided lines before
// these assignments — confirm against the full source.
func NewReader(config ClientConfig) (reader *Reader) {
109
reader.volName = config.VolName
110
reader.volType = config.VolType
111
reader.ino = config.Ino
112
reader.bc = config.Bc
113
reader.ebs = config.Ebsc
114
reader.mw = config.Mw
115
reader.ec = config.Ec
116
reader.enableBcache = config.EnableBcache
117
reader.readConcurrency = config.ReadConcurrency
118
reader.cacheAction = config.CacheAction
119
reader.fileCache = config.FileCache
120
reader.cacheThreshold = config.CacheThreshold
122
// Cold volumes read through the blob store; make sure the extent client
// tracks the cache data partitions used as L2.
if proto.IsCold(reader.volType) {
123
reader.ec.UpdateDataPartitionForColdVolume()
126
// Share the extent client's limiter so blob-store reads are rate-limited
// consistently with the rest of the client.
reader.limitManager = reader.ec.LimitManager
130
// Read reads up to size bytes at offset into buf, fanning the request out
// across object-extent slices read concurrently through a worker pool.
// Returns the number of bytes copied into buf and the first slice error.
func (reader *Reader) Read(ctx context.Context, buf []byte, offset int, size int) (int, error) {
132
// Guard: the reader must be opened before use (condition on elided line).
return 0, fmt.Errorf("reader is not opened yet")
134
log.LogDebugf("TRACE reader Read Enter. ino(%v) offset(%v) len(%v)", reader.ino, offset, size)
140
return 0, os.ErrInvalid
144
defer reader.Unlock()
146
var rSlices []*rwSlice
147
// Clamp size to the buffer length when they disagree (body elided).
if size != len(buf) {
151
// Split [offset, offset+size) into per-object-extent read slices.
rSlices, err = reader.prepareEbsSlice(offset, uint32(size))
152
log.LogDebugf("TRACE reader Read. ino(%v) rSlices-length(%v) ", reader.ino, len(rSlices))
157
sliceSize := len(rSlices)
159
reader.wg.Add(sliceSize)
160
// Bounded worker pool sized by the configured read concurrency.
pool := New(reader.readConcurrency, sliceSize)
162
// Buffered so every worker can report without blocking.
reader.err = make(chan error, sliceSize)
163
for _, rs := range rSlices {
164
pool.Execute(rs, func(param *rwSlice) {
165
reader.readSliceRange(ctx, param)
170
// Drain one result per slice; a closed channel or non-nil error aborts.
for i := 0; i < sliceSize; i++ {
171
if err, ok := <-reader.err; !ok || err != nil {
177
// All slices succeeded: stitch their buffers back into buf in order.
for i := 0; i < sliceSize; i++ {
178
read += copy(buf[read:], rSlices[i].Data)
180
log.LogDebugf("TRACE reader Read Exit. ino(%v) readN(%v) buf-len(%v)", reader.ino, read, len(buf))
184
// Close releases the reader's resources; the body is elided in this view —
// consult the full source for what is torn down.
func (reader *Reader) Close(ctx context.Context) {
190
// prepareEbsSlice maps the byte range [offset, offset+size) onto the inode's
// object extent keys, producing one rwSlice per overlapping object extent.
// The request size is clamped to the current file length. Returns EIO for
// invalid input (guard condition elided) or when the file size is unknown.
func (reader *Reader) prepareEbsSlice(offset int, size uint32) ([]*rwSlice, error) {
192
return nil, syscall.EIO
194
chunks := make([]*rwSlice, 0)
198
// Fetch extent keys from the meta wrapper only once per Reader.
reader.once.Do(func() {
199
reader.refreshEbsExtents()
201
fileSize, valid := reader.fileSize()
202
reader.fileLength = fileSize
203
log.LogDebugf("TRACE blobStore prepareEbsSlice Enter. ino(%v) fileSize(%v) ", reader.ino, fileSize)
205
log.LogErrorf("Reader: invoke fileSize fail. ino(%v) offset(%v) size(%v)", reader.ino, offset, size)
206
return nil, syscall.EIO
208
log.LogDebugf("TRACE blobStore prepareEbsSlice. ino(%v) offset(%v) size(%v)", reader.ino, offset, size)
209
// Reading at or past EOF yields no slices (return elided).
if uint64(offset) >= fileSize {
213
start := uint64(offset)
214
// Clamp the request so it never extends past EOF.
if uint64(offset)+uint64(size) > fileSize {
215
size = uint32(fileSize - uint64(offset))
217
end := uint64(offset + int(size))
218
for index, oek := range reader.objExtentKeys {
220
// Does this object extent contain the current start position?
if oek.FileOffset <= start && start < oek.FileOffset+(oek.Size) {
222
rs.fileOffset = oek.FileOffset
223
rs.size = uint32(oek.Size)
224
// rOffset/rSize are relative to the object extent's own start.
rs.rOffset = start - oek.FileOffset
225
rs.rSize = uint32(oek.FileOffset + oek.Size - start)
228
// If the request ends inside this extent, shrink the slice to fit.
if end <= oek.FileOffset+oek.Size {
229
rs.rSize = uint32(end - start)
234
rs.objExtentKey = oek
235
// Attach the matching replica extent key (L2 location), if any.
reader.buildExtentKey(rs)
236
rs.Data = make([]byte, rs.rSize)
237
// Advance start to the first byte after this extent.
start = oek.FileOffset + oek.Size
238
chunks = append(chunks, rs)
239
log.LogDebugf("TRACE blobStore prepareEbsSlice. ino(%v) offset(%v) size(%v) rwSlice(%v)", reader.ino, offset, size, rs)
245
log.LogDebugf("TRACE blobStore prepareEbsSlice Exit. ino(%v) offset(%v) size(%v) rwSlices(%v)", reader.ino, offset, size, chunks)
249
// buildExtentKey binary-searches reader.extentKeys for the replica extent
// whose FileOffset equals rs.objExtentKey.FileOffset and stores it on rs;
// when there is no match (or no keys at all) rs.extentKey is left as the
// zero value, which readSliceRange treats as "no L2 copy".
func (reader *Reader) buildExtentKey(rs *rwSlice) {
250
if len(reader.extentKeys) <= 0 {
251
rs.extentKey = proto.ExtentKey{}
254
high := len(reader.extentKeys) - 1
256
mid := (high + low) / 2
257
target := reader.extentKeys[mid]
258
if target.FileOffset == rs.objExtentKey.FileOffset {
259
rs.extentKey = target
261
} else if target.FileOffset > rs.objExtentKey.FileOffset {
267
// Search exhausted without a match: no cached replica extent.
rs.extentKey = proto.ExtentKey{}
271
// readSliceRange fills rs.Data for a single slice, trying caches in order:
// L1 (local bcache), then L2 (replica extent via the extent client), then
// the blob store itself; on a blob-store read it may kick off asyncCache to
// repopulate the caches. The error is presumably also reported through
// reader.err on elided lines — confirm against the full source.
func (reader *Reader) readSliceRange(ctx context.Context, rs *rwSlice) (err error) {
272
defer reader.wg.Done()
273
log.LogDebugf("TRACE blobStore readSliceRange Enter. ino(%v) rs.fileOffset(%v),rs.rOffset(%v),rs.rSize(%v) ", reader.ino, rs.fileOffset, rs.rOffset, rs.rSize)
274
// Cache key is derived from volume, inode and the slice's file offset.
cacheKey := util.GenerateKey(reader.volName, reader.ino, rs.fileOffset)
275
log.LogDebugf("TRACE blobStore readSliceRange. ino(%v) cacheKey(%v) ", reader.ino, cacheKey)
276
buf := make([]byte, rs.rSize)
279
bgTime := stat.BeginStat()
280
stat.EndStat("CacheGet", nil, bgTime, 1)
282
metric := exporter.NewTPCnt("CacheGet")
284
metric.SetWithLabels(err, map[string]string{exporter.Vol: reader.volName})
288
// --- L1: local block cache ---
if reader.enableBcache {
289
readN, err = reader.bc.Get(cacheKey, buf, rs.rOffset, rs.rSize)
291
// A successful Get marks the bcache healthy again.
reader.ec.BcacheHealth = true
292
// Full-length hit: serve from L1 (return path elided).
if readN == int(rs.rSize) {
295
metric := exporter.NewTPCnt("L1CacheGetHit")
296
stat.EndStat("CacheHit-L1", nil, bgTime, 1)
298
metric.SetWithLabels(err, map[string]string{exporter.Vol: reader.volName})
310
// --- L2: replica extent cache (zero extentKey means "no L2 copy") ---
if rs.extentKey != (proto.ExtentKey{}) {
313
// Skip L2 if its data partition is gone (unless called from objectnode).
err = reader.ec.CheckDataPartitionExsit(rs.extentKey.PartitionId)
314
if err == nil || ctx.Value("objectnode") != nil {
315
readN, err, readLimitOn = reader.ec.ReadExtent(reader.ino, &rs.extentKey, buf, int(rs.rOffset), int(rs.rSize))
316
// Full-length hit: serve from L2 (return path elided).
if err == nil && readN == int(rs.rSize) {
319
metric := exporter.NewTPCnt("L2CacheGetHit")
320
stat.EndStat("CacheHit-L2", nil, bgTime, 1)
322
metric.SetWithLabels(err, map[string]string{exporter.Vol: reader.volName})
330
log.LogDebugf("checkDataPartitionExsit failed (%v)", err)
332
log.LogDebugf("TRACE blobStore readSliceRange. cfs block miss.extentKey=%v,err=%v", rs.extentKey, err)
335
// --- Fallback: blob store, throttled by the shared limit manager ---
reader.limitManager.ReadAlloc(ctx, int(rs.rSize))
338
readN, err = reader.ebs.Read(ctx, reader.volName, buf, rs.rOffset, uint64(rs.rSize), rs.objExtentKey)
343
read := copy(rs.Data, buf)
347
// Skip cache population when caching is disabled or in preload mode.
if !reader.needCacheL1() && !reader.needCacheL2() || reader.ec.IsPreloadMode() {
348
log.LogDebugf("TRACE blobStore readSliceRange exit without cache. read counter=%v", read)
352
// Detach from the request context so caching outlives this read.
asyncCtx := context.Background()
353
go reader.asyncCache(asyncCtx, cacheKey, rs.objExtentKey)
355
log.LogDebugf("TRACE blobStore readSliceRange exit with cache. read counter=%v", read)
359
// asyncCache re-reads the whole object extent from the blob store and
// populates L2 (replica extents, via ec.Write with FlagsCache) and/or L1
// (local bcache) according to the needCacheL2/needCacheL1 policies.
// Concurrent attempts for the same cacheKey are deduplicated.
func (reader *Reader) asyncCache(ctx context.Context, cacheKey string, objExtentKey proto.ObjExtentKey) {
361
bgTime := stat.BeginStat()
363
stat.EndStat("read-async-cache", err, bgTime, 1)
366
log.LogDebugf("TRACE blobStore asyncCache Enter. cacheKey=%v", cacheKey)
369
// Another goroutine is already caching this key; bail out (return elided).
if _, ok := reader.inflightL2cache.Load(cacheKey); ok {
373
reader.inflightL2cache.Store(cacheKey, true)
374
defer reader.inflightL2cache.Delete(cacheKey)
376
// Read the full object extent, not just the requested slice.
buf := make([]byte, objExtentKey.Size)
377
read, err := reader.ebs.Read(ctx, reader.volName, buf, 0, uint64(len(buf)), objExtentKey)
378
// Abort caching on error or short read (return elided).
if err != nil || read != len(buf) {
379
log.LogErrorf("ERROR blobStore asyncCache fail, size no match. cacheKey=%v, objExtentKey.size=%v, read=%v",
380
cacheKey, len(buf), read)
384
if reader.needCacheL2() {
385
// FlagsCache writes into the cache data partition rather than user data.
reader.ec.Write(reader.ino, int(objExtentKey.FileOffset), buf, proto.FlagsCache, nil)
386
log.LogDebugf("TRACE blobStore asyncCache(L2) Exit. cacheKey=%v", cacheKey)
390
if reader.needCacheL1() {
391
reader.bc.Put(cacheKey, buf)
394
log.LogDebugf("TRACE blobStore asyncCache(L1) Exit. cacheKey=%v", cacheKey)
397
// needCacheL2 reports whether blob-store reads should be cached in the
// replica subsystem. Note Go precedence: the condition is
// (cacheAction > NoCache && fileLength < cacheThreshold) || fileCache,
// i.e. fileCache forces L2 caching regardless of size or action.
func (reader *Reader) needCacheL2() bool {
398
if reader.cacheAction > proto.NoCache && reader.fileLength < uint64(reader.cacheThreshold) || reader.fileCache {
404
// needCacheL1 reports whether the local block cache (L1) should be populated;
// it simply mirrors the enableBcache flag.
func (reader *Reader) needCacheL1() bool {
405
return reader.enableBcache
408
// refreshEbsExtents fetches the inode's replica extent keys and blob-store
// object extent keys from the meta wrapper and stores both on the Reader.
// On error it only logs; the key slices are presumably left unchanged
// (error-branch return is elided) — confirm against the full source.
func (reader *Reader) refreshEbsExtents() {
409
_, _, eks, oeks, err := reader.mw.GetObjExtents(reader.ino)
412
log.LogErrorf("TRACE blobStore refreshEbsExtents error. ino(%v) err(%v) ", reader.ino, err)
416
reader.extentKeys = eks
417
reader.objExtentKeys = oeks
418
log.LogDebugf("TRACE blobStore refreshEbsExtents ok. extentKeys(%v) objExtentKeys(%v) ", reader.extentKeys, reader.objExtentKeys)
421
// fileSize derives the file length from the object extent keys: the end
// offset (FileOffset + Size) of the last key. The bool result reports
// whether a size could be determined; the empty-keys branch is elided here.
func (reader *Reader) fileSize() (uint64, bool) {
422
objKeys := reader.objExtentKeys
426
if len(objKeys) > 0 {
427
lastIndex := len(objKeys) - 1
428
return objKeys[lastIndex].FileOffset + objKeys[lastIndex].Size, true