cubefs

Форк
0
/
extent_store.go 
1252 строки · 35.7 Кб
1
// Copyright 2018 The CubeFS Authors.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
14

15
package storage
16

17
import (
18
	"bytes"
19
	"encoding/binary"
20
	"fmt"
21
	"hash/crc32"
22
	"io"
23
	"os"
24
	"path"
25
	"regexp"
26
	"runtime"
27
	"sort"
28
	"strconv"
29
	"strings"
30
	"sync"
31
	"sync/atomic"
32
	"syscall"
33
	"time"
34

35
	"github.com/cubefs/cubefs/proto"
36
	"github.com/cubefs/cubefs/util"
37
	"github.com/cubefs/cubefs/util/errors"
38
	"github.com/cubefs/cubefs/util/log"
39
)
40

41
//TODO: remove this later.
42
//go:generate golangci-lint run --issues-exit-code=1 -D errcheck -E bodyclose ./...
43

44
const (
45
	ExtCrcHeaderFileName     = "EXTENT_CRC"
46
	ExtBaseExtentIDFileName  = "EXTENT_META"
47
	TinyDeleteFileOpt        = os.O_CREATE | os.O_RDWR | os.O_APPEND
48
	TinyExtDeletedFileName   = "TINYEXTENT_DELETE"
49
	NormalExtDeletedFileName = "NORMALEXTENT_DELETE"
50
	MaxExtentCount           = 20000
51
	TinyExtentCount          = 64
52
	TinyExtentStartID        = 1
53
	MinExtentID              = 1024
54
	DeleteTinyRecordSize     = 24
55
	UpdateCrcInterval        = 600
56
	RepairInterval           = 60
57
	RandomWriteType          = 2
58
	AppendWriteType          = 1
59
	AppendRandomWriteType    = 4
60

61
	NormalExtentDeleteRetainTime = 3600 * 4
62

63
	StaleExtStoreBackupSuffix = ".old"
64
	StaleExtStoreTimeFormat   = "20060102150405.000000000"
65
)
66

67
var (
68
	RegexpExtentFile, _ = regexp.Compile(`^(\d)+$`)
69
	SnapShotFilePool    = &sync.Pool{New: func() interface{} {
70
		return new(proto.File)
71
	}}
72
)
73

74
func GetSnapShotFileFromPool() (f *proto.File) {
75
	f = SnapShotFilePool.Get().(*proto.File)
76
	return
77
}
78

79
func PutSnapShotFileToPool(f *proto.File) {
80
	SnapShotFilePool.Put(f)
81
}
82

83
type ExtentFilter func(info *ExtentInfo) bool
84

85
// Filters
86
var (
87
	NormalExtentFilter = func() ExtentFilter {
88
		now := time.Now()
89
		return func(ei *ExtentInfo) bool {
90
			return !IsTinyExtent(ei.FileID) && now.Unix()-ei.ModifyTime > RepairInterval && !ei.IsDeleted
91
		}
92
	}
93

94
	TinyExtentFilter = func(filters []uint64) ExtentFilter {
95
		return func(ei *ExtentInfo) bool {
96
			if !IsTinyExtent(ei.FileID) {
97
				return false
98
			}
99
			for _, filterID := range filters {
100
				if filterID == ei.FileID {
101
					return true
102
				}
103
			}
104
			return false
105
		}
106
	}
107
)
108

109
// ExtentStore defines fields used in the storage engine.
110
// Packets smaller than 128K are stored in the "tinyExtent", a place to persist the small files.
111
// packets larger than or equal to 128K are stored in the normal "extent", a place to persist large files.
112
// The difference between them is that the extentID of a tinyExtent starts at 5000000 and ends at 5000128.
113
// Multiple small files can be appended to the same tinyExtent.
114
// In addition, the deletion of small files is implemented by the punch hole from the underlying file system.
115
type ExtentStore struct {
116
	dataPath               string
117
	baseExtentID           uint64                 // TODO what is baseExtentID
118
	extentInfoMap          map[uint64]*ExtentInfo // map that stores all the extent information
119
	eiMutex                sync.RWMutex           // mutex for extent info
120
	cache                  *ExtentCache           // extent cache
121
	mutex                  sync.Mutex
122
	storeSize              int      // size of the extent store
123
	metadataFp             *os.File // metadata file pointer?
124
	tinyExtentDeleteFp     *os.File
125
	normalExtentDeleteFp   *os.File
126
	closeC                 chan bool
127
	closed                 bool
128
	availableTinyExtentC   chan uint64 // available tinyExtent channel
129
	availableTinyExtentMap sync.Map
130
	brokenTinyExtentC      chan uint64 // broken tinyExtent channel
131
	brokenTinyExtentMap    sync.Map
132
	// blockSize                         int
133
	partitionID    uint64
134
	verifyExtentFp *os.File
135

136
	verifyExtentFpAppend              []*os.File
137
	hasAllocSpaceExtentIDOnVerfiyFile uint64
138
	hasDeleteNormalExtentsCache       sync.Map
139
	partitionType                     int
140
	ApplyId                           uint64
141
	ApplyIdMutex                      sync.RWMutex
142
}
143

144
func MkdirAll(name string) (err error) {
145
	return os.MkdirAll(name, 0o755)
146
}
147

