27
"github.com/cubefs/cubefs/blockcache/bcache"
28
"github.com/cubefs/cubefs/proto"
29
"github.com/cubefs/cubefs/sdk/data/manager"
30
"github.com/cubefs/cubefs/sdk/data/stream"
31
"github.com/cubefs/cubefs/sdk/meta"
32
"github.com/cubefs/cubefs/util"
33
"github.com/cubefs/cubefs/util/buf"
34
"github.com/cubefs/cubefs/util/log"
35
"github.com/cubefs/cubefs/util/stat"
39
MaxBufferSize = 512 * util.MB
42
type wSliceErr struct {
54
bc *bcache.BcacheClient
56
ec *stream.ExtentClient
71
limitManager *manager.LimitManager
74
func NewWriter(config ClientConfig) (writer *Writer) {
77
writer.volName = config.VolName
78
writer.volType = config.VolType
79
writer.blockSize = config.BlockSize
80
writer.ino = config.Ino
85
writer.ebsc = config.Ebsc
86
writer.wConcurrency = config.WConcurrency
87
writer.wg = sync.WaitGroup{}
88
writer.once = sync.Once{}
89
writer.RWMutex = sync.RWMutex{}
90
writer.enableBcache = config.EnableBcache
91
writer.cacheAction = config.CacheAction
92
writer.fileCache = config.FileCache
93
writer.fileSize = config.FileSize
94
writer.cacheThreshold = config.CacheThreshold
96
writer.allocateCache()
97
writer.limitManager = writer.ec.LimitManager
102
func (writer *Writer) String() string {
103
return fmt.Sprintf("Writer{address(%v),volName(%v),volType(%v),ino(%v),blockSize(%v),fileSize(%v),enableBcache(%v),cacheAction(%v),fileCache(%v),cacheThreshold(%v)},wConcurrency(%v)",
104
&writer, writer.volName, writer.volType, writer.ino, writer.blockSize, writer.fileSize, writer.enableBcache, writer.cacheAction, writer.fileCache, writer.cacheThreshold, writer.wConcurrency)
107
func (writer *Writer) WriteWithoutPool(ctx context.Context, offset int, data []byte) (size int, err error) {
110
return 0, fmt.Errorf("writer is not opened yet")
112
log.LogDebugf("TRACE blobStore WriteWithoutPool Enter: ino(%v) offset(%v) len(%v) fileSize(%v)",
113
writer.ino, offset, len(data), writer.CacheFileSize())
115
if len(data) > MaxBufferSize || offset != writer.CacheFileSize() {
116
log.LogErrorf("TRACE blobStore WriteWithoutPool error,may be len(%v)>512MB,offset(%v)!=fileSize(%v)",
117
len(data), offset, writer.CacheFileSize())
118
err = syscall.EOPNOTSUPP
122
log.LogDebugf("TRACE blobStore WriteWithoutPool: ino(%v) offset(%v) len(%v)",
123
writer.ino, offset, len(data))
125
size, err = writer.doBufferWriteWithoutPool(ctx, data, offset)
130
func (writer *Writer) Write(ctx context.Context, offset int, data []byte, flags int) (size int, err error) {
133
return 0, fmt.Errorf("writer is not opened yet")
135
log.LogDebugf("TRACE blobStore Write Enter: ino(%v) offset(%v) len(%v) flags&proto.FlagsAppend(%v) fileSize(%v)", writer.ino, offset, len(data), flags&proto.FlagsAppend, writer.CacheFileSize())
137
if len(data) > MaxBufferSize || flags&proto.FlagsAppend == 0 || offset != writer.CacheFileSize() {
138
log.LogErrorf("TRACE blobStore Write error,may be len(%v)>512MB,flags(%v)!=flagAppend,offset(%v)!=fileSize(%v)", len(data), flags&proto.FlagsAppend, offset, writer.CacheFileSize())
139
err = syscall.EOPNOTSUPP
143
log.LogDebugf("TRACE blobStore Write: ino(%v) offset(%v) len(%v) flags&proto.FlagsSyncWrite(%v)", writer.ino, offset, len(data), flags&proto.FlagsSyncWrite)
144
if flags&proto.FlagsSyncWrite == 0 {
145
size, err = writer.doBufferWrite(ctx, data, offset)
149
size, err = writer.doParallelWrite(ctx, data, offset)
153
func (writer *Writer) doParallelWrite(ctx context.Context, data []byte, offset int) (size int, err error) {
154
log.LogDebugf("TRACE blobStore doDirectWrite: ino(%v) offset(%v) len(%v)", writer.ino, offset, len(data))
156
defer writer.Unlock()
157
wSlices := writer.prepareWriteSlice(offset, data)
158
log.LogDebugf("TRACE blobStore prepareWriteSlice: wSlices(%v)", wSlices)
159
sliceSize := len(wSlices)
161
writer.wg.Add(sliceSize)
162
writer.err = make(chan *wSliceErr, sliceSize)
163
pool := New(writer.wConcurrency, sliceSize)
165
for _, wSlice := range wSlices {
166
pool.Execute(wSlice, func(param *rwSlice) {
167
writer.writeSlice(ctx, param, true)
171
for i := 0; i < sliceSize; i++ {
172
if wErr := <-writer.err; wErr != nil {
173
log.LogErrorf("slice write error,ino(%v) fileoffset(%v) sliceSize(%v) err(%v)",
174
writer.ino, wErr.fileOffset, wErr.size, wErr.err)
180
oeks := make([]proto.ObjExtentKey, 0)
181
for _, wSlice := range wSlices {
182
size += int(wSlice.size)
183
oeks = append(oeks, wSlice.objExtentKey)
185
log.LogDebugf("TRACE blobStore appendObjExtentKeys: oeks(%v)", oeks)
186
if err = writer.mw.AppendObjExtentKeys(writer.ino, oeks); err != nil {
187
log.LogErrorf("slice write error,meta append ebsc extent keys fail,ino(%v) fileOffset(%v) len(%v) err(%v)", writer.ino, offset, len(data), err)
190
atomic.AddUint64(&writer.fileSize, uint64(size))
192
for _, wSlice := range wSlices {
193
writer.cacheLevel2(wSlice)
199
func (writer *Writer) cacheLevel2(wSlice *rwSlice) {
200
if writer.cacheAction == proto.RWCache && (wSlice.fileOffset+uint64(wSlice.size)) < uint64(writer.cacheThreshold) || writer.fileCache {
201
buf := make([]byte, wSlice.size)
202
offSet := int(wSlice.fileOffset)
203
copy(buf, wSlice.Data)
204
go writer.asyncCache(writer.ino, offSet, buf)
208
func (writer *Writer) WriteFromReader(ctx context.Context, reader io.Reader, h hash.Hash) (size uint64, err error) {
210
tmp = buf.ReadBufPool.Get().([]byte)
211
exec = NewExecutor(writer.wConcurrency)
214
defer buf.ReadBufPool.Put(tmp)
216
writer.fileOffset = 0
217
writer.err = make(chan *wSliceErr)
219
var oeksLock sync.RWMutex
220
oeks := make([]proto.ObjExtentKey, 0)
222
writeBuff := func() {
223
bufSize := len(writer.buf)
224
log.LogDebugf("writeBuff: bufSize(%v), leftToWrite(%v), err(%v)", bufSize, leftToWrite, err)
225
if bufSize == writer.blockSize || (leftToWrite == 0 && err == io.EOF) {
227
fileOffset: uint64(writer.fileOffset - bufSize),
228
size: uint32(bufSize),
230
wSlice.Data = make([]byte, bufSize)
231
copy(wSlice.Data, writer.buf)
232
writer.buf = writer.buf[:0]
233
if (err == nil || err == io.EOF) && h != nil {
235
log.LogDebugf("writeBuff: bufSize(%v), md5", bufSize)
240
defer writer.wg.Done()
241
err := writer.writeSlice(ctx, wSlice, false)
244
if len(writer.err) > 0 {
250
fileOffset: wSlice.fileOffset,
259
oeks = append(oeks, wSlice.objExtentKey)
262
writer.cacheLevel2(wSlice)
272
leftToWrite, err = reader.Read(tmp)
273
if err != nil && err != io.EOF {
277
for leftToWrite > 0 {
278
log.LogDebugf("WriteFromReader: leftToWrite(%v), err(%v)", leftToWrite, err)
280
errNum := len(writer.err)
286
freeSize := writer.blockSize - len(writer.buf)
287
writeSize := util.Min(leftToWrite, freeSize)
288
writer.buf = append(writer.buf, tmp[position:position+writeSize]...)
289
position += writeSize
290
leftToWrite -= writeSize
291
writer.fileOffset += writeSize
298
log.LogDebugf("WriteFromReader: EOF")
299
if len(writer.buf) > 0 {
306
case wErr := <-writer.err:
311
log.LogErrorf("slice write error,ino(%v) fileoffset(%v) sliceSize(%v) err(%v)", writer.ino, wErr.fileOffset, wErr.size, err)
317
log.LogDebugf("WriteFromReader before sort: %v", oeks)
318
sort.Slice(oeks, func(i, j int) bool {
319
return oeks[i].FileOffset < oeks[j].FileOffset
321
log.LogDebugf("WriteFromReader after sort: %v", oeks)
322
if err = writer.mw.AppendObjExtentKeys(writer.ino, oeks); err != nil {
323
log.LogErrorf("WriteFromReader error,meta append ebsc extent keys fail,ino(%v), err(%v)", writer.ino, err)
327
size = uint64(writer.fileOffset)
328
atomic.AddUint64(&writer.fileSize, size)
332
func (writer *Writer) doBufferWriteWithoutPool(ctx context.Context, data []byte, offset int) (size int, err error) {
333
log.LogDebugf("TRACE blobStore doBufferWriteWithoutPool Enter: ino(%v) offset(%v) len(%v)", writer.ino, offset, len(data))
335
writer.fileOffset = offset
336
dataSize := len(data)
338
log.LogDebugf("TRACE blobStore doBufferWriteWithoutPool: ino(%v) writer.buf.len(%v) writer.blocksize(%v)", writer.ino, len(writer.buf), writer.blockSize)
340
defer writer.Unlock()
342
freeSize := writer.blockSize - len(writer.buf)
343
if dataSize < freeSize {
346
log.LogDebugf("TRACE blobStore doBufferWriteWithoutPool: ino(%v) writer.fileSize(%v) writer.fileOffset(%v) position(%v) freeSize(%v)", writer.ino, writer.fileSize, writer.fileOffset, position, freeSize)
347
writer.buf = append(writer.buf, data[position:position+freeSize]...)
348
log.LogDebugf("TRACE blobStore doBufferWriteWithoutPool:ino(%v) writer.buf.len(%v)", writer.ino, len(writer.buf))
351
writer.fileOffset += freeSize
354
if len(writer.buf) == writer.blockSize {
355
log.LogDebugf("TRACE blobStore doBufferWriteWithoutPool: ino(%v) writer.buf.len(%v) writer.blocksize(%v)", writer.ino, len(writer.buf), writer.blockSize)
357
err = writer.flushWithoutPool(writer.ino, ctx, false)
360
writer.buf = writer.buf[:len(writer.buf)-len(data)]
361
writer.fileOffset -= len(data)
369
atomic.AddUint64(&writer.fileSize, uint64(size))
371
log.LogDebugf("TRACE blobStore doBufferWriteWithoutPool Exit: ino(%v) writer.fileSize(%v) writer.fileOffset(%v)", writer.ino, writer.fileSize, writer.fileOffset)
375
func (writer *Writer) doBufferWrite(ctx context.Context, data []byte, offset int) (size int, err error) {
376
log.LogDebugf("TRACE blobStore doBufferWrite Enter: ino(%v) offset(%v) len(%v)", writer.ino, offset, len(data))
378
writer.fileOffset = offset
379
dataSize := len(data)
381
log.LogDebugf("TRACE blobStore doBufferWrite: ino(%v) writer.buf.len(%v) writer.blocksize(%v)", writer.ino, len(writer.buf), writer.blockSize)
383
defer writer.Unlock()
385
freeSize := writer.blockSize - writer.blockPosition
386
if dataSize < freeSize {
389
log.LogDebugf("TRACE blobStore doBufferWrite: ino(%v) writer.fileSize(%v) writer.fileOffset(%v) writer.blockPosition(%v) position(%v) freeSize(%v)", writer.ino, writer.fileSize, writer.fileOffset, writer.blockPosition, position, freeSize)
390
copy(writer.buf[writer.blockPosition:], data[position:position+freeSize])
391
log.LogDebugf("TRACE blobStore doBufferWrite:ino(%v) writer.buf.len(%v)", writer.ino, len(writer.buf))
393
writer.blockPosition += freeSize
395
writer.fileOffset += freeSize
398
if writer.blockPosition == writer.blockSize {
399
log.LogDebugf("TRACE blobStore doBufferWrite: ino(%v) writer.buf.len(%v) writer.blocksize(%v)", writer.ino, len(writer.buf), writer.blockSize)
401
err = writer.flush(writer.ino, ctx, false)
404
writer.buf = writer.buf[:writer.blockPosition-freeSize]
405
writer.fileOffset -= freeSize
406
writer.blockPosition -= freeSize
413
atomic.AddUint64(&writer.fileSize, uint64(size))
415
log.LogDebugf("TRACE blobStore doBufferWrite Exit: ino(%v) writer.fileSize(%v) writer.fileOffset(%v)", writer.ino, writer.fileSize, writer.fileOffset)
419
func (writer *Writer) FlushWithoutPool(ino uint64, ctx context.Context) (err error) {
423
return writer.flushWithoutPool(ino, ctx, true)
426
func (writer *Writer) Flush(ino uint64, ctx context.Context) (err error) {
430
return writer.flush(ino, ctx, true)
433
func (writer *Writer) shouldCacheCfs() bool {
434
return writer.cacheAction == proto.RWCache
437
func (writer *Writer) prepareWriteSlice(offset int, data []byte) []*rwSlice {
439
wSlices := make([]*rwSlice, 0)
440
wSliceCount := size / writer.blockSize
441
remainSize := size % writer.blockSize
442
for index := 0; index < wSliceCount; index++ {
443
offset := offset + index*writer.blockSize
446
fileOffset: uint64(offset),
447
size: uint32(writer.blockSize),
448
Data: data[index*writer.blockSize : (index+1)*writer.blockSize],
450
wSlices = append(wSlices, wSlice)
452
offset = offset + wSliceCount*writer.blockSize
456
fileOffset: uint64(offset),
457
size: uint32(remainSize),
458
Data: data[wSliceCount*writer.blockSize:],
460
wSlices = append(wSlices, wSlice)
466
func (writer *Writer) writeSlice(ctx context.Context, wSlice *rwSlice, wg bool) (err error) {
468
defer writer.wg.Done()
470
writer.limitManager.WriteAlloc(ctx, int(wSlice.size))
471
log.LogDebugf("TRACE blobStore,writeSlice to ebs. ino(%v) fileOffset(%v) len(%v)", writer.ino, wSlice.fileOffset, wSlice.size)
472
location, err := writer.ebsc.Write(ctx, writer.volName, wSlice.Data, wSlice.size)
475
writer.err <- &wSliceErr{err: err, fileOffset: wSlice.fileOffset, size: wSlice.size}
479
log.LogDebugf("TRACE blobStore,location(%v)", location)
480
blobs := make([]proto.Blob, 0)
481
for _, info := range location.Blobs {
483
MinBid: uint64(info.MinBid),
484
Count: uint64(info.Count),
485
Vid: uint64(info.Vid),
487
blobs = append(blobs, blob)
489
wSlice.objExtentKey = proto.ObjExtentKey{
490
Cid: uint64(location.ClusterID),
491
CodeMode: uint8(location.CodeMode),
493
BlobSize: location.BlobSize,
495
BlobsLen: uint32(len(blobs)),
496
FileOffset: wSlice.fileOffset,
499
log.LogDebugf("TRACE blobStore,objExtentKey(%v)", wSlice.objExtentKey)
507
func (writer *Writer) asyncCache(ino uint64, offset int, data []byte) {
509
bgTime := stat.BeginStat()
511
stat.EndStat("write-async-cache", err, bgTime, 1)
514
log.LogDebugf("TRACE asyncCache Enter,fileOffset(%v) len(%v)", offset, len(data))
515
write, err := writer.ec.Write(ino, offset, data, proto.FlagsCache, nil)
516
log.LogDebugf("TRACE asyncCache Exit,write(%v) err(%v)", write, err)
519
func (writer *Writer) resetBufferWithoutPool() {
520
writer.buf = writer.buf[:0]
523
func (writer *Writer) resetBuffer() {
525
writer.blockPosition = 0
528
func (writer *Writer) flushWithoutPool(inode uint64, ctx context.Context, flushFlag bool) (err error) {
529
bgTime := stat.BeginStat()
531
stat.EndStat("blobstore-flush", err, bgTime, 1)
534
log.LogDebugf("TRACE blobStore flushWithoutPool: ino(%v) buf-len(%v) flushFlag(%v)", inode, len(writer.buf), flushFlag)
541
if len(writer.buf) == 0 || !writer.dirty {
544
bufferSize := len(writer.buf)
546
fileOffset: uint64(writer.fileOffset - bufferSize),
547
size: uint32(bufferSize),
550
err = writer.writeSlice(ctx, wSlice, false)
553
atomic.AddUint64(&writer.fileSize, -uint64(bufferSize))
558
oeks := make([]proto.ObjExtentKey, 0)
560
oeks = append(oeks, wSlice.objExtentKey)
561
if err = writer.mw.AppendObjExtentKeys(writer.ino, oeks); err != nil {
562
log.LogErrorf("slice write error,meta append ebsc extent keys fail,ino(%v) fileOffset(%v) len(%v) err(%v)", inode, wSlice.fileOffset, wSlice.size, err)
565
writer.resetBufferWithoutPool()
567
writer.cacheLevel2(wSlice)
571
func (writer *Writer) flush(inode uint64, ctx context.Context, flushFlag bool) (err error) {
572
bgTime := stat.BeginStat()
574
stat.EndStat("blobstore-flush", err, bgTime, 1)
577
log.LogDebugf("TRACE blobStore flush: ino(%v) buf-len(%v) flushFlag(%v)", inode, len(writer.buf), flushFlag)
584
if len(writer.buf) == 0 || !writer.dirty {
587
bufferSize := writer.blockPosition
589
fileOffset: uint64(writer.fileOffset - bufferSize),
590
size: uint32(bufferSize),
593
err = writer.writeSlice(ctx, wSlice, false)
596
atomic.AddUint64(&writer.fileSize, -uint64(bufferSize))
601
oeks := make([]proto.ObjExtentKey, 0)
603
oeks = append(oeks, wSlice.objExtentKey)
604
if err = writer.mw.AppendObjExtentKeys(writer.ino, oeks); err != nil {
605
log.LogErrorf("slice write error,meta append ebsc extent keys fail,ino(%v) fileOffset(%v) len(%v) err(%v)", inode, wSlice.fileOffset, wSlice.size, err)
610
writer.cacheLevel2(wSlice)
614
func (writer *Writer) CacheFileSize() int {
615
return int(atomic.LoadUint64(&writer.fileSize))
618
func (writer *Writer) FreeCache() {
622
if buf.CachePool == nil {
625
writer.once.Do(func() {
629
buf.CachePool.Put(tmpBuf)
634
func (writer *Writer) allocateCache() {
635
if buf.CachePool == nil {
638
writer.buf = buf.CachePool.Get()