cubefs

Форк
0
/
writer.go 
639 строк · 18.9 Кб
1
// Copyright 2022 The CubeFS Authors.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
14

15
package blobstore
16

17
import (
18
	"context"
19
	"fmt"
20
	"hash"
21
	"io"
22
	"sort"
23
	"sync"
24
	"sync/atomic"
25
	"syscall"
26

27
	"github.com/cubefs/cubefs/blockcache/bcache"
28
	"github.com/cubefs/cubefs/proto"
29
	"github.com/cubefs/cubefs/sdk/data/manager"
30
	"github.com/cubefs/cubefs/sdk/data/stream"
31
	"github.com/cubefs/cubefs/sdk/meta"
32
	"github.com/cubefs/cubefs/util"
33
	"github.com/cubefs/cubefs/util/buf"
34
	"github.com/cubefs/cubefs/util/log"
35
	"github.com/cubefs/cubefs/util/stat"
36
)
37

38
const (
	// MaxBufferSize caps the length of a single write request (512 MB);
	// larger writes are rejected with EOPNOTSUPP.
	MaxBufferSize = 512 * util.MB
)
41

42
// wSliceErr records a slice write failure together with the file range
// that failed, so callers can log and surface the first error.
type wSliceErr struct {
	err        error  // underlying write error
	fileOffset uint64 // file offset of the failed slice
	size       uint32 // length of the failed slice in bytes
}
47

48
// Writer stages appended data for a single inode and flushes it to the
// ebs (blobstore) backend in blockSize units, optionally mirroring the
// written data into the CubeFS cache.
type Writer struct {
	volType      int                  // volume type from ClientConfig
	volName      string               // volume the inode belongs to
	blockSize    int                  // flush granularity for buffered writes
	ino          uint64               // inode being written
	err          chan *wSliceErr      // per-operation channel carrying slice write failures
	bc           *bcache.BcacheClient // local block-cache client
	mw           *meta.MetaWrapper    // metadata client used to append object extent keys
	ec           *stream.ExtentClient // extent client used for async cache writes
	ebsc         *BlobStoreClient     // ebs (blobstore) data client
	wConcurrency int                  // number of parallel slice writers
	wg           sync.WaitGroup       // tracks in-flight slice writes
	once         sync.Once            // guards the one-time buffer release in FreeCache
	sync.RWMutex                      // protects buf, fileOffset, blockPosition and dirty
	enableBcache   bool   // whether the local block cache is enabled
	cacheAction    int    // cache policy (e.g. proto.RWCache)
	buf            []byte // staging buffer holding up to blockSize pending bytes
	fileOffset     int    // next file offset to be buffered
	fileCache      bool   // when true, cache everything written regardless of threshold
	fileSize       uint64 // current file size; read/updated atomically
	cacheThreshold int    // RWCache only applies to offsets below this value
	dirty          bool   // true while buf holds unflushed data
	blockPosition  int    // write position within buf (pool-backed path)
	limitManager   *manager.LimitManager // write rate limiter
}
73

74
// NewWriter builds a Writer from the given client configuration and
// pre-allocates its staging buffer from the shared cache pool.
func NewWriter(config ClientConfig) (writer *Writer) {
	writer = &Writer{
		volName:        config.VolName,
		volType:        config.VolType,
		blockSize:      config.BlockSize,
		ino:            config.Ino,
		bc:             config.Bc,
		mw:             config.Mw,
		ec:             config.Ec,
		ebsc:           config.Ebsc,
		wConcurrency:   config.WConcurrency,
		enableBcache:   config.EnableBcache,
		cacheAction:    config.CacheAction,
		fileCache:      config.FileCache,
		fileSize:       config.FileSize,
		cacheThreshold: config.CacheThreshold,
	}
	// err, wg, once, RWMutex and dirty intentionally start at their zero
	// values, matching the explicit zero assignments they replace.
	writer.allocateCache()
	writer.limitManager = writer.ec.LimitManager

	return
}
101