148
func NewExtentStore(dataDir string, partitionID uint64, storeSize, dpType int, isCreate bool) (s *ExtentStore, err error) {
149
	s = new(ExtentStore)
150
	s.dataPath = dataDir
151
	s.partitionType = dpType
152
	s.partitionID = partitionID
153

154
	if isCreate {
155
		if err = s.renameStaleExtentStore(); err != nil {
156
			return
157
		}
158
		if err = MkdirAll(dataDir); err != nil {
159
			return nil, fmt.Errorf("NewExtentStore [%v] err[%v]", dataDir, err)
160
		}
161

162
		if s.tinyExtentDeleteFp, err = os.OpenFile(path.Join(s.dataPath, TinyExtDeletedFileName), TinyDeleteFileOpt, 0o666); err != nil {
163
			return
164
		}
165
		if s.verifyExtentFp, err = os.OpenFile(path.Join(s.dataPath, ExtCrcHeaderFileName), os.O_CREATE|os.O_RDWR, 0o666); err != nil {
166
			return
167
		}
168
		if s.metadataFp, err = os.OpenFile(path.Join(s.dataPath, ExtBaseExtentIDFileName), os.O_CREATE|os.O_RDWR, 0o666); err != nil {
169
			return
170
		}
171
		if s.normalExtentDeleteFp, err = os.OpenFile(path.Join(s.dataPath, NormalExtDeletedFileName), os.O_CREATE|os.O_RDWR|os.O_APPEND, 0o666); err != nil {
172
			return
173
		}
174
	} else {
175
		if err = MkdirAll(dataDir); err != nil {
176
			return nil, fmt.Errorf("NewExtentStore [%v] err[%v]", dataDir, err)
177
		}
178
		if s.tinyExtentDeleteFp, err = os.OpenFile(path.Join(s.dataPath, TinyExtDeletedFileName), os.O_RDWR|os.O_APPEND, 0o666); err != nil {
179
			return
180
		}
181
		if s.verifyExtentFp, err = os.OpenFile(path.Join(s.dataPath, ExtCrcHeaderFileName), os.O_RDWR, 0o666); err != nil {
182
			return
183
		}
184
		if s.metadataFp, err = os.OpenFile(path.Join(s.dataPath, ExtBaseExtentIDFileName), os.O_RDWR, 0o666); err != nil {
185
			return
186
		}
187
		if s.normalExtentDeleteFp, err = os.OpenFile(path.Join(s.dataPath, NormalExtDeletedFileName), os.O_RDWR|os.O_APPEND, 0o666); err != nil {
188
			return
189
		}
190
	}
191

192
	stat, err := s.tinyExtentDeleteFp.Stat()
193
	if err != nil {
194
		return
195
	}
196
	if stat.Size()%DeleteTinyRecordSize != 0 {
197
		needWriteEmpty := DeleteTinyRecordSize - (stat.Size() % DeleteTinyRecordSize)
198
		data := make([]byte, needWriteEmpty)
199
		s.tinyExtentDeleteFp.Write(data)
200
	}
201

202
	log.LogDebugf("NewExtentStore.partitionID [%v] dataPath %v verifyExtentFp init", partitionID, s.dataPath)
203
	if s.verifyExtentFp, err = os.OpenFile(path.Join(s.dataPath, ExtCrcHeaderFileName), os.O_CREATE|os.O_RDWR, 0o666); err != nil {
204
		return
205
	}
206

207
	aId := 0
208
	var vFp *os.File
209
	for {
210
		dataPath := path.Join(s.dataPath, ExtCrcHeaderFileName+"_"+strconv.Itoa(aId))
211
		if _, err = os.Stat(dataPath); err != nil {
212
			log.LogDebugf("NewExtentStore. partitionID [%v] dataPath not exist err %v. verifyExtentFpAppend init return", partitionID, err)
213
			break
214
		}
215
		if vFp, err = os.OpenFile(dataPath, os.O_CREATE|os.O_RDWR, 0o666); err != nil {
216
			log.LogErrorf("NewExtentStore. partitionID [%v] dataPath exist but open err %v. verifyExtentFpAppend init return", partitionID, err)
217
			return
218
		}
219
		log.LogDebugf("NewExtentStore. partitionID [%v] dataPath exist and opened id %v", partitionID, aId)
220
		s.verifyExtentFpAppend = append(s.verifyExtentFpAppend, vFp)
221
		aId++
222
	}
223
	if s.metadataFp, err = os.OpenFile(path.Join(s.dataPath, ExtBaseExtentIDFileName), os.O_CREATE|os.O_RDWR, 0o666); err != nil {
224
		return
225
	}
226
	if s.normalExtentDeleteFp, err = os.OpenFile(path.Join(s.dataPath, NormalExtDeletedFileName), os.O_CREATE|os.O_RDWR|os.O_APPEND, 0o666); err != nil {
227
		return
228
	}
229

230
	s.extentInfoMap = make(map[uint64]*ExtentInfo)
231
	s.cache = NewExtentCache(100)
232
	if err = s.initBaseFileID(); err != nil {
233
		err = fmt.Errorf("init base field ID: %v", err)
234
		return
235
	}
236
	s.hasAllocSpaceExtentIDOnVerfiyFile = s.GetPreAllocSpaceExtentIDOnVerifyFile()
237
	s.storeSize = storeSize
238
	s.closeC = make(chan bool, 1)
239
	s.closed = false
240
	err = s.initTinyExtent()
241
	if err != nil {
242
		return
243
	}
244
	return
245
}
246

247
func (ei *ExtentInfo) UpdateExtentInfo(extent *Extent, crc uint32) {
248
	extent.Lock()
249
	defer extent.Unlock()
250

251
	if time.Now().Unix()-extent.ModifyTime() <= UpdateCrcInterval {
252
		crc = 0
253
	}
254

255
	ei.Size = uint64(extent.dataSize)
256
	ei.SnapshotDataOff = extent.snapshotDataOff
257

258
	log.LogInfof("action[ExtentInfo.UpdateExtentInfo] ei info [%v]", ei.String())
259

260
	if !IsTinyExtent(ei.FileID) {
261
		atomic.StoreUint32(&ei.Crc, crc)
262
		ei.ModifyTime = extent.ModifyTime()
263
	}
264
}
265

266
// SnapShot returns the information of all the extents on the current data partition.
267
// When the master sends the loadDataPartition request, the snapshot is used to compare the replicas.
268
func (s *ExtentStore) SnapShot() (files []*proto.File, err error) {
269
	var normalExtentSnapshot, tinyExtentSnapshot []*ExtentInfo
270

271
	// compute crc again to guarantee crc and applyID is the newest
272
	s.autoComputeExtentCrc()
273

274
	if normalExtentSnapshot, _, err = s.GetAllWatermarks(NormalExtentFilter()); err != nil {
275
		log.LogErrorf("SnapShot GetAllWatermarks err %v", err)
276
		return
277
	}
278

279
	files = make([]*proto.File, 0, len(normalExtentSnapshot))
280
	for _, ei := range normalExtentSnapshot {
281
		file := GetSnapShotFileFromPool()
282
		file.Name = strconv.FormatUint(ei.FileID, 10)
283
		file.Size = uint32(ei.Size)
284
		file.Modified = ei.ModifyTime
285
		file.Crc = atomic.LoadUint32(&ei.Crc)
286
		file.ApplyID = ei.ApplyID
287
		log.LogDebugf("partitionID %v ExtentStore set applyid %v partition %v", s.partitionID, s.ApplyId, s.partitionID)
288
		files = append(files, file)
289
	}
290
	tinyExtentSnapshot = s.getTinyExtentInfo()
291
	for _, ei := range tinyExtentSnapshot {
292
		file := GetSnapShotFileFromPool()
293
		file.Name = strconv.FormatUint(ei.FileID, 10)
294
		file.Size = uint32(ei.Size)
295
		file.Modified = ei.ModifyTime
296
		file.Crc = 0
297
		files = append(files, file)
298
	}
299

300
	return
301
}
302

