1
// Copyright 2018 The CubeFS Authors.
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
7
// http://www.apache.org/licenses/LICENSE-2.0
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
35
"github.com/cubefs/cubefs/proto"
36
"github.com/cubefs/cubefs/util"
37
"github.com/cubefs/cubefs/util/errors"
38
"github.com/cubefs/cubefs/util/log"
41
//TODO: remove this later.
42
//go:generate golangci-lint run --issues-exit-code=1 -D errcheck -E bodyclose ./...
45
ExtCrcHeaderFileName = "EXTENT_CRC"
46
ExtBaseExtentIDFileName = "EXTENT_META"
47
TinyDeleteFileOpt = os.O_CREATE | os.O_RDWR | os.O_APPEND
48
TinyExtDeletedFileName = "TINYEXTENT_DELETE"
49
NormalExtDeletedFileName = "NORMALEXTENT_DELETE"
50
MaxExtentCount = 20000
54
DeleteTinyRecordSize = 24
55
UpdateCrcInterval = 600
59
AppendRandomWriteType = 4
61
NormalExtentDeleteRetainTime = 3600 * 4
63
StaleExtStoreBackupSuffix = ".old"
64
StaleExtStoreTimeFormat = "20060102150405.000000000"
68
RegexpExtentFile, _ = regexp.Compile(`^(\d)+$`)
69
SnapShotFilePool = &sync.Pool{New: func() interface{} {
70
return new(proto.File)
74
func GetSnapShotFileFromPool() (f *proto.File) {
75
f = SnapShotFilePool.Get().(*proto.File)
79
func PutSnapShotFileToPool(f *proto.File) {
80
SnapShotFilePool.Put(f)
83
// ExtentFilter is a predicate over an extent's info. GetAllWatermarks consults
// it when selecting which extents to return (see NormalExtentFilter and
// TinyExtentFilter for the filters built in this file).
type ExtentFilter func(info *ExtentInfo) bool
87
NormalExtentFilter = func() ExtentFilter {
89
return func(ei *ExtentInfo) bool {
90
return !IsTinyExtent(ei.FileID) && now.Unix()-ei.ModifyTime > RepairInterval && !ei.IsDeleted
94
TinyExtentFilter = func(filters []uint64) ExtentFilter {
95
return func(ei *ExtentInfo) bool {
96
if !IsTinyExtent(ei.FileID) {
99
for _, filterID := range filters {
100
if filterID == ei.FileID {
109
// ExtentStore defines fields used in the storage engine.
110
// Packets smaller than 128K are stored in the "tinyExtent", a place to persist the small files.
111
// Packets larger than or equal to 128K are stored in the normal "extent", a place to persist large files.
112
// The difference between them is that the extentID of a tinyExtent starts at 5000000 and ends at 5000128.
113
// Multiple small files can be appended to the same tinyExtent.
114
// In addition, small files are deleted by punching holes in the underlying file system.
115
type ExtentStore struct {
117
baseExtentID uint64 // TODO what is baseExtentID
118
extentInfoMap map[uint64]*ExtentInfo // map that stores all the extent information
119
eiMutex sync.RWMutex // mutex for extent info
120
cache *ExtentCache // extent cache
122
storeSize int // size of the extent store
123
metadataFp *os.File // metadata file pointer?
124
tinyExtentDeleteFp *os.File
125
normalExtentDeleteFp *os.File
128
availableTinyExtentC chan uint64 // available tinyExtent channel
129
availableTinyExtentMap sync.Map
130
brokenTinyExtentC chan uint64 // broken tinyExtent channel
131
brokenTinyExtentMap sync.Map
134
verifyExtentFp *os.File
136
verifyExtentFpAppend []*os.File
137
hasAllocSpaceExtentIDOnVerfiyFile uint64
138
hasDeleteNormalExtentsCache sync.Map
141
ApplyIdMutex sync.RWMutex
144
func MkdirAll(name string) (err error) {
145
return os.MkdirAll(name, 0o755)
148
func NewExtentStore(dataDir string, partitionID uint64, storeSize, dpType int, isCreate bool) (s *ExtentStore, err error) {
151
s.partitionType = dpType
152
s.partitionID = partitionID
155
if err = s.renameStaleExtentStore(); err != nil {
158
if err = MkdirAll(dataDir); err != nil {
159
return nil, fmt.Errorf("NewExtentStore [%v] err[%v]", dataDir, err)
162
if s.tinyExtentDeleteFp, err = os.OpenFile(path.Join(s.dataPath, TinyExtDeletedFileName), TinyDeleteFileOpt, 0o666); err != nil {
165
if s.verifyExtentFp, err = os.OpenFile(path.Join(s.dataPath, ExtCrcHeaderFileName), os.O_CREATE|os.O_RDWR, 0o666); err != nil {
168
if s.metadataFp, err = os.OpenFile(path.Join(s.dataPath, ExtBaseExtentIDFileName), os.O_CREATE|os.O_RDWR, 0o666); err != nil {
171
if s.normalExtentDeleteFp, err = os.OpenFile(path.Join(s.dataPath, NormalExtDeletedFileName), os.O_CREATE|os.O_RDWR|os.O_APPEND, 0o666); err != nil {
175
if err = MkdirAll(dataDir); err != nil {
176
return nil, fmt.Errorf("NewExtentStore [%v] err[%v]", dataDir, err)
178
if s.tinyExtentDeleteFp, err = os.OpenFile(path.Join(s.dataPath, TinyExtDeletedFileName), os.O_RDWR|os.O_APPEND, 0o666); err != nil {
181
if s.verifyExtentFp, err = os.OpenFile(path.Join(s.dataPath, ExtCrcHeaderFileName), os.O_RDWR, 0o666); err != nil {
184
if s.metadataFp, err = os.OpenFile(path.Join(s.dataPath, ExtBaseExtentIDFileName), os.O_RDWR, 0o666); err != nil {
187
if s.normalExtentDeleteFp, err = os.OpenFile(path.Join(s.dataPath, NormalExtDeletedFileName), os.O_RDWR|os.O_APPEND, 0o666); err != nil {
192
stat, err := s.tinyExtentDeleteFp.Stat()
196
if stat.Size()%DeleteTinyRecordSize != 0 {
197
needWriteEmpty := DeleteTinyRecordSize - (stat.Size() % DeleteTinyRecordSize)
198
data := make([]byte, needWriteEmpty)
199
s.tinyExtentDeleteFp.Write(data)
202
log.LogDebugf("NewExtentStore.partitionID [%v] dataPath %v verifyExtentFp init", partitionID, s.dataPath)
203
if s.verifyExtentFp, err = os.OpenFile(path.Join(s.dataPath, ExtCrcHeaderFileName), os.O_CREATE|os.O_RDWR, 0o666); err != nil {
210
dataPath := path.Join(s.dataPath, ExtCrcHeaderFileName+"_"+strconv.Itoa(aId))
211
if _, err = os.Stat(dataPath); err != nil {
212
log.LogDebugf("NewExtentStore. partitionID [%v] dataPath not exist err %v. verifyExtentFpAppend init return", partitionID, err)
215
if vFp, err = os.OpenFile(dataPath, os.O_CREATE|os.O_RDWR, 0o666); err != nil {
216
log.LogErrorf("NewExtentStore. partitionID [%v] dataPath exist but open err %v. verifyExtentFpAppend init return", partitionID, err)
219
log.LogDebugf("NewExtentStore. partitionID [%v] dataPath exist and opened id %v", partitionID, aId)
220
s.verifyExtentFpAppend = append(s.verifyExtentFpAppend, vFp)
223
if s.metadataFp, err = os.OpenFile(path.Join(s.dataPath, ExtBaseExtentIDFileName), os.O_CREATE|os.O_RDWR, 0o666); err != nil {
226
if s.normalExtentDeleteFp, err = os.OpenFile(path.Join(s.dataPath, NormalExtDeletedFileName), os.O_CREATE|os.O_RDWR|os.O_APPEND, 0o666); err != nil {
230
s.extentInfoMap = make(map[uint64]*ExtentInfo)
231
s.cache = NewExtentCache(100)
232
if err = s.initBaseFileID(); err != nil {
233
err = fmt.Errorf("init base field ID: %v", err)
236
s.hasAllocSpaceExtentIDOnVerfiyFile = s.GetPreAllocSpaceExtentIDOnVerifyFile()
237
s.storeSize = storeSize
238
s.closeC = make(chan bool, 1)
240
err = s.initTinyExtent()
247
func (ei *ExtentInfo) UpdateExtentInfo(extent *Extent, crc uint32) {
249
defer extent.Unlock()
251
if time.Now().Unix()-extent.ModifyTime() <= UpdateCrcInterval {
255
ei.Size = uint64(extent.dataSize)
256
ei.SnapshotDataOff = extent.snapshotDataOff
258
log.LogInfof("action[ExtentInfo.UpdateExtentInfo] ei info [%v]", ei.String())
260
if !IsTinyExtent(ei.FileID) {
261
atomic.StoreUint32(&ei.Crc, crc)
262
ei.ModifyTime = extent.ModifyTime()
266
// SnapShot returns the information of all the extents on the current data partition.
267
// When the master sends the loadDataPartition request, the snapshot is used to compare the replicas.
268
func (s *ExtentStore) SnapShot() (files []*proto.File, err error) {
269
var normalExtentSnapshot, tinyExtentSnapshot []*ExtentInfo
271
// compute crc again to guarantee crc and applyID is the newest
272
s.autoComputeExtentCrc()
274
if normalExtentSnapshot, _, err = s.GetAllWatermarks(NormalExtentFilter()); err != nil {
275
log.LogErrorf("SnapShot GetAllWatermarks err %v", err)
279
files = make([]*proto.File, 0, len(normalExtentSnapshot))
280
for _, ei := range normalExtentSnapshot {
281
file := GetSnapShotFileFromPool()
282
file.Name = strconv.FormatUint(ei.FileID, 10)
283
file.Size = uint32(ei.Size)
284
file.Modified = ei.ModifyTime
285
file.Crc = atomic.LoadUint32(&ei.Crc)
286
file.ApplyID = ei.ApplyID
287
log.LogDebugf("partitionID %v ExtentStore set applyid %v partition %v", s.partitionID, s.ApplyId, s.partitionID)
288
files = append(files, file)
290
tinyExtentSnapshot = s.getTinyExtentInfo()
291
for _, ei := range tinyExtentSnapshot {
292
file := GetSnapShotFileFromPool()
293
file.Name = strconv.FormatUint(ei.FileID, 10)
294
file.Size = uint32(ei.Size)
295
file.Modified = ei.ModifyTime
297
files = append(files, file)
303
// Create creates an extent.
304
func (s *ExtentStore) Create(extentID uint64) (err error) {
306
name := path.Join(s.dataPath, strconv.Itoa(int(extentID)))
307
if s.HasExtent(extentID) {
308
err = ExtentExistsError
312
e = NewExtentInCore(name, extentID)
313
e.header = make([]byte, util.BlockHeaderSize)
320
extInfo := &ExtentInfo{FileID: extentID}
321
extInfo.UpdateExtentInfo(e, 0)
323
atomic.StoreInt64(&extInfo.AccessTime, e.accessTime)
325
s.extentInfoMap[extentID] = extInfo
328
s.UpdateBaseExtentID(extentID)
332
func (s *ExtentStore) initBaseFileID() error {
333
var baseFileID uint64
334
baseFileID, _ = s.GetPersistenceBaseExtentID()
335
files, err := os.ReadDir(s.dataPath)
347
for _, f := range files {
348
if extentID, isExtent = s.ExtentID(f.Name()); !isExtent {
352
if e, loadErr = s.extent(extentID); loadErr != nil {
353
log.LogError("[initBaseFileID] load extent error", loadErr)
357
ei = &ExtentInfo{FileID: extentID}
358
ei.UpdateExtentInfo(e, 0)
359
atomic.StoreInt64(&ei.AccessTime, e.accessTime)
362
s.extentInfoMap[extentID] = ei
366
if !IsTinyExtent(extentID) && extentID > baseFileID {
367
baseFileID = extentID
370
if baseFileID < MinExtentID {
371
baseFileID = MinExtentID
373
atomic.StoreUint64(&s.baseExtentID, baseFileID)
374
log.LogInfof("datadir(%v) maxBaseId(%v)", s.dataPath, baseFileID)
379
// Write writes the given extent to the disk.
380
func (s *ExtentStore) Write(extentID uint64, offset, size int64, data []byte, crc uint32, writeType int, isSync bool) (status uint8, err error) {
387
ei = s.extentInfoMap[extentID]
388
e, err = s.extentWithHeader(ei)
393
// update access time
394
atomic.StoreInt64(&ei.AccessTime, time.Now().Unix())
395
log.LogDebugf("action[Write] dp %v extentID %v offset %v size %v writeTYPE %v", s.partitionID, extentID, offset, size, writeType)
396
if err = s.checkOffsetAndSize(extentID, offset, size, writeType); err != nil {
397
log.LogInfof("action[Write] path %v err %v", e.filePath, err)
401
status, err = e.Write(data, offset, size, crc, writeType, isSync, s.PersistenceBlockCrc, ei)
403
log.LogInfof("action[Write] path %v err %v", e.filePath, err)
407
ei.UpdateExtentInfo(e, 0)
411
func (s *ExtentStore) checkOffsetAndSize(extentID uint64, offset, size int64, writeType int) error {
412
if IsTinyExtent(extentID) {
415
// random write pos can happen on modAppend partition of extent
416
if writeType == RandomWriteType {
419
if writeType == AppendRandomWriteType {
420
if offset < util.ExtentSize {
421
return newParameterError("writeType=%d offset=%d size=%d", writeType, offset, size)
425
if size == 0 || size > util.BlockSize ||
426
offset >= util.BlockCount*util.BlockSize ||
427
offset+size > util.BlockCount*util.BlockSize {
428
return newParameterError("offset=%d size=%d", offset, size)
433
// IsTinyExtent checks if the given extent is tiny extent.
434
func IsTinyExtent(extentID uint64) bool {
435
return extentID >= TinyExtentStartID && extentID < TinyExtentStartID+TinyExtentCount
438
// Read reads the extent based on the given id.
439
func (s *ExtentStore) Read(extentID uint64, offset, size int64, nbuf []byte, isRepairRead bool) (crc uint32, err error) {
442
ei := s.extentInfoMap[extentID]
446
return 0, errors.Trace(ExtentHasBeenDeletedError, "[Read] extent[%d] is already been deleted", extentID)
449
// update extent access time
450
atomic.StoreInt64(&ei.AccessTime, time.Now().Unix())
452
if e, err = s.extentWithHeader(ei); err != nil {
456
//if err = s.checkOffsetAndSize(extentID, offset, size); err != nil {
459
crc, err = e.Read(nbuf, offset, size, isRepairRead)
464
func (s *ExtentStore) DumpExtents() (extInfos SortedExtentInfos) {
466
for _, v := range s.extentInfoMap {
467
extInfos = append(extInfos, v)
473
func (s *ExtentStore) punchDelete(extentID uint64, offset, size int64) (err error) {
474
e, err := s.extentWithHeaderByExtentID(extentID)
478
if offset+size > e.dataSize {
482
if hasDelete, err = e.punchDelete(offset, size); err != nil {
488
if err = s.RecordTinyDelete(e.extentID, offset, size); err != nil {
494
// MarkDelete marks the given extent as deleted.
495
func (s *ExtentStore) MarkDelete(extentID uint64, offset, size int64) (err error) {
498
ei = s.extentInfoMap[extentID]
500
if ei == nil || ei.IsDeleted {
503
log.LogDebugf("action[MarkDelete] extentID %v offset %v size %v ei(size %v snapshotSize %v)",
504
extentID, offset, size, ei.Size, ei.SnapshotDataOff)
506
funcNeedPunchDel := func() bool {
507
return offset != 0 || (size != 0 && ((ei.Size != uint64(size) && ei.SnapshotDataOff == util.ExtentSize) ||
508
(ei.SnapshotDataOff != uint64(size) && ei.SnapshotDataOff > util.ExtentSize)))
511
if IsTinyExtent(extentID) || funcNeedPunchDel() {
512
log.LogDebugf("action[MarkDelete] extentID %v offset %v size %v ei(size %v snapshotSize %v)",
513
extentID, offset, size, ei.Size, ei.SnapshotDataOff)
514
return s.punchDelete(extentID, offset, size)
517
extentFilePath := path.Join(s.dataPath, strconv.FormatUint(extentID, 10))
518
log.LogDebugf("action[MarkDelete] extentID %v offset %v size %v ei(size %v extentFilePath %v)",
519
extentID, offset, size, ei.Size, extentFilePath)
520
if err = os.Remove(extentFilePath); err != nil && !os.IsNotExist(err) {
521
// NOTE: if remove failed
522
// we meet a disk error
523
err = BrokenDiskError
526
if err = s.PersistenceHasDeleteExtent(extentID); err != nil {
527
err = BrokenDiskError
531
ei.ModifyTime = time.Now().Unix()
532
s.cache.Del(extentID)
533
if err = s.DeleteBlockCrc(extentID); err != nil {
534
err = BrokenDiskError
537
s.PutNormalExtentToDeleteCache(extentID)
540
delete(s.extentInfoMap, extentID)
546
func (s *ExtentStore) PutNormalExtentToDeleteCache(extentID uint64) {
547
s.hasDeleteNormalExtentsCache.Store(extentID, time.Now().Unix())
550
func (s *ExtentStore) IsDeletedNormalExtent(extentID uint64) (ok bool) {
551
_, ok = s.hasDeleteNormalExtentsCache.Load(extentID)
555
// Close closes the extent store.
556
func (s *ExtentStore) Close() {
558
defer s.mutex.Unlock()
566
s.tinyExtentDeleteFp.Sync()
567
s.tinyExtentDeleteFp.Close()
568
s.normalExtentDeleteFp.Sync()
569
s.normalExtentDeleteFp.Close()
570
s.verifyExtentFp.Sync()
571
s.verifyExtentFp.Close()
572
for _, vFp := range s.verifyExtentFpAppend {
581
// Watermark returns the extent info of the given extent on the record.
582
func (s *ExtentStore) Watermark(extentID uint64) (ei *ExtentInfo, err error) {
585
ei, has = s.extentInfoMap[extentID]
588
err = fmt.Errorf("e %v not exist", s.getExtentKey(extentID))
594
// GetTinyExtentOffset returns the recorded size of the given extent, rounded up to the next multiple of util.PageSize.
595
func (s *ExtentStore) GetTinyExtentOffset(extentID uint64) (watermark int64, err error) {
596
einfo, err := s.Watermark(extentID)
600
watermark = int64(einfo.Size)
601
if watermark%util.PageSize != 0 {
602
watermark = watermark + (util.PageSize - watermark%util.PageSize)
608
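// GetExtentSnapshotModOffset returns the current pre-allocated snapshot-mode offset of the given extent (initialized from SnapshotDataOff when unset) and then advances that offset by allocSize.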
609
func (s *ExtentStore) GetExtentSnapshotModOffset(extentID uint64, allocSize uint32) (watermark int64, err error) {
610
einfo, err := s.Watermark(extentID)
614
log.LogDebugf("action[ExtentStore.GetExtentSnapshotModOffset] extId %v SnapshotDataOff %v SnapPreAllocDataOff %v allocSize %v",
615
extentID, einfo.SnapshotDataOff, einfo.SnapPreAllocDataOff, allocSize)
617
if einfo.SnapPreAllocDataOff == 0 {
618
einfo.SnapPreAllocDataOff = einfo.SnapshotDataOff
620
watermark = int64(einfo.SnapPreAllocDataOff)
621
//if watermark%util.PageSize != 0 {
622
// watermark = watermark + (util.PageSize - watermark%util.PageSize)
624
einfo.SnapPreAllocDataOff += uint64(allocSize)
634
func (s *ExtentStore) GetStoreUsedSize() (used int64) {
635
extentInfoSlice := make([]*ExtentInfo, 0, s.GetExtentCount())
637
for _, extentID := range s.extentInfoMap {
638
extentInfoSlice = append(extentInfoSlice, extentID)
641
for _, einfo := range extentInfoSlice {
645
if IsTinyExtent(einfo.FileID) {
646
stat := new(syscall.Stat_t)
647
err := syscall.Stat(fmt.Sprintf("%v/%v", s.dataPath, einfo.FileID), stat)
651
used += stat.Blocks * DiskSectorSize
653
used += int64(einfo.Size + (einfo.SnapshotDataOff - util.ExtentSize))
659
// GetAllWatermarks returns all the watermarks.
660
func (s *ExtentStore) GetAllWatermarks(filter ExtentFilter) (extents []*ExtentInfo, tinyDeleteFileSize int64, err error) {
661
extents = make([]*ExtentInfo, 0, len(s.extentInfoMap))
662
extentInfoSlice := make([]*ExtentInfo, 0, len(s.extentInfoMap))
664
for _, extentID := range s.extentInfoMap {
665
extentInfoSlice = append(extentInfoSlice, extentID)
669
for _, extentInfo := range extentInfoSlice {
670
if filter != nil && !filter(extentInfo) {
673
if extentInfo.IsDeleted {
676
extents = append(extents, extentInfo)
678
tinyDeleteFileSize, err = s.LoadTinyDeleteFileOffset()
683
func (s *ExtentStore) getTinyExtentInfo() (extents []*ExtentInfo) {
684
extents = make([]*ExtentInfo, 0)
687
for extentID = TinyExtentStartID; extentID < TinyExtentCount+TinyExtentStartID; extentID++ {
688
ei := s.extentInfoMap[extentID]
692
extents = append(extents, ei)
699
// ExtentID reports whether filename names an extent file (decimal digits only) and, if so, returns the extent ID parsed from it.
700
func (s *ExtentStore) ExtentID(filename string) (extentID uint64, isExtent bool) {
701
if isExtent = RegexpExtentFile.MatchString(filename); !isExtent {
705
if extentID, err = strconv.ParseUint(filename, 10, 64); err != nil {
713
func (s *ExtentStore) initTinyExtent() (err error) {
714
s.availableTinyExtentC = make(chan uint64, TinyExtentCount)
715
s.brokenTinyExtentC = make(chan uint64, TinyExtentCount)
718
for extentID = TinyExtentStartID; extentID < TinyExtentStartID+TinyExtentCount; extentID++ {
719
err = s.Create(extentID)
720
if err == nil || strings.Contains(err.Error(), syscall.EEXIST.Error()) || err == ExtentExistsError {
722
s.brokenTinyExtentC <- extentID
723
s.brokenTinyExtentMap.Store(extentID, true)
732
// GetAvailableTinyExtent returns the available tiny extent from the channel.
733
func (s *ExtentStore) GetAvailableTinyExtent() (extentID uint64, err error) {
735
case extentID = <-s.availableTinyExtentC:
736
log.LogDebugf("dp %v GetAvailableTinyExtent. extentID %v", s.partitionID, extentID)
737
s.availableTinyExtentMap.Delete(extentID)
740
log.LogDebugf("dp %v GetAvailableTinyExtent not found", s.partitionID)
741
return 0, NoAvailableExtentError
746
// SendToAvailableTinyExtentC sends the extent to the channel that stores the available tiny extents.
747
func (s *ExtentStore) SendToAvailableTinyExtentC(extentID uint64) {
748
log.LogDebugf("dp %v action[SendToAvailableTinyExtentC] extentid %v", s.partitionID, extentID)
749
if _, ok := s.availableTinyExtentMap.Load(extentID); !ok {
750
log.LogDebugf("dp %v SendToAvailableTinyExtentC. extentID %v", s.partitionID, extentID)
751
s.availableTinyExtentC <- extentID
752
s.availableTinyExtentMap.Store(extentID, true)
754
log.LogDebugf("dp %v action[SendToAvailableTinyExtentC] extentid %v already exist", s.partitionID, extentID)
758
// SendAllToBrokenTinyExtentC sends all the extents to the channel that stores the broken extents.
759
func (s *ExtentStore) SendAllToBrokenTinyExtentC(extentIds []uint64) {
760
for _, extentID := range extentIds {
761
if _, ok := s.brokenTinyExtentMap.Load(extentID); !ok {
762
s.brokenTinyExtentC <- extentID
763
s.brokenTinyExtentMap.Store(extentID, true)
768
// AvailableTinyExtentCnt returns the count of the available tiny extents.
769
func (s *ExtentStore) AvailableTinyExtentCnt() int {
770
return len(s.availableTinyExtentC)
773
// BrokenTinyExtentCnt returns the count of the broken tiny extents.
774
func (s *ExtentStore) BrokenTinyExtentCnt() int {
775
return len(s.brokenTinyExtentC)
778
// MoveAllToBrokenTinyExtentC takes up to cnt extents from the available tiny extent channel and moves them to the broken tiny extent channel.
779
func (s *ExtentStore) MoveAllToBrokenTinyExtentC(cnt int) {
780
for i := 0; i < cnt; i++ {
781
extentID, err := s.GetAvailableTinyExtent()
785
s.SendToBrokenTinyExtentC(extentID)
789
// SendToBrokenTinyExtentC sends the given extent id to the channel.
790
func (s *ExtentStore) SendToBrokenTinyExtentC(extentID uint64) {
791
if _, ok := s.brokenTinyExtentMap.Load(extentID); !ok {
792
s.brokenTinyExtentC <- extentID
793
s.brokenTinyExtentMap.Store(extentID, true)
797
// GetBrokenTinyExtent returns the first broken extent in the channel.
798
func (s *ExtentStore) GetBrokenTinyExtent() (extentID uint64, err error) {
800
case extentID = <-s.brokenTinyExtentC:
801
s.brokenTinyExtentMap.Delete(extentID)
804
return 0, NoBrokenExtentError
809
// StoreSizeExtentID returns the total size of all extents whose ID is at most maxExtentID.
810
func (s *ExtentStore) StoreSizeExtentID(maxExtentID uint64) (totalSize uint64) {
811
extentInfos := make([]*ExtentInfo, 0)
813
for _, extentInfo := range s.extentInfoMap {
814
if extentInfo.FileID <= maxExtentID {
815
extentInfos = append(extentInfos, extentInfo)
819
for _, extentInfo := range extentInfos {
820
totalSize += extentInfo.TotalSize()
821
log.LogDebugf("ExtentStore.StoreSizeExtentID dp %v extentInfo %v totalSize %v", s.partitionID, extentInfo, extentInfo.TotalSize())
827
// GetMaxExtentIDAndPartitionSize returns the largest extent ID in the store and the total size of all its extents.
828
func (s *ExtentStore) GetMaxExtentIDAndPartitionSize() (maxExtentID, totalSize uint64) {
829
extentInfos := make([]*ExtentInfo, 0)
831
for _, extentInfo := range s.extentInfoMap {
832
extentInfos = append(extentInfos, extentInfo)
835
for _, extentInfo := range extentInfos {
836
if extentInfo.FileID > maxExtentID {
837
maxExtentID = extentInfo.FileID
839
totalSize += extentInfo.TotalSize()
841
return maxExtentID, totalSize
844
func MarshalTinyExtent(extentID uint64, offset, size int64) (data []byte) {
845
data = make([]byte, DeleteTinyRecordSize)
846
binary.BigEndian.PutUint64(data[0:8], extentID)
847
binary.BigEndian.PutUint64(data[8:16], uint64(offset))
848
binary.BigEndian.PutUint64(data[16:DeleteTinyRecordSize], uint64(size))
852
func UnMarshalTinyExtent(data []byte) (extentID, offset, size uint64) {
853
extentID = binary.BigEndian.Uint64(data[0:8])
854
offset = binary.BigEndian.Uint64(data[8:16])
855
size = binary.BigEndian.Uint64(data[16:DeleteTinyRecordSize])
859
func (s *ExtentStore) RecordTinyDelete(extentID uint64, offset, size int64) (err error) {
860
record := MarshalTinyExtent(extentID, offset, size)
861
stat, err := s.tinyExtentDeleteFp.Stat()
865
if stat.Size()%DeleteTinyRecordSize != 0 {
866
needWriteEmpty := DeleteTinyRecordSize - (stat.Size() % DeleteTinyRecordSize)
867
data := make([]byte, needWriteEmpty)
868
s.tinyExtentDeleteFp.Write(data)
870
_, err = s.tinyExtentDeleteFp.Write(record)
878
func (s *ExtentStore) ReadTinyDeleteRecords(offset, size int64, data []byte) (crc uint32, err error) {
879
_, err = s.tinyExtentDeleteFp.ReadAt(data[:size], offset)
880
if err == nil || err == io.EOF {
882
crc = crc32.ChecksumIEEE(data[:size])
887
type ExtentDeleted struct {
888
ExtentID uint64 `json:"extentID"`
889
Offset uint64 `json:"offset"`
890
Size uint64 `json:"size"`
893
func (s *ExtentStore) GetHasDeleteTinyRecords() (extentDes []ExtentDeleted, err error) {
894
data := make([]byte, DeleteTinyRecordSize)
898
_, err = s.tinyExtentDeleteFp.ReadAt(data, offset)
906
extent := ExtentDeleted{}
907
extent.ExtentID, extent.Offset, extent.Size = UnMarshalTinyExtent(data)
908
extentDes = append(extentDes, extent)
909
offset += DeleteTinyRecordSize
913
// NextExtentID returns the next extentID. When the client sends the request to create an extent,
914
// this function generates a unique extentID within the current partition.
915
// This function can only be called by the leader.
916
func (s *ExtentStore) NextExtentID() (extentID uint64, err error) {
917
extentID = atomic.AddUint64(&s.baseExtentID, 1)
918
err = s.PersistenceBaseExtentID(extentID)
922
func (s *ExtentStore) LoadTinyDeleteFileOffset() (offset int64, err error) {
923
stat, err := s.tinyExtentDeleteFp.Stat()
930
func (s *ExtentStore) getExtentKey(extent uint64) string {
931
return fmt.Sprintf("extent %v_%v", s.partitionID, extent)
934
// UpdateBaseExtentID updates the base extent ID.
935
func (s *ExtentStore) UpdateBaseExtentID(id uint64) (err error) {
936
if IsTinyExtent(id) {
939
if id > atomic.LoadUint64(&s.baseExtentID) {
940
atomic.StoreUint64(&s.baseExtentID, id)
941
err = s.PersistenceBaseExtentID(atomic.LoadUint64(&s.baseExtentID))
943
s.PreAllocSpaceOnVerfiyFile(atomic.LoadUint64(&s.baseExtentID))
948
func (s *ExtentStore) extent(extentID uint64) (e *Extent, err error) {
949
if e, err = s.LoadExtentFromDisk(extentID, false); err != nil {
950
err = fmt.Errorf("load extent from disk: %v", err)
956
func (s *ExtentStore) extentWithHeader(ei *ExtentInfo) (e *Extent, err error) {
958
if ei == nil || ei.IsDeleted {
959
err = ExtentNotFoundError
962
if e, ok = s.cache.Get(ei.FileID); !ok {
963
if e, err = s.LoadExtentFromDisk(ei.FileID, true); err != nil {
964
err = fmt.Errorf("load %v from disk: %v", s.getExtentKey(ei.FileID), err)
971
func (s *ExtentStore) extentWithHeaderByExtentID(extentID uint64) (e *Extent, err error) {
973
if e, ok = s.cache.Get(extentID); !ok {
974
if e, err = s.LoadExtentFromDisk(extentID, true); err != nil {
975
err = fmt.Errorf("load %v from disk: %v", s.getExtentKey(extentID), err)
982
// HasExtent tells if the extent store has the extent with the given ID
983
func (s *ExtentStore) HasExtent(extentID uint64) (exist bool) {
985
defer s.eiMutex.RUnlock()
986
_, exist = s.extentInfoMap[extentID]
990
// GetExtentCount returns the number of extents in the extentInfoMap
991
func (s *ExtentStore) GetExtentCount() (count int) {
993
defer s.eiMutex.RUnlock()
994
return len(s.extentInfoMap)
997
func (s *ExtentStore) LoadExtentFromDisk(extentID uint64, putCache bool) (e *Extent, err error) {
998
name := path.Join(s.dataPath, fmt.Sprintf("%v", extentID))
999
e = NewExtentInCore(name, extentID)
1000
if err = e.RestoreFromFS(); err != nil {
1001
err = fmt.Errorf("restore from file %v putCache %v system: %v", name, putCache, err)
1009
if !IsTinyExtent(extentID) && proto.IsNormalDp(s.partitionType) {
1010
e.header = make([]byte, util.BlockHeaderSize)
1011
if _, err = s.verifyExtentFp.ReadAt(e.header, int64(extentID*util.BlockHeaderSize)); err != nil && err != io.EOF {
1014
emptyHeader := make([]byte, util.BlockHeaderSize)
1015
log.LogDebugf("LoadExtentFromDisk. partition id %v extentId %v, snapshotOff %v, append fp cnt %v",
1016
s.partitionID, extentID, e.snapshotDataOff, len(s.verifyExtentFpAppend))
1017
if e.snapshotDataOff > util.ExtentSize {
1018
for id, vFp := range s.verifyExtentFpAppend {
1019
if uint64(id) > (e.snapshotDataOff-util.ExtentSize)/util.ExtentSize {
1020
log.LogDebugf("LoadExtentFromDisk. partition id %v extentId %v, snapshotOff %v id %v out of extent range",
1021
s.partitionID, extentID, e.snapshotDataOff, id)
1024
log.LogDebugf("LoadExtentFromDisk. partition id %v extentId %v, snapshotOff %v id %v", s.partitionID, extentID, e.snapshotDataOff, id)
1025
header := make([]byte, util.BlockHeaderSize)
1026
if _, err = vFp.ReadAt(header, int64(extentID*util.BlockHeaderSize)); err != nil && err != io.EOF {
1027
log.LogDebugf("LoadExtentFromDisk. partition id %v extentId %v, read at %v err %v",
1028
s.partitionID, extentID, extentID*util.BlockHeaderSize, err)
1031
if bytes.Equal(emptyHeader, header) {
1032
log.LogErrorf("LoadExtentFromDisk. partition id %v extent %v hole at id %v", s.partitionID, e, id)
1034
e.header = append(e.header, header...)
1036
if len(s.verifyExtentFpAppend) < int(e.snapshotDataOff-1)/util.ExtentSize {
1037
log.LogErrorf("LoadExtentFromDisk. extent %v need fp %v out of range %v", e, int(e.snapshotDataOff-1)/util.ExtentSize, len(s.verifyExtentFpAppend))
1048
func (s *ExtentStore) ScanBlocks(extentID uint64) (bcs []*BlockCrc, err error) {
1049
if !proto.IsNormalDp(s.partitionType) {
1054
bcs = make([]*BlockCrc, 0)
1055
ei := s.extentInfoMap[extentID]
1056
e, err := s.extentWithHeader(ei)
1062
if e.snapshotDataOff > util.ExtentSize {
1063
extSize = int64(e.snapshotDataOff)
1065
blockCnt = int(extSize / util.BlockSize)
1067
if e.Size()%util.BlockSize != 0 {
1070
for blockNo := 0; blockNo < blockCnt; blockNo++ {
1071
blockCrc := binary.BigEndian.Uint32(e.header[blockNo*util.PerBlockCrcSize : (blockNo+1)*util.PerBlockCrcSize])
1072
bcs = append(bcs, &BlockCrc{BlockNo: blockNo, Crc: blockCrc})
1074
sort.Sort(BlockCrcArr(bcs))
1079
// ExtentInfoArr implements sort.Interface over extent infos, ordering them by
// ascending FileID; autoComputeExtentCrc sorts with it before recomputing CRCs.
type ExtentInfoArr []*ExtentInfo
1081
// Len returns the number of extent infos held in arr (sort.Interface).
func (arr ExtentInfoArr) Len() int {
	return len(arr)
}
1082
// Less reports whether the element at i has a smaller FileID than the element
// at j, which yields ascending FileID order (sort.Interface).
func (arr ExtentInfoArr) Less(i, j int) bool {
	left, right := arr[i], arr[j]
	return left.FileID < right.FileID
}
1083
// Swap exchanges the elements at positions i and j (sort.Interface).
func (arr ExtentInfoArr) Swap(i, j int) {
	arr[j], arr[i] = arr[i], arr[j]
}
1085
func (s *ExtentStore) BackendTask() {
1086
s.autoComputeExtentCrc()
1087
s.cleanExpiredNormalExtentDeleteCache()
1090
func (s *ExtentStore) cleanExpiredNormalExtentDeleteCache() {
1091
s.hasDeleteNormalExtentsCache.Range(func(key, value interface{}) bool {
1092
deleteTime := value.(int64)
1093
extentID := key.(uint64)
1094
if time.Now().Unix()-deleteTime > NormalExtentDeleteRetainTime {
1095
s.hasDeleteNormalExtentsCache.Delete(extentID)
1101
func (s *ExtentStore) autoComputeExtentCrc() {
1102
if !proto.IsNormalDp(s.partitionType) {
1107
if r := recover(); r != nil {
1112
extentInfos := make([]*ExtentInfo, 0)
1113
deleteExtents := make([]*ExtentInfo, 0)
1115
for _, ei := range s.extentInfoMap {
1116
extentInfos = append(extentInfos, ei)
1117
if ei.IsDeleted && time.Now().Unix()-ei.ModifyTime > UpdateCrcInterval {
1118
deleteExtents = append(deleteExtents, ei)
1123
if len(deleteExtents) > 0 {
1125
for _, ei := range deleteExtents {
1126
delete(s.extentInfoMap, ei.FileID)
1131
sort.Sort(ExtentInfoArr(extentInfos))
1133
for _, ei := range extentInfos {
1134
s.ApplyIdMutex.RLock()
1136
s.ApplyIdMutex.RUnlock()
1140
if !IsTinyExtent(ei.FileID) && time.Now().Unix()-ei.ModifyTime > UpdateCrcInterval &&
1141
!ei.IsDeleted && ei.Size > 0 && ei.Crc == 0 {
1143
e, err := s.extentWithHeader(ei)
1145
log.LogError("[autoComputeExtentCrc] get extent error", err)
1146
s.ApplyIdMutex.RUnlock()
1150
extentCrc, err := e.autoComputeExtentCrc(s.PersistenceBlockCrc)
1152
log.LogError("[autoComputeExtentCrc] compute crc fail", err)
1153
s.ApplyIdMutex.RUnlock()
1157
ei.UpdateExtentInfo(e, extentCrc)
1158
ei.ApplyID = s.ApplyId
1159
time.Sleep(time.Millisecond * 100)
1161
s.ApplyIdMutex.RUnlock()
1164
time.Sleep(time.Second)
1167
func (s *ExtentStore) TinyExtentRecover(extentID uint64, offset, size int64, data []byte, crc uint32, isEmptyPacket bool) (err error) {
1168
if !IsTinyExtent(extentID) {
1169
return fmt.Errorf("extent %v not tinyExtent", extentID)
1178
ei = s.extentInfoMap[extentID]
1180
if e, err = s.extentWithHeader(ei); err != nil {
1184
if err = e.TinyExtentRecover(data, offset, size, crc, isEmptyPacket); err != nil {
1187
ei.UpdateExtentInfo(e, 0)
1192
func (s *ExtentStore) TinyExtentGetFinfoSize(extentID uint64) (size uint64, err error) {
1194
if !IsTinyExtent(extentID) {
1195
return 0, fmt.Errorf("unavali extent id (%v)", extentID)
1198
ei := s.extentInfoMap[extentID]
1200
if e, err = s.extentWithHeader(ei); err != nil {
1204
finfo, err := e.file.Stat()
1208
size = uint64(finfo.Size())
1213
func (s *ExtentStore) TinyExtentAvaliOffset(extentID uint64, offset int64) (newOffset, newEnd int64, err error) {
1215
if !IsTinyExtent(extentID) {
1216
return 0, 0, fmt.Errorf("unavali extent(%v)", extentID)
1219
ei := s.extentInfoMap[extentID]
1221
if e, err = s.extentWithHeader(ei); err != nil {
1226
if err != nil && strings.Contains(err.Error(), syscall.ENXIO.Error()) {
1227
newOffset = e.dataSize
1232
newOffset, newEnd, err = e.tinyExtentAvaliOffset(offset)
1237
func (s *ExtentStore) renameStaleExtentStore() (err error) {
1238
// create: move current folder to .old and create a new folder
1239
if _, err = os.Stat(s.dataPath); err != nil {
1240
if os.IsNotExist(err) {
1245
curTime := time.Now().Format(StaleExtStoreTimeFormat)
1246
staleExtStoreDirName := s.dataPath + "_" + curTime + StaleExtStoreBackupSuffix
1248
if err = os.Rename(s.dataPath, staleExtStoreDirName); err != nil {