102
func (writer *Writer) String() string {
103
	return fmt.Sprintf("Writer{address(%v),volName(%v),volType(%v),ino(%v),blockSize(%v),fileSize(%v),enableBcache(%v),cacheAction(%v),fileCache(%v),cacheThreshold(%v)},wConcurrency(%v)",
104
		&writer, writer.volName, writer.volType, writer.ino, writer.blockSize, writer.fileSize, writer.enableBcache, writer.cacheAction, writer.fileCache, writer.cacheThreshold, writer.wConcurrency)
105
}
106

107
func (writer *Writer) WriteWithoutPool(ctx context.Context, offset int, data []byte) (size int, err error) {
108
	// atomic.StoreInt32(&writer.idle, 0)
109
	if writer == nil {
110
		return 0, fmt.Errorf("writer is not opened yet")
111
	}
112
	log.LogDebugf("TRACE blobStore WriteWithoutPool Enter: ino(%v) offset(%v) len(%v) fileSize(%v)",
113
		writer.ino, offset, len(data), writer.CacheFileSize())
114

115
	if len(data) > MaxBufferSize || offset != writer.CacheFileSize() {
116
		log.LogErrorf("TRACE blobStore WriteWithoutPool error,may be len(%v)>512MB,offset(%v)!=fileSize(%v)",
117
			len(data), offset, writer.CacheFileSize())
118
		err = syscall.EOPNOTSUPP
119
		return
120
	}
121
	// write buffer
122
	log.LogDebugf("TRACE blobStore WriteWithoutPool: ino(%v) offset(%v) len(%v)",
123
		writer.ino, offset, len(data))
124

125
	size, err = writer.doBufferWriteWithoutPool(ctx, data, offset)
126

127
	return
128
}
129

130
// Write appends data at offset for the writer's inode. Only append writes
// (proto.FlagsAppend set, offset equal to the current cached file size)
// of at most MaxBufferSize bytes are supported; anything else returns
// EOPNOTSUPP. Sync writes bypass the buffer and go to ebs as parallel
// slices; async writes are staged in the block buffer.
func (writer *Writer) Write(ctx context.Context, offset int, data []byte, flags int) (size int, err error) {
	// atomic.StoreInt32(&writer.idle, 0)
	if writer == nil {
		return 0, fmt.Errorf("writer is not opened yet")
	}
	log.LogDebugf("TRACE blobStore Write Enter: ino(%v) offset(%v) len(%v) flags&proto.FlagsAppend(%v) fileSize(%v)", writer.ino, offset, len(data), flags&proto.FlagsAppend, writer.CacheFileSize())

	isAppend := flags&proto.FlagsAppend != 0
	if len(data) > MaxBufferSize || !isAppend || offset != writer.CacheFileSize() {
		log.LogErrorf("TRACE blobStore Write error,may be len(%v)>512MB,flags(%v)!=flagAppend,offset(%v)!=fileSize(%v)", len(data), flags&proto.FlagsAppend, offset, writer.CacheFileSize())
		return 0, syscall.EOPNOTSUPP
	}
	log.LogDebugf("TRACE blobStore Write: ino(%v) offset(%v) len(%v) flags&proto.FlagsSyncWrite(%v)", writer.ino, offset, len(data), flags&proto.FlagsSyncWrite)
	if flags&proto.FlagsSyncWrite != 0 {
		// sync write: direct parallel write to ebs
		return writer.doParallelWrite(ctx, data, offset)
	}
	return writer.doBufferWrite(ctx, data, offset)
}
152