303
// Create creates an extent.
304
func (s *ExtentStore) Create(extentID uint64) (err error) {
305
	var e *Extent
306
	name := path.Join(s.dataPath, strconv.Itoa(int(extentID)))
307
	if s.HasExtent(extentID) {
308
		err = ExtentExistsError
309
		return err
310
	}
311

312
	e = NewExtentInCore(name, extentID)
313
	e.header = make([]byte, util.BlockHeaderSize)
314
	err = e.InitToFS()
315
	if err != nil {
316
		return err
317
	}
318

319
	s.cache.Put(e)
320
	extInfo := &ExtentInfo{FileID: extentID}
321
	extInfo.UpdateExtentInfo(e, 0)
322

323
	atomic.StoreInt64(&extInfo.AccessTime, e.accessTime)
324
	s.eiMutex.Lock()
325
	s.extentInfoMap[extentID] = extInfo
326
	s.eiMutex.Unlock()
327

328
	s.UpdateBaseExtentID(extentID)
329
	return
330
}
331

332
func (s *ExtentStore) initBaseFileID() error {
333
	var baseFileID uint64
334
	baseFileID, _ = s.GetPersistenceBaseExtentID()
335
	files, err := os.ReadDir(s.dataPath)
336
	if err != nil {
337
		return err
338
	}
339

340
	var (
341
		extentID uint64
342
		isExtent bool
343
		e        *Extent
344
		ei       *ExtentInfo
345
		loadErr  error
346
	)
347
	for _, f := range files {
348
		if extentID, isExtent = s.ExtentID(f.Name()); !isExtent {
349
			continue
350
		}
351

352
		if e, loadErr = s.extent(extentID); loadErr != nil {
353
			log.LogError("[initBaseFileID] load extent error", loadErr)
354
			continue
355
		}
356

357
		ei = &ExtentInfo{FileID: extentID}
358
		ei.UpdateExtentInfo(e, 0)
359
		atomic.StoreInt64(&ei.AccessTime, e.accessTime)
360

361
		s.eiMutex.Lock()
362
		s.extentInfoMap[extentID] = ei
363
		s.eiMutex.Unlock()
364

365
		e.Close()
366
		if !IsTinyExtent(extentID) && extentID > baseFileID {
367
			baseFileID = extentID
368
		}
369
	}
370
	if baseFileID < MinExtentID {
371
		baseFileID = MinExtentID
372
	}
373
	atomic.StoreUint64(&s.baseExtentID, baseFileID)
374
	log.LogInfof("datadir(%v) maxBaseId(%v)", s.dataPath, baseFileID)
375
	runtime.GC()
376
	return nil
377
}
378

379
// Write writes the given extent to the disk.
380
func (s *ExtentStore) Write(extentID uint64, offset, size int64, data []byte, crc uint32, writeType int, isSync bool) (status uint8, err error) {
381
	var (
382
		e  *Extent
383
		ei *ExtentInfo
384
	)
385
	status = proto.OpOk
386
	s.eiMutex.Lock()
387
	ei = s.extentInfoMap[extentID]
388
	e, err = s.extentWithHeader(ei)
389
	s.eiMutex.Unlock()
390
	if err != nil {
391
		return status, err
392
	}
393
	// update access time
394
	atomic.StoreInt64(&ei.AccessTime, time.Now().Unix())
395
	log.LogDebugf("action[Write] dp %v extentID %v offset %v size %v writeTYPE %v", s.partitionID, extentID, offset, size, writeType)
396
	if err = s.checkOffsetAndSize(extentID, offset, size, writeType); err != nil {
397
		log.LogInfof("action[Write] path %v err %v", e.filePath, err)
398
		return status, err
399
	}
400

401
	status, err = e.Write(data, offset, size, crc, writeType, isSync, s.PersistenceBlockCrc, ei)
402
	if err != nil {
403
		log.LogInfof("action[Write] path %v err %v", e.filePath, err)
404
		return status, err
405
	}
406

407
	ei.UpdateExtentInfo(e, 0)
408
	return status, nil
409
}
410

411
func (s *ExtentStore) checkOffsetAndSize(extentID uint64, offset, size int64, writeType int) error {
412
	if IsTinyExtent(extentID) {
413
		return nil
414
	}
415
	// random write pos can happen on modAppend partition of extent
416
	if writeType == RandomWriteType {
417
		return nil
418
	}
419
	if writeType == AppendRandomWriteType {
420
		if offset < util.ExtentSize {
421
			return newParameterError("writeType=%d offset=%d size=%d", writeType, offset, size)
422
		}
423
		return nil
424
	}
425
	if size == 0 || size > util.BlockSize ||
426
		offset >= util.BlockCount*util.BlockSize ||
427
		offset+size > util.BlockCount*util.BlockSize {
428
		return newParameterError("offset=%d size=%d", offset, size)
429
	}
430
	return nil
431
}
432

433
// IsTinyExtent checks if the given extent is tiny extent.
434
func IsTinyExtent(extentID uint64) bool {
435
	return extentID >= TinyExtentStartID && extentID < TinyExtentStartID+TinyExtentCount
436
}
437

438
// Read reads the extent based on the given id.
439
func (s *ExtentStore) Read(extentID uint64, offset, size int64, nbuf []byte, isRepairRead bool) (crc uint32, err error) {
440
	var e *Extent
441
	s.eiMutex.RLock()
442
	ei := s.extentInfoMap[extentID]
443
	s.eiMutex.RUnlock()
444

445
	if ei == nil {
446
		return 0, errors.Trace(ExtentHasBeenDeletedError, "[Read] extent[%d] is already been deleted", extentID)
447
	}
448

449
	// update extent access time
450
	atomic.StoreInt64(&ei.AccessTime, time.Now().Unix())
451

452
	if e, err = s.extentWithHeader(ei); err != nil {
453
		return
454
	}
455

456
	//if err = s.checkOffsetAndSize(extentID, offset, size); err != nil {
457
	//	return
458
	//}
459
	crc, err = e.Read(nbuf, offset, size, isRepairRead)
460

461
	return
462
}
463

464
func (s *ExtentStore) DumpExtents() (extInfos SortedExtentInfos) {
465
	s.eiMutex.RLock()
466
	for _, v := range s.extentInfoMap {
467
		extInfos = append(extInfos, v)
468
	}
469
	s.eiMutex.RUnlock()
470
	return
471
}
472

