1
// Copyright 2018 The CubeFS Authors.
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
7
// http://www.apache.org/licenses/LICENSE-2.0
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
30
"github.com/cubefs/cubefs/proto"
31
"github.com/cubefs/cubefs/util"
32
"github.com/cubefs/cubefs/util/log"
36
ExtentOpenOpt = os.O_CREATE | os.O_RDWR | os.O_EXCL
43
ExtentMaxSize = 1024 * 1024 * 1024 * 1024 * 4 // 4TB
46
// ExtentInfo carries the metadata recorded for a single extent. Every field
// is exported and serialized to JSON under the key given in its tag.
type ExtentInfo struct {
	FileID              uint64 `json:"fileId"`
	Size                uint64 `json:"size"`
	Crc                 uint32 `json:"Crc"` // key is capitalized, unlike the other keys
	IsDeleted           bool   `json:"deleted"`
	ModifyTime          int64  `json:"modTime"` // random writes do not update the modify time
	AccessTime          int64  `json:"accessTime"`
	Source              string `json:"src"`
	SnapshotDataOff     uint64 `json:"snapSize"`
	SnapPreAllocDataOff uint64 `json:"snapPreAllocSize"`
	ApplyID             uint64 `json:"applyID"`
}
59
func (ei *ExtentInfo) TotalSize() uint64 {
60
if ei.SnapshotDataOff > util.ExtentSize {
61
return ei.Size + (ei.SnapshotDataOff - util.ExtentSize)
66
func (ei *ExtentInfo) String() (m string) {
71
return fmt.Sprintf("%v_%v_%v_%v_%v_%d_%d_%d", ei.FileID, ei.Size, ei.SnapshotDataOff, ei.IsDeleted, source, ei.ModifyTime, ei.AccessTime, ei.Crc)
74
// SortedExtentInfos is a slice of ExtentInfo pointers ordered by ascending AccessTime.
75
type SortedExtentInfos []*ExtentInfo
77
func (extInfos SortedExtentInfos) Len() int {
81
func (extInfos SortedExtentInfos) Less(i, j int) bool {
82
return extInfos[i].AccessTime < extInfos[j].AccessTime
85
func (extInfos SortedExtentInfos) Swap(i, j int) {
86
extInfos[i], extInfos[j] = extInfos[j], extInfos[i]
89
// Extent is an implementation of Extent for local regular extent file data management.
90
// This extent implementation manages all header info and data body in one single entry file.
91
// The extent header includes the inode value of this extent block and the CRC of each data block.
101
snapshotDataOff uint64
105
// NewExtentInCore creates and returns a new extent instance.
106
func NewExtentInCore(name string, extentID uint64) *Extent {
108
e.extentID = extentID
110
e.snapshotDataOff = util.ExtentSize
114
func (e *Extent) String() string {
115
return fmt.Sprintf("%v_%v_%v", e.filePath, e.dataSize, e.snapshotDataOff)
118
func (e *Extent) GetSize() (int64, uint64) {
119
return e.dataSize, e.snapshotDataOff
122
func (e *Extent) HasClosed() bool {
123
return atomic.LoadInt32(&e.hasClose) == ExtentHasClose
126
// Close closes this extent and releases its file descriptor.
127
func (e *Extent) Close() (err error) {
131
if err = e.file.Close(); err != nil {
137
func (e *Extent) Exist() (exsit bool) {
138
_, err := os.Stat(e.filePath)
140
return os.IsExist(err)
145
func (e *Extent) GetFile() *os.File {
149
// InitToFS creates the extent file on the filesystem and initializes the extent.
150
// The file is opened with ExtentOpenOpt (O_CREATE|O_RDWR|O_EXCL), so the call fails if the file already exists.
151
func (e *Extent) InitToFS() (err error) {
152
if e.file, err = os.OpenFile(e.filePath, ExtentOpenOpt, 0o666); err != nil {
156
if IsTinyExtent(e.extentID) {
160
atomic.StoreInt64(&e.modifyTime, time.Now().Unix())
161
atomic.StoreInt64(&e.accessTime, time.Now().Unix())
166
func (e *Extent) GetDataSize(statSize int64) (dataSize int64) {
175
// curOff is the result of seeking from holStart with SEEK_DATA, i.e. the end of the hole and the start of the next data
176
curOff, err = e.file.Seek(holStart, SEEK_DATA)
177
if err != nil || curOff >= util.ExtentSize || (holStart > 0 && holStart == curOff) {
178
log.LogDebugf("GetDataSize statSize %v curOff %v dataStart %v holStart %v, err %v,path %v", statSize, curOff, dataStart, holStart, err, e.filePath)
181
log.LogDebugf("GetDataSize statSize %v curOff %v dataStart %v holStart %v, err %v,path %v", statSize, curOff, dataStart, holStart, err, e.filePath)
184
curOff, err = e.file.Seek(dataStart, SEEK_HOLE)
185
if err != nil || curOff >= util.ExtentSize || dataStart == curOff {
186
log.LogDebugf("GetDataSize statSize %v curOff %v dataStart %v holStart %v, err %v,path %v", statSize, curOff, dataStart, holStart, err, e.filePath)
189
log.LogDebugf("GetDataSize statSize %v curOff %v dataStart %v holStart %v, err %v,path %v", statSize, curOff, dataStart, holStart, err, e.filePath)
192
log.LogDebugf("GetDataSize statSize %v curOff %v dataStart %v holStart %v, err %v,path %v", statSize, curOff, dataStart, holStart, err, e.filePath)
194
if statSize > util.ExtentSize {
195
return util.ExtentSize
202
// RestoreFromFS restores the entity data and status from the file stored on the filesystem.
203
func (e *Extent) RestoreFromFS() (err error) {
204
if e.file, err = os.OpenFile(e.filePath, os.O_RDWR, 0o666); err != nil {
205
if strings.Contains(err.Error(), syscall.ENOENT.Error()) {
206
err = ExtentNotFoundError
211
if info, err = e.file.Stat(); err != nil {
212
err = fmt.Errorf("stat file %v: %v", e.file.Name(), err)
216
if IsTinyExtent(e.extentID) {
217
watermark := info.Size()
218
if watermark%util.PageSize != 0 {
219
watermark = watermark + (util.PageSize - watermark%util.PageSize)
221
e.dataSize = watermark
225
e.dataSize = e.GetDataSize(info.Size())
226
e.snapshotDataOff = util.ExtentSize
227
if info.Size() > util.ExtentSize {
228
e.snapshotDataOff = uint64(info.Size())
231
atomic.StoreInt64(&e.modifyTime, info.ModTime().Unix())
233
ts := info.Sys().(*syscall.Stat_t)
234
atomic.StoreInt64(&e.accessTime, time.Unix(int64(ts.Atim.Sec), int64(ts.Atim.Nsec)).Unix())
238
// Size returns length of the extent (not including the header).
239
func (e *Extent) Size() (size int64) {
243
// ModifyTime returns the time when this extent was modified recently.
244
func (e *Extent) ModifyTime() int64 {
245
return atomic.LoadInt64(&e.modifyTime)
248
func IsRandomWrite(writeType int) bool {
249
return writeType == RandomWriteType
252
func IsAppendWrite(writeType int) bool {
253
return writeType == AppendWriteType
256
func IsAppendRandomWrite(writeType int) bool {
257
return writeType == AppendRandomWriteType
260
// WriteTiny performs write on a tiny extent.
261
func (e *Extent) WriteTiny(data []byte, offset, size int64, crc uint32, writeType int, isSync bool) (err error) {
264
index := offset + size
265
if index >= ExtentMaxSize {
266
return ExtentIsFullError
269
if IsAppendWrite(writeType) && offset != e.dataSize {
270
return ParameterMismatchError
273
if _, err = e.file.WriteAt(data[:size], int64(offset)); err != nil {
277
if err = e.file.Sync(); err != nil {
282
if !IsAppendWrite(writeType) {
285
if index%util.PageSize != 0 {
286
index = index + (util.PageSize - index%util.PageSize)
293
// Write writes data to an extent.
294
func (e *Extent) Write(data []byte, offset, size int64, crc uint32, writeType int, isSync bool, crcFunc UpdateCrcFunc, ei *ExtentInfo) (status uint8, err error) {
295
log.LogDebugf("action[Extent.Write] path %v offset %v size %v writeType %v", e.filePath, offset, size, writeType)
297
if IsTinyExtent(e.extentID) {
298
err = e.WriteTiny(data, offset, size, crc, writeType, isSync)
302
if err = e.checkWriteOffsetAndSize(writeType, offset, size); err != nil {
303
log.LogErrorf("action[Extent.Write] checkWriteOffsetAndSize offset %v size %v writeType %v err %v",
304
offset, size, writeType, err)
305
err = newParameterError("extent current size=%d write offset=%d write size=%d", e.dataSize, offset, size)
306
log.LogInfof("action[Extent.Write] newParameterError path %v offset %v size %v writeType %v err %v", e.filePath,
307
offset, size, writeType, err)
308
status = proto.OpTryOtherExtent
312
log.LogDebugf("action[Extent.Write] path %v offset %v size %v writeType %v", e.filePath, offset, size, writeType)
313
// Check if extent file size matches the write offset just in case
314
// multiple clients are writing concurrently.
317
log.LogDebugf("action[Extent.Write] offset %v size %v writeType %v path %v", offset, size, writeType, e.filePath)
318
if IsAppendWrite(writeType) && e.dataSize != offset {
319
err = newParameterError("extent current size=%d write offset=%d write size=%d", e.dataSize, offset, size)
320
log.LogInfof("action[Extent.Write] newParameterError path %v offset %v size %v writeType %v err %v", e.filePath,
321
offset, size, writeType, err)
322
status = proto.OpTryOtherExtent
325
if IsAppendRandomWrite(writeType) {
326
if e.snapshotDataOff <= util.ExtentSize {
327
log.LogInfof("action[Extent.Write] truncate extent %v offset %v size %v writeType %v truncate err %v", e, offset, size, writeType, err)
328
if err = e.file.Truncate(util.ExtentSize); err != nil {
329
log.LogErrorf("action[Extent.Write] offset %v size %v writeType %v truncate err %v", offset, size, writeType, err)
334
if _, err = e.file.WriteAt(data[:size], int64(offset)); err != nil {
335
log.LogErrorf("action[Extent.Write] offset %v size %v writeType %v err %v", offset, size, writeType, err)
339
blockNo := offset / util.BlockSize
340
offsetInBlock := offset % util.BlockSize
342
log.LogDebugf("action[Extent.Write] offset %v size %v writeType %v path %v", offset, size, writeType, e.filePath)
343
if IsAppendWrite(writeType) {
344
atomic.StoreInt64(&e.modifyTime, time.Now().Unix())
345
e.dataSize = int64(math.Max(float64(e.dataSize), float64(offset+size)))
346
log.LogDebugf("action[Extent.Write] e %v offset %v size %v writeType %v", e, offset, size, writeType)
347
} else if IsAppendRandomWrite(writeType) {
348
atomic.StoreInt64(&e.modifyTime, time.Now().Unix())
349
e.snapshotDataOff = uint64(math.Max(float64(e.snapshotDataOff), float64(offset+size)))
351
log.LogDebugf("action[Extent.Write] offset %v size %v writeType %v dataSize %v snapshotDataOff %v",
352
offset, size, writeType, e.dataSize, e.snapshotDataOff)
356
if err = e.file.Sync(); err != nil {
357
log.LogDebugf("action[Extent.Write] offset %v size %v writeType %v err %v",
358
offset, size, writeType, err)
362
if offsetInBlock == 0 && size == util.BlockSize {
363
err = crcFunc(e, int(blockNo), crc)
364
log.LogDebugf("action[Extent.Write] offset %v size %v writeType %v err %v", offset, size, writeType, err)
368
if offsetInBlock+size <= util.BlockSize {
369
err = crcFunc(e, int(blockNo), 0)
370
log.LogDebugf("action[Extent.Write] offset %v size %v writeType %v err %v", offset, size, writeType, err)
373
log.LogDebugf("action[Extent.Write] offset %v size %v writeType %v", offset, size, writeType)
374
if err = crcFunc(e, int(blockNo), 0); err == nil {
375
err = crcFunc(e, int(blockNo+1), 0)
380
// Read reads data from an extent.
381
func (e *Extent) Read(data []byte, offset, size int64, isRepairRead bool) (crc uint32, err error) {
382
log.LogDebugf("action[Extent.read] offset %v size %v extent %v", offset, size, e)
383
if IsTinyExtent(e.extentID) {
384
return e.ReadTiny(data, offset, size, isRepairRead)
387
if err = e.checkReadOffsetAndSize(offset, size); err != nil {
388
log.LogErrorf("action[Extent.Read] offset %d size %d err %v", offset, size, err)
393
if rSize, err = e.file.ReadAt(data[:size], offset); err != nil {
394
log.LogErrorf("action[Extent.Read] offset %v size %v err %v realsize %v", offset, size, err, rSize)
397
crc = crc32.ChecksumIEEE(data)
401
// ReadTiny reads data from a tiny extent.
402
func (e *Extent) ReadTiny(data []byte, offset, size int64, isRepairRead bool) (crc uint32, err error) {
403
_, err = e.file.ReadAt(data[:size], offset)
404
if isRepairRead && err == io.EOF {
407
crc = crc32.ChecksumIEEE(data[:size])
411
func (e *Extent) checkReadOffsetAndSize(offset, size int64) error {
412
if (e.snapshotDataOff == util.ExtentSize && offset > e.Size()) ||
413
(e.snapshotDataOff > util.ExtentSize && uint64(offset) > e.snapshotDataOff) {
414
return newParameterError("offset=%d size=%d snapshotDataOff=%d", offset, size, e.snapshotDataOff)
419
func (e *Extent) checkWriteOffsetAndSize(writeType int, offset, size int64) error {
420
err := newParameterError("writeType=%d offset=%d size=%d", writeType, offset, size)
421
if IsAppendWrite(writeType) {
422
if size == 0 || size > util.BlockSize ||
423
offset+size > util.ExtentSize || offset >= util.ExtentSize {
426
} else if IsAppendRandomWrite(writeType) {
427
log.LogDebugf("action[checkOffsetAndSize] offset %v size %v", offset, size)
428
if offset < util.ExtentSize || size == 0 {
435
// Flush synchronizes data to the disk.
436
func (e *Extent) Flush() (err error) {
441
func (e *Extent) GetCrc(blockNo int64) uint32 {
442
if int64(len(e.header)) < (blockNo+1)*util.PerBlockCrcSize {
445
return binary.BigEndian.Uint32(e.header[blockNo*util.PerBlockCrcSize : (blockNo+1)*util.PerBlockCrcSize])
448
func (e *Extent) autoComputeExtentCrc(crcFunc UpdateCrcFunc) (crc uint32, err error) {
451
if e.snapshotDataOff > util.ExtentSize {
452
extSize = int64(e.snapshotDataOff)
454
blockCnt = int(extSize / util.BlockSize)
455
if extSize%util.BlockSize != 0 {
458
log.LogDebugf("autoComputeExtentCrc. path %v extent %v extent size %v,blockCnt %v", e.filePath, e.extentID, extSize, blockCnt)
459
crcData := make([]byte, blockCnt*util.PerBlockCrcSize)
460
for blockNo := 0; blockNo < blockCnt; blockNo++ {
461
blockCrc := binary.BigEndian.Uint32(e.header[blockNo*util.PerBlockCrcSize : (blockNo+1)*util.PerBlockCrcSize])
463
binary.BigEndian.PutUint32(crcData[blockNo*util.PerBlockCrcSize:(blockNo+1)*util.PerBlockCrcSize], blockCrc)
466
bdata := make([]byte, util.BlockSize)
467
offset := int64(blockNo * util.BlockSize)
468
readN, err := e.file.ReadAt(bdata[:util.BlockSize], offset)
469
if readN == 0 && err != nil {
470
log.LogErrorf("autoComputeExtentCrc. path %v extent %v blockNo %v, readN %v err %v", e.filePath, e.extentID, blockNo, readN, err)
473
blockCrc = crc32.ChecksumIEEE(bdata[:readN])
474
err = crcFunc(e, blockNo, blockCrc)
476
log.LogErrorf("autoComputeExtentCrc. path %v extent %v blockNo %v, err %v", e.filePath, e.extentID, blockNo, err)
479
log.LogDebugf("autoComputeExtentCrc. path %v extent %v blockCrc %v,blockNo %v", e.filePath, e.extentID, blockCrc, blockNo)
480
binary.BigEndian.PutUint32(crcData[blockNo*util.PerBlockCrcSize:(blockNo+1)*util.PerBlockCrcSize], blockCrc)
482
crc = crc32.ChecksumIEEE(crcData)
483
log.LogDebugf("autoComputeExtentCrc. path %v extent %v crc %v", e.filePath, e.extentID, crc)
487
// punchDelete releases the given range of the extent file by punching a hole with fallocate (the file size is kept).
488
func (e *Extent) punchDelete(offset, size int64) (hasDelete bool, err error) {
489
log.LogDebugf("punchDelete extent %v offset %v, size %v", e, offset, size)
490
if int(offset)%util.PageSize != 0 {
491
return false, ParameterMismatchError
493
if int(size)%util.PageSize != 0 {
494
size += int64(util.PageSize - int(size)%util.PageSize)
497
newOffset, err := e.file.Seek(offset, SEEK_DATA)
499
if strings.Contains(err.Error(), syscall.ENXIO.Error()) {
504
if newOffset-offset >= size {
507
log.LogDebugf("punchDelete offset %v size %v", offset, size)
508
err = fallocate(int(e.file.Fd()), util.FallocFLPunchHole|util.FallocFLKeepSize, offset, size)
512
func (e *Extent) getRealBlockCnt() (blockNum int64) {
513
stat := new(syscall.Stat_t)
514
syscall.Stat(e.filePath, stat)
518
func (e *Extent) TinyExtentRecover(data []byte, offset, size int64, crc uint32, isEmptyPacket bool) (err error) {
521
if !IsTinyExtent(e.extentID) {
522
return ParameterMismatchError
524
if offset%util.PageSize != 0 || offset != e.dataSize {
525
return fmt.Errorf("error empty packet on (%v) offset(%v) size(%v)"+
526
" isEmptyPacket(%v) e.dataSize(%v)", e.file.Name(), offset, size, isEmptyPacket, e.dataSize)
528
log.LogDebugf("before file (%v) getRealBlockNo (%v) isEmptyPacket(%v)"+
529
"offset(%v) size(%v) e.datasize(%v)", e.filePath, e.getRealBlockCnt(), isEmptyPacket, offset, size, e.dataSize)
531
var finfo os.FileInfo
532
finfo, err = e.file.Stat()
536
if offset < finfo.Size() {
537
return fmt.Errorf("error empty packet on (%v) offset(%v) size(%v)"+
538
" isEmptyPacket(%v) filesize(%v) e.dataSize(%v)", e.file.Name(), offset, size, isEmptyPacket, finfo.Size(), e.dataSize)
540
if err = syscall.Ftruncate(int(e.file.Fd()), offset+size); err != nil {
543
err = fallocate(int(e.file.Fd()), util.FallocFLPunchHole|util.FallocFLKeepSize, offset, size)
545
_, err = e.file.WriteAt(data[:size], int64(offset))
550
watermark := offset + size
551
if watermark%util.PageSize != 0 {
552
watermark = watermark + (util.PageSize - watermark%util.PageSize)
554
e.dataSize = watermark
555
log.LogDebugf("after file (%v) getRealBlockNo (%v) isEmptyPacket(%v)"+
556
"offset(%v) size(%v) e.datasize(%v)", e.filePath, e.getRealBlockCnt(), isEmptyPacket, offset, size, e.dataSize)
561
func (e *Extent) tinyExtentAvaliOffset(offset int64) (newOffset, newEnd int64, err error) {
564
newOffset, err = e.file.Seek(int64(offset), SEEK_DATA)
568
newEnd, err = e.file.Seek(int64(newOffset), SEEK_HOLE)
572
if newOffset-offset > util.BlockSize {
573
newOffset = offset + util.BlockSize
575
if newEnd-newOffset > util.BlockSize {
576
newEnd = newOffset + util.BlockSize
578
if newEnd < newOffset {
579
err = fmt.Errorf("unavali TinyExtentAvaliOffset on SEEK_DATA or SEEK_HOLE (%v) offset(%v) "+
580
"newEnd(%v) newOffset(%v)", e.extentID, offset, newEnd, newOffset)