153
// doParallelWrite uploads data to ebs as blockSize slices written
// concurrently by a worker pool, then appends the resulting object extent
// keys to the inode metadata and optionally warms the level-2 cache.
// fileSize is only advanced after the metadata append succeeds. Returns
// the number of bytes written, or 0 and the first slice error.
func (writer *Writer) doParallelWrite(ctx context.Context, data []byte, offset int) (size int, err error) {
	log.LogDebugf("TRACE blobStore doDirectWrite: ino(%v) offset(%v) len(%v)", writer.ino, offset, len(data))
	writer.Lock()
	defer writer.Unlock()
	wSlices := writer.prepareWriteSlice(offset, data)
	log.LogDebugf("TRACE blobStore prepareWriteSlice: wSlices(%v)", wSlices)
	sliceSize := len(wSlices)

	writer.wg.Add(sliceSize)
	// Buffered to sliceSize so every worker can report its result
	// (error or nil) without blocking, even if we bail out early.
	writer.err = make(chan *wSliceErr, sliceSize)
	pool := New(writer.wConcurrency, sliceSize)
	defer pool.Close()
	for _, wSlice := range wSlices {
		pool.Execute(wSlice, func(param *rwSlice) {
			// wg=true: writeSlice signals writer.wg and sends its
			// result on writer.err.
			writer.writeSlice(ctx, param, true)
		})
	}
	writer.wg.Wait()
	// Each worker sent exactly one result; fail on the first error seen.
	for i := 0; i < sliceSize; i++ {
		if wErr := <-writer.err; wErr != nil {
			log.LogErrorf("slice write error,ino(%v) fileoffset(%v) sliceSize(%v) err(%v)",
				writer.ino, wErr.fileOffset, wErr.size, wErr.err)
			return 0, wErr.err
		}
	}
	close(writer.err)
	// update meta
	oeks := make([]proto.ObjExtentKey, 0)
	for _, wSlice := range wSlices {
		size += int(wSlice.size)
		oeks = append(oeks, wSlice.objExtentKey)
	}
	log.LogDebugf("TRACE blobStore appendObjExtentKeys: oeks(%v)", oeks)
	if err = writer.mw.AppendObjExtentKeys(writer.ino, oeks); err != nil {
		log.LogErrorf("slice write error,meta append ebsc extent keys fail,ino(%v) fileOffset(%v) len(%v) err(%v)", writer.ino, offset, len(data), err)
		return
	}
	atomic.AddUint64(&writer.fileSize, uint64(size))

	// Best-effort level-2 caching of the slices just written.
	for _, wSlice := range wSlices {
		writer.cacheLevel2(wSlice)
	}

	return
}
198

199
// cacheLevel2 asynchronously copies a just-written slice into the CubeFS
// cache when RW caching applies (slice end below the cache threshold) or
// when whole-file caching is enabled.
func (writer *Writer) cacheLevel2(wSlice *rwSlice) {
	// Same precedence as the original condition: (RWCache && below
	// threshold) || fileCache.
	rwCacheable := writer.cacheAction == proto.RWCache &&
		(wSlice.fileOffset+uint64(wSlice.size)) < uint64(writer.cacheThreshold)
	if !rwCacheable && !writer.fileCache {
		return
	}
	data := make([]byte, wSlice.size)
	copy(data, wSlice.Data)
	go writer.asyncCache(writer.ino, int(wSlice.fileOffset), data)
}
207