473
func (s *ExtentStore) punchDelete(extentID uint64, offset, size int64) (err error) {
474
	e, err := s.extentWithHeaderByExtentID(extentID)
475
	if err != nil {
476
		return nil
477
	}
478
	if offset+size > e.dataSize {
479
		return
480
	}
481
	var hasDelete bool
482
	if hasDelete, err = e.punchDelete(offset, size); err != nil {
483
		return
484
	}
485
	if hasDelete {
486
		return
487
	}
488
	if err = s.RecordTinyDelete(e.extentID, offset, size); err != nil {
489
		return
490
	}
491
	return
492
}
493

494
// MarkDelete marks the given extent as deleted.
495
func (s *ExtentStore) MarkDelete(extentID uint64, offset, size int64) (err error) {
496
	var ei *ExtentInfo
497
	s.eiMutex.RLock()
498
	ei = s.extentInfoMap[extentID]
499
	s.eiMutex.RUnlock()
500
	if ei == nil || ei.IsDeleted {
501
		return
502
	}
503
	log.LogDebugf("action[MarkDelete] extentID %v offset %v size %v ei(size %v snapshotSize %v)",
504
		extentID, offset, size, ei.Size, ei.SnapshotDataOff)
505

506
	funcNeedPunchDel := func() bool {
507
		return offset != 0 || (size != 0 && ((ei.Size != uint64(size) && ei.SnapshotDataOff == util.ExtentSize) ||
508
			(ei.SnapshotDataOff != uint64(size) && ei.SnapshotDataOff > util.ExtentSize)))
509
	}
510

511
	if IsTinyExtent(extentID) || funcNeedPunchDel() {
512
		log.LogDebugf("action[MarkDelete] extentID %v offset %v size %v ei(size %v snapshotSize %v)",
513
			extentID, offset, size, ei.Size, ei.SnapshotDataOff)
514
		return s.punchDelete(extentID, offset, size)
515
	}
516

517
	extentFilePath := path.Join(s.dataPath, strconv.FormatUint(extentID, 10))
518
	log.LogDebugf("action[MarkDelete] extentID %v offset %v size %v ei(size %v extentFilePath %v)",
519
		extentID, offset, size, ei.Size, extentFilePath)
520
	if err = os.Remove(extentFilePath); err != nil && !os.IsNotExist(err) {
521
		// NOTE: if remove failed
522
		// we meet a disk error
523
		err = BrokenDiskError
524
		return
525
	}
526
	if err = s.PersistenceHasDeleteExtent(extentID); err != nil {
527
		err = BrokenDiskError
528
		return
529
	}
530
	ei.IsDeleted = true
531
	ei.ModifyTime = time.Now().Unix()
532
	s.cache.Del(extentID)
533
	if err = s.DeleteBlockCrc(extentID); err != nil {
534
		err = BrokenDiskError
535
		return
536
	}
537
	s.PutNormalExtentToDeleteCache(extentID)
538

539
	s.eiMutex.Lock()
540
	delete(s.extentInfoMap, extentID)
541
	s.eiMutex.Unlock()
542

543
	return
544
}
545

546
func (s *ExtentStore) PutNormalExtentToDeleteCache(extentID uint64) {
547
	s.hasDeleteNormalExtentsCache.Store(extentID, time.Now().Unix())
548
}
549

550
func (s *ExtentStore) IsDeletedNormalExtent(extentID uint64) (ok bool) {
551
	_, ok = s.hasDeleteNormalExtentsCache.Load(extentID)
552
	return
553
}
554

555
// Close closes the extent store.
556
func (s *ExtentStore) Close() {
557
	s.mutex.Lock()
558
	defer s.mutex.Unlock()
559
	if s.closed {
560
		return
561
	}
562

563
	// Release cache
564
	s.cache.Flush()
565
	s.cache.Clear()
566
	s.tinyExtentDeleteFp.Sync()
567
	s.tinyExtentDeleteFp.Close()
568
	s.normalExtentDeleteFp.Sync()
569
	s.normalExtentDeleteFp.Close()
570
	s.verifyExtentFp.Sync()
571
	s.verifyExtentFp.Close()
572
	for _, vFp := range s.verifyExtentFpAppend {
573
		if vFp != nil {
574
			vFp.Sync()
575
			vFp.Close()
576
		}
577
	}
578
	s.closed = true
579
}
580

581
// Watermark returns the extent info of the given extent on the record.
582
func (s *ExtentStore) Watermark(extentID uint64) (ei *ExtentInfo, err error) {
583
	var has bool
584
	s.eiMutex.RLock()
585
	ei, has = s.extentInfoMap[extentID]
586
	s.eiMutex.RUnlock()
587
	if !has {
588
		err = fmt.Errorf("e %v not exist", s.getExtentKey(extentID))
589
		return
590
	}
591
	return
592
}
593

594
// GetTinyExtentOffset returns the offset of the given extent.
595
func (s *ExtentStore) GetTinyExtentOffset(extentID uint64) (watermark int64, err error) {
596
	einfo, err := s.Watermark(extentID)
597
	if err != nil {
598
		return
599
	}
600
	watermark = int64(einfo.Size)
601
	if watermark%util.PageSize != 0 {
602
		watermark = watermark + (util.PageSize - watermark%util.PageSize)
603
	}
604

605
	return
606
}
607

608
// GetTinyExtentOffset returns the offset of the given extent.
609
func (s *ExtentStore) GetExtentSnapshotModOffset(extentID uint64, allocSize uint32) (watermark int64, err error) {
610
	einfo, err := s.Watermark(extentID)
611
	if err != nil {
612
		return
613
	}
614
	log.LogDebugf("action[ExtentStore.GetExtentSnapshotModOffset] extId %v SnapshotDataOff %v SnapPreAllocDataOff %v allocSize %v",
615
		extentID, einfo.SnapshotDataOff, einfo.SnapPreAllocDataOff, allocSize)
616

617
	if einfo.SnapPreAllocDataOff == 0 {
618
		einfo.SnapPreAllocDataOff = einfo.SnapshotDataOff
619
	}
620
	watermark = int64(einfo.SnapPreAllocDataOff)
621
	//if watermark%util.PageSize != 0 {
622
	//	watermark = watermark + (util.PageSize - watermark%util.PageSize)
623
	//}
624
	einfo.SnapPreAllocDataOff += uint64(allocSize)
625

626
	return
627
}
628

629
// Sector size
630
const (
631
	DiskSectorSize = 512
632
)
633