208
func (writer *Writer) WriteFromReader(ctx context.Context, reader io.Reader, h hash.Hash) (size uint64, err error) {
209
	var (
210
		tmp         = buf.ReadBufPool.Get().([]byte)
211
		exec        = NewExecutor(writer.wConcurrency)
212
		leftToWrite int
213
	)
214
	defer buf.ReadBufPool.Put(tmp)
215

216
	writer.fileOffset = 0
217
	writer.err = make(chan *wSliceErr)
218

219
	var oeksLock sync.RWMutex
220
	oeks := make([]proto.ObjExtentKey, 0)
221

222
	writeBuff := func() {
223
		bufSize := len(writer.buf)
224
		log.LogDebugf("writeBuff: bufSize(%v), leftToWrite(%v), err(%v)", bufSize, leftToWrite, err)
225
		if bufSize == writer.blockSize || (leftToWrite == 0 && err == io.EOF) {
226
			wSlice := &rwSlice{
227
				fileOffset: uint64(writer.fileOffset - bufSize),
228
				size:       uint32(bufSize),
229
			}
230
			wSlice.Data = make([]byte, bufSize)
231
			copy(wSlice.Data, writer.buf)
232
			writer.buf = writer.buf[:0]
233
			if (err == nil || err == io.EOF) && h != nil {
234
				h.Write(wSlice.Data)
235
				log.LogDebugf("writeBuff: bufSize(%v), md5", bufSize)
236
			}
237
			writer.wg.Add(1)
238

239
			write := func() {
240
				defer writer.wg.Done()
241
				err := writer.writeSlice(ctx, wSlice, false)
242
				if err != nil {
243
					writer.Lock()
244
					if len(writer.err) > 0 {
245
						writer.Unlock()
246
						return
247
					}
248
					wErr := &wSliceErr{
249
						err:        err,
250
						fileOffset: wSlice.fileOffset,
251
						size:       wSlice.size,
252
					}
253
					writer.err <- wErr
254
					writer.Unlock()
255
					return
256
				}
257

258
				oeksLock.Lock()
259
				oeks = append(oeks, wSlice.objExtentKey)
260
				oeksLock.Unlock()
261

262
				writer.cacheLevel2(wSlice)
263
			}
264

265
			exec.Run(write)
266
		}
267
	}
268

269
LOOP:
270
	for {
271
		position := 0
272
		leftToWrite, err = reader.Read(tmp)
273
		if err != nil && err != io.EOF {
274
			return
275
		}
276

277
		for leftToWrite > 0 {
278
			log.LogDebugf("WriteFromReader: leftToWrite(%v), err(%v)", leftToWrite, err)
279
			writer.RLock()
280
			errNum := len(writer.err)
281
			writer.RUnlock()
282
			if errNum > 0 {
283
				break LOOP
284
			}
285

286
			freeSize := writer.blockSize - len(writer.buf)
287
			writeSize := util.Min(leftToWrite, freeSize)
288
			writer.buf = append(writer.buf, tmp[position:position+writeSize]...)
289
			position += writeSize
290
			leftToWrite -= writeSize
291
			writer.fileOffset += writeSize
292
			writer.dirty = true
293

294
			writeBuff()
295

296
		}
297
		if err == io.EOF {
298
			log.LogDebugf("WriteFromReader: EOF")
299
			if len(writer.buf) > 0 {
300
				writeBuff()
301
			}
302
			err = nil
303
			writer.wg.Wait()
304
			var wErr *wSliceErr
305
			select {
306
			case wErr := <-writer.err:
307
				err = wErr.err
308
			default:
309
			}
310
			if err != nil {
311
				log.LogErrorf("slice write error,ino(%v) fileoffset(%v)  sliceSize(%v) err(%v)", writer.ino, wErr.fileOffset, wErr.size, err)
312
			}
313
			break
314
		}
315
	}
316

317
	log.LogDebugf("WriteFromReader before sort: %v", oeks)
318
	sort.Slice(oeks, func(i, j int) bool {
319
		return oeks[i].FileOffset < oeks[j].FileOffset
320
	})
321
	log.LogDebugf("WriteFromReader after sort: %v", oeks)
322
	if err = writer.mw.AppendObjExtentKeys(writer.ino, oeks); err != nil {
323
		log.LogErrorf("WriteFromReader error,meta append ebsc extent keys fail,ino(%v), err(%v)", writer.ino, err)
324
		return
325
	}
326

327
	size = uint64(writer.fileOffset)
328
	atomic.AddUint64(&writer.fileSize, size)
329
	return
330
}
331

332
// doBufferWriteWithoutPool stages data into writer.buf (growing it with
// append) and flushes a full block to ebs whenever buf reaches blockSize.
// On success it returns len(data) and advances fileSize atomically.
func (writer *Writer) doBufferWriteWithoutPool(ctx context.Context, data []byte, offset int) (size int, err error) {
	log.LogDebugf("TRACE blobStore doBufferWriteWithoutPool Enter: ino(%v) offset(%v) len(%v)", writer.ino, offset, len(data))

	writer.fileOffset = offset
	dataSize := len(data)
	position := 0
	log.LogDebugf("TRACE blobStore doBufferWriteWithoutPool: ino(%v) writer.buf.len(%v) writer.blocksize(%v)", writer.ino, len(writer.buf), writer.blockSize)
	writer.Lock()
	defer writer.Unlock()
	for dataSize > 0 {
		// Copy at most the remaining space of the current block.
		freeSize := writer.blockSize - len(writer.buf)
		if dataSize < freeSize {
			freeSize = dataSize
		}
		log.LogDebugf("TRACE blobStore doBufferWriteWithoutPool: ino(%v) writer.fileSize(%v) writer.fileOffset(%v) position(%v) freeSize(%v)", writer.ino, writer.fileSize, writer.fileOffset, position, freeSize)
		writer.buf = append(writer.buf, data[position:position+freeSize]...)
		log.LogDebugf("TRACE blobStore doBufferWriteWithoutPool:ino(%v) writer.buf.len(%v)", writer.ino, len(writer.buf))
		position += freeSize
		dataSize -= freeSize
		writer.fileOffset += freeSize
		writer.dirty = true

		if len(writer.buf) == writer.blockSize {
			log.LogDebugf("TRACE blobStore doBufferWriteWithoutPool: ino(%v) writer.buf.len(%v) writer.blocksize(%v)", writer.ino, len(writer.buf), writer.blockSize)
			// flushWithoutPool acquires the writer lock itself, so it is
			// dropped around the call and re-taken afterwards.
			writer.Unlock()
			err = writer.flushWithoutPool(writer.ino, ctx, false)
			writer.Lock()
			if err != nil {
				// Roll back this call's effect on buf and fileOffset.
				// NOTE(review): the rollback subtracts len(data) even though
				// only part of data may have been appended since the last
				// successful flush; for multi-block writes this looks like it
				// could over-shrink buf — confirm intended semantics.
				writer.buf = writer.buf[:len(writer.buf)-len(data)]
				writer.fileOffset -= len(data)
				return
			}

		}
	}

	size = len(data)
	atomic.AddUint64(&writer.fileSize, uint64(size))

	log.LogDebugf("TRACE blobStore doBufferWriteWithoutPool Exit: ino(%v) writer.fileSize(%v) writer.fileOffset(%v)", writer.ino, writer.fileSize, writer.fileOffset)
	return size, nil
}
374