634
func (s *ExtentStore) GetStoreUsedSize() (used int64) {
635
	extentInfoSlice := make([]*ExtentInfo, 0, s.GetExtentCount())
636
	s.eiMutex.RLock()
637
	for _, extentID := range s.extentInfoMap {
638
		extentInfoSlice = append(extentInfoSlice, extentID)
639
	}
640
	s.eiMutex.RUnlock()
641
	for _, einfo := range extentInfoSlice {
642
		if einfo.IsDeleted {
643
			continue
644
		}
645
		if IsTinyExtent(einfo.FileID) {
646
			stat := new(syscall.Stat_t)
647
			err := syscall.Stat(fmt.Sprintf("%v/%v", s.dataPath, einfo.FileID), stat)
648
			if err != nil {
649
				continue
650
			}
651
			used += stat.Blocks * DiskSectorSize
652
		} else {
653
			used += int64(einfo.Size + (einfo.SnapshotDataOff - util.ExtentSize))
654
		}
655
	}
656
	return
657
}
658

659
// GetAllWatermarks returns all the watermarks.
660
func (s *ExtentStore) GetAllWatermarks(filter ExtentFilter) (extents []*ExtentInfo, tinyDeleteFileSize int64, err error) {
661
	extents = make([]*ExtentInfo, 0, len(s.extentInfoMap))
662
	extentInfoSlice := make([]*ExtentInfo, 0, len(s.extentInfoMap))
663
	s.eiMutex.RLock()
664
	for _, extentID := range s.extentInfoMap {
665
		extentInfoSlice = append(extentInfoSlice, extentID)
666
	}
667
	s.eiMutex.RUnlock()
668

669
	for _, extentInfo := range extentInfoSlice {
670
		if filter != nil && !filter(extentInfo) {
671
			continue
672
		}
673
		if extentInfo.IsDeleted {
674
			continue
675
		}
676
		extents = append(extents, extentInfo)
677
	}
678
	tinyDeleteFileSize, err = s.LoadTinyDeleteFileOffset()
679

680
	return
681
}
682

683
func (s *ExtentStore) getTinyExtentInfo() (extents []*ExtentInfo) {
684
	extents = make([]*ExtentInfo, 0)
685
	s.eiMutex.RLock()
686
	var extentID uint64
687
	for extentID = TinyExtentStartID; extentID < TinyExtentCount+TinyExtentStartID; extentID++ {
688
		ei := s.extentInfoMap[extentID]
689
		if ei == nil {
690
			continue
691
		}
692
		extents = append(extents, ei)
693
	}
694
	s.eiMutex.RUnlock()
695

696
	return
697
}
698

699
// ExtentID return the extent ID.
700
func (s *ExtentStore) ExtentID(filename string) (extentID uint64, isExtent bool) {
701
	if isExtent = RegexpExtentFile.MatchString(filename); !isExtent {
702
		return
703
	}
704
	var err error
705
	if extentID, err = strconv.ParseUint(filename, 10, 64); err != nil {
706
		isExtent = false
707
		return
708
	}
709
	isExtent = true
710
	return
711
}
712

713
func (s *ExtentStore) initTinyExtent() (err error) {
714
	s.availableTinyExtentC = make(chan uint64, TinyExtentCount)
715
	s.brokenTinyExtentC = make(chan uint64, TinyExtentCount)
716
	var extentID uint64
717

718
	for extentID = TinyExtentStartID; extentID < TinyExtentStartID+TinyExtentCount; extentID++ {
719
		err = s.Create(extentID)
720
		if err == nil || strings.Contains(err.Error(), syscall.EEXIST.Error()) || err == ExtentExistsError {
721
			err = nil
722
			s.brokenTinyExtentC <- extentID
723
			s.brokenTinyExtentMap.Store(extentID, true)
724
			continue
725
		}
726
		return err
727
	}
728

729
	return
730
}
731

732
// GetAvailableTinyExtent returns the available tiny extent from the channel.
733
func (s *ExtentStore) GetAvailableTinyExtent() (extentID uint64, err error) {
734
	select {
735
	case extentID = <-s.availableTinyExtentC:
736
		log.LogDebugf("dp %v GetAvailableTinyExtent. extentID %v", s.partitionID, extentID)
737
		s.availableTinyExtentMap.Delete(extentID)
738
		return
739
	default:
740
		log.LogDebugf("dp %v GetAvailableTinyExtent not found", s.partitionID)
741
		return 0, NoAvailableExtentError
742

743
	}
744
}
745

746
// SendToAvailableTinyExtentC sends the extent to the channel that stores the available tiny extents.
747
func (s *ExtentStore) SendToAvailableTinyExtentC(extentID uint64) {
748
	log.LogDebugf("dp %v action[SendToAvailableTinyExtentC] extentid %v", s.partitionID, extentID)
749
	if _, ok := s.availableTinyExtentMap.Load(extentID); !ok {
750
		log.LogDebugf("dp %v SendToAvailableTinyExtentC. extentID %v", s.partitionID, extentID)
751
		s.availableTinyExtentC <- extentID
752
		s.availableTinyExtentMap.Store(extentID, true)
753
	} else {
754
		log.LogDebugf("dp %v action[SendToAvailableTinyExtentC] extentid %v already exist", s.partitionID, extentID)
755
	}
756
}
757

758
// SendAllToBrokenTinyExtentC sends all the extents to the channel that stores the broken extents.
759
func (s *ExtentStore) SendAllToBrokenTinyExtentC(extentIds []uint64) {
760
	for _, extentID := range extentIds {
761
		if _, ok := s.brokenTinyExtentMap.Load(extentID); !ok {
762
			s.brokenTinyExtentC <- extentID
763
			s.brokenTinyExtentMap.Store(extentID, true)
764
		}
765
	}
766
}
767

768
// AvailableTinyExtentCnt returns the count of the available tiny extents.
769
func (s *ExtentStore) AvailableTinyExtentCnt() int {
770
	return len(s.availableTinyExtentC)
771
}
772

773
// BrokenTinyExtentCnt returns the count of the broken tiny extents.
774
func (s *ExtentStore) BrokenTinyExtentCnt() int {
775
	return len(s.brokenTinyExtentC)
776
}
777

778
// MoveAllToBrokenTinyExtentC moves all the tiny extents to the channel stores the broken extents.
779
func (s *ExtentStore) MoveAllToBrokenTinyExtentC(cnt int) {
780
	for i := 0; i < cnt; i++ {
781
		extentID, err := s.GetAvailableTinyExtent()
782
		if err != nil {
783
			return
784
		}
785
		s.SendToBrokenTinyExtentC(extentID)
786
	}
787
}
788

789
// SendToBrokenTinyExtentC sends the given extent id to the channel.
790
func (s *ExtentStore) SendToBrokenTinyExtentC(extentID uint64) {
791
	if _, ok := s.brokenTinyExtentMap.Load(extentID); !ok {
792
		s.brokenTinyExtentC <- extentID
793
		s.brokenTinyExtentMap.Store(extentID, true)
794
	}
795
}
796