375
// doBufferWrite stages data into the pool-backed fixed-size buffer,
// tracking the fill level with blockPosition, and flushes to ebs whenever
// a full block accumulates. On success it returns len(data) and advances
// fileSize atomically.
func (writer *Writer) doBufferWrite(ctx context.Context, data []byte, offset int) (size int, err error) {
	log.LogDebugf("TRACE blobStore doBufferWrite Enter: ino(%v) offset(%v) len(%v)", writer.ino, offset, len(data))

	writer.fileOffset = offset
	dataSize := len(data)
	position := 0
	log.LogDebugf("TRACE blobStore doBufferWrite: ino(%v) writer.buf.len(%v) writer.blocksize(%v)", writer.ino, len(writer.buf), writer.blockSize)
	writer.Lock()
	defer writer.Unlock()
	for dataSize > 0 {
		// Copy at most the remaining space of the current block.
		freeSize := writer.blockSize - writer.blockPosition
		if dataSize < freeSize {
			freeSize = dataSize
		}
		log.LogDebugf("TRACE blobStore doBufferWrite: ino(%v) writer.fileSize(%v) writer.fileOffset(%v) writer.blockPosition(%v) position(%v) freeSize(%v)", writer.ino, writer.fileSize, writer.fileOffset, writer.blockPosition, position, freeSize)
		copy(writer.buf[writer.blockPosition:], data[position:position+freeSize])
		log.LogDebugf("TRACE blobStore doBufferWrite:ino(%v) writer.buf.len(%v)", writer.ino, len(writer.buf))
		position += freeSize
		writer.blockPosition += freeSize
		dataSize -= freeSize
		writer.fileOffset += freeSize
		writer.dirty = true

		if writer.blockPosition == writer.blockSize {
			log.LogDebugf("TRACE blobStore doBufferWrite: ino(%v) writer.buf.len(%v) writer.blocksize(%v)", writer.ino, len(writer.buf), writer.blockSize)
			// flush acquires the writer lock itself, so it is dropped
			// around the call and re-taken afterwards.
			writer.Unlock()
			err = writer.flush(writer.ino, ctx, false)
			writer.Lock()
			if err != nil {
				// Roll back only the chunk copied in this iteration.
				writer.buf = writer.buf[:writer.blockPosition-freeSize]
				writer.fileOffset -= freeSize
				writer.blockPosition -= freeSize
				return
			}
		}
	}

	size = len(data)
	atomic.AddUint64(&writer.fileSize, uint64(size))

	log.LogDebugf("TRACE blobStore doBufferWrite Exit: ino(%v) writer.fileSize(%v) writer.fileOffset(%v)", writer.ino, writer.fileSize, writer.fileOffset)
	return size, nil
}
418

419
// FlushWithoutPool forces any buffered data for ino out to ebs via the
// append-buffer path. It is a no-op on a nil receiver.
func (writer *Writer) FlushWithoutPool(ino uint64, ctx context.Context) (err error) {
	if writer != nil {
		err = writer.flushWithoutPool(ino, ctx, true)
	}
	return
}
425

426
// Flush forces any buffered data for ino out to ebs via the pool-backed
// buffer path. It is a no-op on a nil receiver.
func (writer *Writer) Flush(ino uint64, ctx context.Context) (err error) {
	if writer != nil {
		err = writer.flush(ino, ctx, true)
	}
	return
}
432

433
// shouldCacheCfs reports whether the volume's cache action is read-write
// caching.
func (writer *Writer) shouldCacheCfs() bool {
	return writer.cacheAction == proto.RWCache
}
436

437
func (writer *Writer) prepareWriteSlice(offset int, data []byte) []*rwSlice {
438
	size := len(data)
439
	wSlices := make([]*rwSlice, 0)
440
	wSliceCount := size / writer.blockSize
441
	remainSize := size % writer.blockSize
442
	for index := 0; index < wSliceCount; index++ {
443
		offset := offset + index*writer.blockSize
444
		wSlice := &rwSlice{
445
			index:      index,
446
			fileOffset: uint64(offset),
447
			size:       uint32(writer.blockSize),
448
			Data:       data[index*writer.blockSize : (index+1)*writer.blockSize],
449
		}
450
		wSlices = append(wSlices, wSlice)
451
	}
452
	offset = offset + wSliceCount*writer.blockSize
453
	if remainSize > 0 {
454
		wSlice := &rwSlice{
455
			index:      wSliceCount,
456
			fileOffset: uint64(offset),
457
			size:       uint32(remainSize),
458
			Data:       data[wSliceCount*writer.blockSize:],
459
		}
460
		wSlices = append(wSlices, wSlice)
461
	}
462

463
	return wSlices
464
}
465

466
// writeSlice uploads one slice to ebs and records the resulting location
// in wSlice.objExtentKey. When wg is true the call participates in the
// parallel-write protocol: it signals writer.wg on exit and sends exactly
// one result (the error, or nil on success) on writer.err.
func (writer *Writer) writeSlice(ctx context.Context, wSlice *rwSlice, wg bool) (err error) {
	if wg {
		defer writer.wg.Done()
	}
	// Throttle against the volume's write rate limit before uploading.
	writer.limitManager.WriteAlloc(ctx, int(wSlice.size))
	log.LogDebugf("TRACE blobStore,writeSlice to ebs. ino(%v) fileOffset(%v) len(%v)", writer.ino, wSlice.fileOffset, wSlice.size)
	location, err := writer.ebsc.Write(ctx, writer.volName, wSlice.Data, wSlice.size)
	if err != nil {
		if wg {
			writer.err <- &wSliceErr{err: err, fileOffset: wSlice.fileOffset, size: wSlice.size}
		}
		return err
	}
	log.LogDebugf("TRACE blobStore,location(%v)", location)
	// Convert the blobstore location into the proto representation that is
	// persisted in inode metadata.
	blobs := make([]proto.Blob, 0)
	for _, info := range location.Blobs {
		blob := proto.Blob{
			MinBid: uint64(info.MinBid),
			Count:  uint64(info.Count),
			Vid:    uint64(info.Vid),
		}
		blobs = append(blobs, blob)
	}
	wSlice.objExtentKey = proto.ObjExtentKey{
		Cid:        uint64(location.ClusterID),
		CodeMode:   uint8(location.CodeMode),
		Size:       location.Size,
		BlobSize:   location.BlobSize,
		Blobs:      blobs,
		BlobsLen:   uint32(len(blobs)),
		FileOffset: wSlice.fileOffset,
		Crc:        location.Crc,
	}
	log.LogDebugf("TRACE blobStore,objExtentKey(%v)", wSlice.objExtentKey)

	if wg {
		// Success marker so the collector can count one result per slice.
		writer.err <- nil
	}
	return
}
506

507
// asyncCache writes data into the CubeFS cache extent for ino. Caching is
// best-effort: failures are only recorded in stats and logs.
func (writer *Writer) asyncCache(ino uint64, offset int, data []byte) {
	var err error
	bgTime := stat.BeginStat()
	defer func() {
		stat.EndStat("write-async-cache", err, bgTime, 1)
	}()

	log.LogDebugf("TRACE asyncCache Enter,fileOffset(%v) len(%v)", offset, len(data))
	var written int
	written, err = writer.ec.Write(ino, offset, data, proto.FlagsCache, nil)
	log.LogDebugf("TRACE asyncCache Exit,write(%v) err(%v)", written, err)
}
518

519
// resetBufferWithoutPool empties the append-based staging buffer while
// keeping its backing array for reuse.
func (writer *Writer) resetBufferWithoutPool() {
	writer.buf = writer.buf[:0]
}
522

523
// resetBuffer marks the pool-backed buffer as empty by rewinding the
// block position; the buffer memory itself is kept for reuse.
func (writer *Writer) resetBuffer() {
	// writer.buf = writer.buf[:0]
	writer.blockPosition = 0
}
527

528
// flushWithoutPool writes the buffered bytes (if any and dirty) as a
// single slice to ebs, appends the extent key to the inode metadata,
// resets the buffer, and optionally warms the level-2 cache. When
// flushFlag is true (an explicit caller flush) a failed upload also rolls
// fileSize back by the buffered amount.
func (writer *Writer) flushWithoutPool(inode uint64, ctx context.Context, flushFlag bool) (err error) {
	bgTime := stat.BeginStat()
	defer func() {
		stat.EndStat("blobstore-flush", err, bgTime, 1)
	}()

	log.LogDebugf("TRACE blobStore flushWithoutPool: ino(%v) buf-len(%v) flushFlag(%v)", inode, len(writer.buf), flushFlag)
	writer.Lock()
	defer func() {
		// Cleared even on failure; error paths roll state back separately.
		writer.dirty = false
		writer.Unlock()
	}()

	// Nothing staged (or already flushed): nothing to do.
	if len(writer.buf) == 0 || !writer.dirty {
		return
	}
	bufferSize := len(writer.buf)
	// The buffered bytes start bufferSize before the current file offset.
	wSlice := &rwSlice{
		fileOffset: uint64(writer.fileOffset - bufferSize),
		size:       uint32(bufferSize),
		Data:       writer.buf,
	}
	err = writer.writeSlice(ctx, wSlice, false)
	if err != nil {
		if flushFlag {
			// Undo the size credit taken when the data was buffered
			// (uint64 wrap-around acts as subtraction).
			atomic.AddUint64(&writer.fileSize, -uint64(bufferSize))
		}
		return
	}

	oeks := make([]proto.ObjExtentKey, 0)
	// update meta
	oeks = append(oeks, wSlice.objExtentKey)
	if err = writer.mw.AppendObjExtentKeys(writer.ino, oeks); err != nil {
		log.LogErrorf("slice write error,meta append ebsc extent keys fail,ino(%v) fileOffset(%v) len(%v) err(%v)", inode, wSlice.fileOffset, wSlice.size, err)
		return
	}
	writer.resetBufferWithoutPool()

	writer.cacheLevel2(wSlice)
	return
}
570