797
// GetBrokenTinyExtent returns the first broken extent in the channel.
798
func (s *ExtentStore) GetBrokenTinyExtent() (extentID uint64, err error) {
799
	select {
800
	case extentID = <-s.brokenTinyExtentC:
801
		s.brokenTinyExtentMap.Delete(extentID)
802
		return
803
	default:
804
		return 0, NoBrokenExtentError
805

806
	}
807
}
808

809
// StoreSizeExtentID returns the size of the extent store
810
func (s *ExtentStore) StoreSizeExtentID(maxExtentID uint64) (totalSize uint64) {
811
	extentInfos := make([]*ExtentInfo, 0)
812
	s.eiMutex.RLock()
813
	for _, extentInfo := range s.extentInfoMap {
814
		if extentInfo.FileID <= maxExtentID {
815
			extentInfos = append(extentInfos, extentInfo)
816
		}
817
	}
818
	s.eiMutex.RUnlock()
819
	for _, extentInfo := range extentInfos {
820
		totalSize += extentInfo.TotalSize()
821
		log.LogDebugf("ExtentStore.StoreSizeExtentID dp %v extentInfo %v totalSize %v", s.partitionID, extentInfo, extentInfo.TotalSize())
822
	}
823

824
	return totalSize
825
}
826

827
// StoreSizeExtentID returns the size of the extent store
828
func (s *ExtentStore) GetMaxExtentIDAndPartitionSize() (maxExtentID, totalSize uint64) {
829
	extentInfos := make([]*ExtentInfo, 0)
830
	s.eiMutex.RLock()
831
	for _, extentInfo := range s.extentInfoMap {
832
		extentInfos = append(extentInfos, extentInfo)
833
	}
834
	s.eiMutex.RUnlock()
835
	for _, extentInfo := range extentInfos {
836
		if extentInfo.FileID > maxExtentID {
837
			maxExtentID = extentInfo.FileID
838
		}
839
		totalSize += extentInfo.TotalSize()
840
	}
841
	return maxExtentID, totalSize
842
}
843

844
func MarshalTinyExtent(extentID uint64, offset, size int64) (data []byte) {
845
	data = make([]byte, DeleteTinyRecordSize)
846
	binary.BigEndian.PutUint64(data[0:8], extentID)
847
	binary.BigEndian.PutUint64(data[8:16], uint64(offset))
848
	binary.BigEndian.PutUint64(data[16:DeleteTinyRecordSize], uint64(size))
849
	return data
850
}
851

852
func UnMarshalTinyExtent(data []byte) (extentID, offset, size uint64) {
853
	extentID = binary.BigEndian.Uint64(data[0:8])
854
	offset = binary.BigEndian.Uint64(data[8:16])
855
	size = binary.BigEndian.Uint64(data[16:DeleteTinyRecordSize])
856
	return
857
}
858

859
func (s *ExtentStore) RecordTinyDelete(extentID uint64, offset, size int64) (err error) {
860
	record := MarshalTinyExtent(extentID, offset, size)
861
	stat, err := s.tinyExtentDeleteFp.Stat()
862
	if err != nil {
863
		return
864
	}
865
	if stat.Size()%DeleteTinyRecordSize != 0 {
866
		needWriteEmpty := DeleteTinyRecordSize - (stat.Size() % DeleteTinyRecordSize)
867
		data := make([]byte, needWriteEmpty)
868
		s.tinyExtentDeleteFp.Write(data)
869
	}
870
	_, err = s.tinyExtentDeleteFp.Write(record)
871
	if err != nil {
872
		return
873
	}
874

875
	return
876
}
877

878
func (s *ExtentStore) ReadTinyDeleteRecords(offset, size int64, data []byte) (crc uint32, err error) {
879
	_, err = s.tinyExtentDeleteFp.ReadAt(data[:size], offset)
880
	if err == nil || err == io.EOF {
881
		err = nil
882
		crc = crc32.ChecksumIEEE(data[:size])
883
	}
884
	return
885
}
886

887
type ExtentDeleted struct {
888
	ExtentID uint64 `json:"extentID"`
889
	Offset   uint64 `json:"offset"`
890
	Size     uint64 `json:"size"`
891
}
892

893
func (s *ExtentStore) GetHasDeleteTinyRecords() (extentDes []ExtentDeleted, err error) {
894
	data := make([]byte, DeleteTinyRecordSize)
895
	offset := int64(0)
896

897
	for {
898
		_, err = s.tinyExtentDeleteFp.ReadAt(data, offset)
899
		if err != nil {
900
			if err == io.EOF {
901
				err = nil
902
			}
903
			return
904
		}
905

906
		extent := ExtentDeleted{}
907
		extent.ExtentID, extent.Offset, extent.Size = UnMarshalTinyExtent(data)
908
		extentDes = append(extentDes, extent)
909
		offset += DeleteTinyRecordSize
910
	}
911
}
912

913
// NextExtentID returns the next extentID. When the client sends the request to create an extent,
914
// this function generates an unique extentID within the current partition.
915
// This function can only be called by the leader.
916
func (s *ExtentStore) NextExtentID() (extentID uint64, err error) {
917
	extentID = atomic.AddUint64(&s.baseExtentID, 1)
918
	err = s.PersistenceBaseExtentID(extentID)
919
	return
920
}
921

922
func (s *ExtentStore) LoadTinyDeleteFileOffset() (offset int64, err error) {
923
	stat, err := s.tinyExtentDeleteFp.Stat()
924
	if err == nil {
925
		offset = stat.Size()
926
	}
927
	return
928
}
929

930
func (s *ExtentStore) getExtentKey(extent uint64) string {
931
	return fmt.Sprintf("extent %v_%v", s.partitionID, extent)
932
}
933

934
// UpdateBaseExtentID updates the base extent ID.
935
func (s *ExtentStore) UpdateBaseExtentID(id uint64) (err error) {
936
	if IsTinyExtent(id) {
937
		return
938
	}
939
	if id > atomic.LoadUint64(&s.baseExtentID) {
940
		atomic.StoreUint64(&s.baseExtentID, id)
941
		err = s.PersistenceBaseExtentID(atomic.LoadUint64(&s.baseExtentID))
942
	}
943
	s.PreAllocSpaceOnVerfiyFile(atomic.LoadUint64(&s.baseExtentID))
944

945
	return
946
}
947

948
func (s *ExtentStore) extent(extentID uint64) (e *Extent, err error) {
949
	if e, err = s.LoadExtentFromDisk(extentID, false); err != nil {
950
		err = fmt.Errorf("load extent from disk: %v", err)
951
		return nil, err
952
	}
953
	return
954
}
955

956
func (s *ExtentStore) extentWithHeader(ei *ExtentInfo) (e *Extent, err error) {
957
	var ok bool
958
	if ei == nil || ei.IsDeleted {
959
		err = ExtentNotFoundError
960
		return
961
	}
962
	if e, ok = s.cache.Get(ei.FileID); !ok {
963
		if e, err = s.LoadExtentFromDisk(ei.FileID, true); err != nil {
964
			err = fmt.Errorf("load  %v from disk: %v", s.getExtentKey(ei.FileID), err)
965
			return nil, err
966
		}
967
	}
968
	return
969
}
970

971
func (s *ExtentStore) extentWithHeaderByExtentID(extentID uint64) (e *Extent, err error) {
972
	var ok bool
973
	if e, ok = s.cache.Get(extentID); !ok {
974
		if e, err = s.LoadExtentFromDisk(extentID, true); err != nil {
975
			err = fmt.Errorf("load  %v from disk: %v", s.getExtentKey(extentID), err)
976
			return nil, err
977
		}
978
	}
979
	return
980
}
981

982
// HasExtent tells if the extent store has the extent with the given ID
983
func (s *ExtentStore) HasExtent(extentID uint64) (exist bool) {
984
	s.eiMutex.RLock()
985
	defer s.eiMutex.RUnlock()
986
	_, exist = s.extentInfoMap[extentID]
987
	return
988
}
989

990
// GetExtentCount returns the number of extents in the extentInfoMap
991
func (s *ExtentStore) GetExtentCount() (count int) {
992
	s.eiMutex.RLock()
993
	defer s.eiMutex.RUnlock()
994
	return len(s.extentInfoMap)
995
}
996

997
func (s *ExtentStore) LoadExtentFromDisk(extentID uint64, putCache bool) (e *Extent, err error) {
998
	name := path.Join(s.dataPath, fmt.Sprintf("%v", extentID))
999
	e = NewExtentInCore(name, extentID)
1000
	if err = e.RestoreFromFS(); err != nil {
1001
		err = fmt.Errorf("restore from file %v putCache %v system: %v", name, putCache, err)
1002
		return
1003
	}
1004

1005
	if !putCache {
1006
		return
1007
	}
1008

1009
	if !IsTinyExtent(extentID) && proto.IsNormalDp(s.partitionType) {
1010
		e.header = make([]byte, util.BlockHeaderSize)
1011
		if _, err = s.verifyExtentFp.ReadAt(e.header, int64(extentID*util.BlockHeaderSize)); err != nil && err != io.EOF {
1012
			return
1013
		}
1014
		emptyHeader := make([]byte, util.BlockHeaderSize)
1015
		log.LogDebugf("LoadExtentFromDisk. partition id %v extentId %v, snapshotOff %v, append fp cnt %v",
1016
			s.partitionID, extentID, e.snapshotDataOff, len(s.verifyExtentFpAppend))
1017
		if e.snapshotDataOff > util.ExtentSize {
1018
			for id, vFp := range s.verifyExtentFpAppend {
1019
				if uint64(id) > (e.snapshotDataOff-util.ExtentSize)/util.ExtentSize {
1020
					log.LogDebugf("LoadExtentFromDisk. partition id %v extentId %v, snapshotOff %v id %v out of extent range",
1021
						s.partitionID, extentID, e.snapshotDataOff, id)
1022
					break
1023
				}
1024
				log.LogDebugf("LoadExtentFromDisk. partition id %v extentId %v, snapshotOff %v id %v", s.partitionID, extentID, e.snapshotDataOff, id)
1025
				header := make([]byte, util.BlockHeaderSize)
1026
				if _, err = vFp.ReadAt(header, int64(extentID*util.BlockHeaderSize)); err != nil && err != io.EOF {
1027
					log.LogDebugf("LoadExtentFromDisk. partition id %v extentId %v, read at %v err %v",
1028
						s.partitionID, extentID, extentID*util.BlockHeaderSize, err)
1029
					return
1030
				}
1031
				if bytes.Equal(emptyHeader, header) {
1032
					log.LogErrorf("LoadExtentFromDisk. partition id %v extent %v hole at id %v", s.partitionID, e, id)
1033
				}
1034
				e.header = append(e.header, header...)
1035
			}
1036
			if len(s.verifyExtentFpAppend) < int(e.snapshotDataOff-1)/util.ExtentSize {
1037
				log.LogErrorf("LoadExtentFromDisk. extent %v need fp %v out of range %v", e, int(e.snapshotDataOff-1)/util.ExtentSize, len(s.verifyExtentFpAppend))
1038
			}
1039
		}
1040
	}
1041

1042
	err = nil
1043
	s.cache.Put(e)
1044

1045
	return
1046
}
1047

1048
func (s *ExtentStore) ScanBlocks(extentID uint64) (bcs []*BlockCrc, err error) {
1049
	if !proto.IsNormalDp(s.partitionType) {
1050
		return
1051
	}
1052

1053
	var blockCnt int
1054
	bcs = make([]*BlockCrc, 0)
1055
	ei := s.extentInfoMap[extentID]
1056
	e, err := s.extentWithHeader(ei)
1057
	if err != nil {
1058
		return bcs, err
1059
	}
1060

1061
	extSize := e.Size()
1062
	if e.snapshotDataOff > util.ExtentSize {
1063
		extSize = int64(e.snapshotDataOff)
1064
	}
1065
	blockCnt = int(extSize / util.BlockSize)
1066

1067
	if e.Size()%util.BlockSize != 0 {
1068
		blockCnt += 1
1069
	}
1070
	for blockNo := 0; blockNo < blockCnt; blockNo++ {
1071
		blockCrc := binary.BigEndian.Uint32(e.header[blockNo*util.PerBlockCrcSize : (blockNo+1)*util.PerBlockCrcSize])
1072
		bcs = append(bcs, &BlockCrc{BlockNo: blockNo, Crc: blockCrc})
1073
	}
1074
	sort.Sort(BlockCrcArr(bcs))
1075

1076
	return
1077
}
1078

1079
type ExtentInfoArr []*ExtentInfo
1080

1081
func (arr ExtentInfoArr) Len() int           { return len(arr) }
1082
func (arr ExtentInfoArr) Less(i, j int) bool { return arr[i].FileID < arr[j].FileID }
1083
func (arr ExtentInfoArr) Swap(i, j int)      { arr[i], arr[j] = arr[j], arr[i] }
1084