571
// flush writes the pool-backed buffer's pending bytes (tracked by
// blockPosition) as a single slice to ebs, appends the extent key to the
// inode metadata, rewinds the buffer, and optionally warms the level-2
// cache. When flushFlag is true (an explicit caller flush) a failed
// upload also rolls fileSize back by the buffered amount.
func (writer *Writer) flush(inode uint64, ctx context.Context, flushFlag bool) (err error) {
	bgTime := stat.BeginStat()
	defer func() {
		stat.EndStat("blobstore-flush", err, bgTime, 1)
	}()

	log.LogDebugf("TRACE blobStore flush: ino(%v) buf-len(%v) flushFlag(%v)", inode, len(writer.buf), flushFlag)
	writer.Lock()
	defer func() {
		// Cleared even on failure; error paths roll state back separately.
		writer.dirty = false
		writer.Unlock()
	}()

	// Nothing staged (or already flushed): nothing to do.
	if len(writer.buf) == 0 || !writer.dirty {
		return
	}
	bufferSize := writer.blockPosition
	// The buffered bytes start bufferSize before the current file offset.
	wSlice := &rwSlice{
		fileOffset: uint64(writer.fileOffset - bufferSize),
		size:       uint32(bufferSize),
		Data:       writer.buf,
	}
	err = writer.writeSlice(ctx, wSlice, false)
	if err != nil {
		if flushFlag {
			// Undo the size credit taken when the data was buffered
			// (uint64 wrap-around acts as subtraction).
			atomic.AddUint64(&writer.fileSize, -uint64(bufferSize))
		}
		return
	}

	oeks := make([]proto.ObjExtentKey, 0)
	// update meta
	oeks = append(oeks, wSlice.objExtentKey)
	if err = writer.mw.AppendObjExtentKeys(writer.ino, oeks); err != nil {
		log.LogErrorf("slice write error,meta append ebsc extent keys fail,ino(%v) fileOffset(%v) len(%v) err(%v)", inode, wSlice.fileOffset, wSlice.size, err)
		return
	}
	writer.resetBuffer()

	writer.cacheLevel2(wSlice)
	return
}
613

614
// CacheFileSize returns the file size currently tracked by this writer.
// fileSize is read atomically, so this is safe to call concurrently.
func (writer *Writer) CacheFileSize() int {
	return int(atomic.LoadUint64(&writer.fileSize))
}
617

618
// FreeCache returns the staging buffer to the global cache pool exactly
// once. It is safe on a nil receiver and when no pool is configured.
func (writer *Writer) FreeCache() {
	if writer == nil || buf.CachePool == nil {
		return
	}
	writer.once.Do(func() {
		released := writer.buf
		writer.buf = nil
		if released != nil {
			buf.CachePool.Put(released)
		}
	})
}
633

634
// allocateCache grabs a staging buffer from the global cache pool, if one
// is configured; otherwise the writer keeps a nil buffer.
func (writer *Writer) allocateCache() {
	if buf.CachePool == nil {
		return
	}
	writer.buf = buf.CachePool.Get()
}
640

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.