1085
func (s *ExtentStore) BackendTask() {
1086
	s.autoComputeExtentCrc()
1087
	s.cleanExpiredNormalExtentDeleteCache()
1088
}
1089

1090
func (s *ExtentStore) cleanExpiredNormalExtentDeleteCache() {
1091
	s.hasDeleteNormalExtentsCache.Range(func(key, value interface{}) bool {
1092
		deleteTime := value.(int64)
1093
		extentID := key.(uint64)
1094
		if time.Now().Unix()-deleteTime > NormalExtentDeleteRetainTime {
1095
			s.hasDeleteNormalExtentsCache.Delete(extentID)
1096
		}
1097
		return true
1098
	})
1099
}
1100

1101
func (s *ExtentStore) autoComputeExtentCrc() {
1102
	if !proto.IsNormalDp(s.partitionType) {
1103
		return
1104
	}
1105

1106
	defer func() {
1107
		if r := recover(); r != nil {
1108
			return
1109
		}
1110
	}()
1111

1112
	extentInfos := make([]*ExtentInfo, 0)
1113
	deleteExtents := make([]*ExtentInfo, 0)
1114
	s.eiMutex.RLock()
1115
	for _, ei := range s.extentInfoMap {
1116
		extentInfos = append(extentInfos, ei)
1117
		if ei.IsDeleted && time.Now().Unix()-ei.ModifyTime > UpdateCrcInterval {
1118
			deleteExtents = append(deleteExtents, ei)
1119
		}
1120
	}
1121
	s.eiMutex.RUnlock()
1122

1123
	if len(deleteExtents) > 0 {
1124
		s.eiMutex.Lock()
1125
		for _, ei := range deleteExtents {
1126
			delete(s.extentInfoMap, ei.FileID)
1127
		}
1128
		s.eiMutex.Unlock()
1129
	}
1130

1131
	sort.Sort(ExtentInfoArr(extentInfos))
1132

1133
	for _, ei := range extentInfos {
1134
		s.ApplyIdMutex.RLock()
1135
		if ei == nil {
1136
			s.ApplyIdMutex.RUnlock()
1137
			continue
1138
		}
1139

1140
		if !IsTinyExtent(ei.FileID) && time.Now().Unix()-ei.ModifyTime > UpdateCrcInterval &&
1141
			!ei.IsDeleted && ei.Size > 0 && ei.Crc == 0 {
1142

1143
			e, err := s.extentWithHeader(ei)
1144
			if err != nil {
1145
				log.LogError("[autoComputeExtentCrc] get extent error", err)
1146
				s.ApplyIdMutex.RUnlock()
1147
				continue
1148
			}
1149

1150
			extentCrc, err := e.autoComputeExtentCrc(s.PersistenceBlockCrc)
1151
			if err != nil {
1152
				log.LogError("[autoComputeExtentCrc] compute crc fail", err)
1153
				s.ApplyIdMutex.RUnlock()
1154
				continue
1155
			}
1156

1157
			ei.UpdateExtentInfo(e, extentCrc)
1158
			ei.ApplyID = s.ApplyId
1159
			time.Sleep(time.Millisecond * 100)
1160
		}
1161
		s.ApplyIdMutex.RUnlock()
1162
	}
1163

1164
	time.Sleep(time.Second)
1165
}
1166

1167
func (s *ExtentStore) TinyExtentRecover(extentID uint64, offset, size int64, data []byte, crc uint32, isEmptyPacket bool) (err error) {
1168
	if !IsTinyExtent(extentID) {
1169
		return fmt.Errorf("extent %v not tinyExtent", extentID)
1170
	}
1171

1172
	var (
1173
		e  *Extent
1174
		ei *ExtentInfo
1175
	)
1176

1177
	s.eiMutex.RLock()
1178
	ei = s.extentInfoMap[extentID]
1179
	s.eiMutex.RUnlock()
1180
	if e, err = s.extentWithHeader(ei); err != nil {
1181
		return nil
1182
	}
1183

1184
	if err = e.TinyExtentRecover(data, offset, size, crc, isEmptyPacket); err != nil {
1185
		return err
1186
	}
1187
	ei.UpdateExtentInfo(e, 0)
1188

1189
	return nil
1190
}
1191

1192
func (s *ExtentStore) TinyExtentGetFinfoSize(extentID uint64) (size uint64, err error) {
1193
	var e *Extent
1194
	if !IsTinyExtent(extentID) {
1195
		return 0, fmt.Errorf("unavali extent id (%v)", extentID)
1196
	}
1197
	s.eiMutex.RLock()
1198
	ei := s.extentInfoMap[extentID]
1199
	s.eiMutex.RUnlock()
1200
	if e, err = s.extentWithHeader(ei); err != nil {
1201
		return
1202
	}
1203

1204
	finfo, err := e.file.Stat()
1205
	if err != nil {
1206
		return 0, err
1207
	}
1208
	size = uint64(finfo.Size())
1209

1210
	return
1211
}
1212

1213
func (s *ExtentStore) TinyExtentAvaliOffset(extentID uint64, offset int64) (newOffset, newEnd int64, err error) {
1214
	var e *Extent
1215
	if !IsTinyExtent(extentID) {
1216
		return 0, 0, fmt.Errorf("unavali extent(%v)", extentID)
1217
	}
1218
	s.eiMutex.RLock()
1219
	ei := s.extentInfoMap[extentID]
1220
	s.eiMutex.RUnlock()
1221
	if e, err = s.extentWithHeader(ei); err != nil {
1222
		return
1223
	}
1224

1225
	defer func() {
1226
		if err != nil && strings.Contains(err.Error(), syscall.ENXIO.Error()) {
1227
			newOffset = e.dataSize
1228
			newEnd = e.dataSize
1229
			err = nil
1230
		}
1231
	}()
1232
	newOffset, newEnd, err = e.tinyExtentAvaliOffset(offset)
1233

1234
	return
1235
}
1236

1237
func (s *ExtentStore) renameStaleExtentStore() (err error) {
1238
	// create: move current folder to .old and create a new folder
1239
	if _, err = os.Stat(s.dataPath); err != nil {
1240
		if os.IsNotExist(err) {
1241
			return nil
1242
		}
1243
	}
1244

1245
	curTime := time.Now().Format(StaleExtStoreTimeFormat)
1246
	staleExtStoreDirName := s.dataPath + "_" + curTime + StaleExtStoreBackupSuffix
1247

1248
	if err = os.Rename(s.dataPath, staleExtStoreDirName); err != nil {
1249
		return
1250
	}
1251
	return
1252
}
1253

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.