1
// Copyright 2018 The CubeFS Authors.
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
7
// http://www.apache.org/licenses/LICENSE-2.0
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
30
"github.com/cubefs/cubefs/proto"
31
"github.com/cubefs/cubefs/util/errors"
32
"github.com/cubefs/cubefs/util/log"
33
mmap "github.com/edsrzf/mmap-go"
37
snapshotDir = "snapshot"
38
snapshotDirTmp = ".snapshot"
39
snapshotBackup = ".snapshot_backup"
43
multipartFile = "multipart"
44
txInfoFile = "tx_info"
45
txRbInodeFile = "tx_rb_inode"
46
txRbDentryFile = "tx_rb_dentry"
48
TxIDFile = "transactionID"
49
SnapshotSign = ".sign"
51
metadataFileTmp = ".meta"
53
uniqCheckerFile = "uniqChecker"
54
verdataFile = "multiVer"
55
StaleMetadataSuffix = ".old"
56
StaleMetadataTimeFormat = "20060102150405.000000000"
57
verdataInitFile = "multiVerInitFile"
60
func (mp *metaPartition) loadMetadata() (err error) {
61
metaFile := path.Join(mp.config.RootDir, metadataFile)
62
fp, err := os.OpenFile(metaFile, os.O_RDONLY, 0o644)
64
err = errors.NewErrorf("[loadMetadata]: OpenFile %s", err.Error())
68
data, err := io.ReadAll(fp)
69
if err != nil || len(data) == 0 {
70
err = errors.NewErrorf("[loadMetadata]: ReadFile %s, data: %s", err.Error(),
74
mConf := &MetaPartitionConfig{}
75
if err = json.Unmarshal(data, mConf); err != nil {
76
err = errors.NewErrorf("[loadMetadata]: Unmarshal MetaPartitionConfig %s",
81
if mConf.checkMeta() != nil {
84
mp.config.PartitionId = mConf.PartitionId
85
mp.config.VolName = mConf.VolName
86
mp.config.Start = mConf.Start
87
mp.config.End = mConf.End
88
mp.config.Peers = mConf.Peers
89
mp.config.Cursor = mp.config.Start
92
mp.uidManager = NewUidMgr(mp.config.VolName, mp.config.PartitionId)
93
mp.mqMgr = NewQuotaManager(mp.config.VolName, mp.config.PartitionId)
95
log.LogInfof("loadMetadata: load complete: partitionID(%v) volume(%v) range(%v,%v) cursor(%v)",
96
mp.config.PartitionId, mp.config.VolName, mp.config.Start, mp.config.End, mp.config.Cursor)
100
func (mp *metaPartition) loadInode(rootDir string, crc uint32) (err error) {
104
log.LogInfof("loadInode: load complete: partitonID(%v) volume(%v) numInodes(%v)",
105
mp.config.PartitionId, mp.config.VolName, numInodes)
108
filename := path.Join(rootDir, inodeFile)
109
if _, err = os.Stat(filename); err != nil {
110
err = errors.NewErrorf("[loadInode] Stat: %s", err.Error())
113
fp, err := os.OpenFile(filename, os.O_RDONLY, 0o644)
115
err = errors.NewErrorf("[loadInode] OpenFile: %s", err.Error())
119
reader := bufio.NewReaderSize(fp, 4*1024*1024)
120
inoBuf := make([]byte, 4)
121
crcCheck := crc32.NewIEEE()
125
_, err = io.ReadFull(reader, inoBuf)
129
if res := crcCheck.Sum32(); res != crc {
130
log.LogErrorf("[loadInode]: check crc mismatch, expected[%d], actual[%d]", crc, res)
131
return ErrSnapshotCrcMismatch
135
err = errors.NewErrorf("[loadInode] ReadHeader: %s", err.Error())
139
if _, err = crcCheck.Write(inoBuf); err != nil {
143
length := binary.BigEndian.Uint32(inoBuf)
146
if uint32(cap(inoBuf)) >= length {
147
inoBuf = inoBuf[:length]
149
inoBuf = make([]byte, length)
151
_, err = io.ReadFull(reader, inoBuf)
153
err = errors.NewErrorf("[loadInode] ReadBody: %s", err.Error())
156
ino := NewInode(0, 0)
157
if err = ino.Unmarshal(inoBuf); err != nil {
158
err = errors.NewErrorf("[loadInode] Unmarshal: %s", err.Error())
161
mp.acucumUidSizeByLoad(ino)
163
if _, err = crcCheck.Write(inoBuf); err != nil {
169
mp.fsmCreateInode(ino)
170
mp.checkAndInsertFreeList(ino)
171
if mp.config.Cursor < ino.Inode {
172
mp.config.Cursor = ino.Inode
178
// Load dentry from the dentry snapshot.
179
func (mp *metaPartition) loadDentry(rootDir string, crc uint32) (err error) {
180
var numDentries uint64
183
log.LogInfof("loadDentry: load complete: partitonID(%v) volume(%v) numDentries(%v)",
184
mp.config.PartitionId, mp.config.VolName, numDentries)
187
filename := path.Join(rootDir, dentryFile)
188
if _, err = os.Stat(filename); err != nil {
189
err = errors.NewErrorf("[loadDentry] Stat: %s", err.Error())
192
fp, err := os.OpenFile(filename, os.O_RDONLY, 0o644)
194
err = errors.NewErrorf("[loadDentry] OpenFile: %s", err.Error())
199
reader := bufio.NewReaderSize(fp, 4*1024*1024)
200
dentryBuf := make([]byte, 4)
201
crcCheck := crc32.NewIEEE()
203
dentryBuf = dentryBuf[:4]
204
// First Read 4byte header length
205
_, err = io.ReadFull(reader, dentryBuf)
209
if res := crcCheck.Sum32(); res != crc {
210
log.LogErrorf("[loadDentry]: check crc mismatch, expected[%d], actual[%d]", crc, res)
211
return ErrSnapshotCrcMismatch
215
err = errors.NewErrorf("[loadDentry] ReadHeader: %s", err.Error())
218
if _, err = crcCheck.Write(dentryBuf); err != nil {
221
length := binary.BigEndian.Uint32(dentryBuf)
224
if uint32(cap(dentryBuf)) >= length {
225
dentryBuf = dentryBuf[:length]
227
dentryBuf = make([]byte, length)
229
_, err = io.ReadFull(reader, dentryBuf)
231
err = errors.NewErrorf("[loadDentry]: ReadBody: %s", err.Error())
235
if err = dentry.Unmarshal(dentryBuf); err != nil {
236
err = errors.NewErrorf("[loadDentry] Unmarshal: %s", err.Error())
239
if status := mp.fsmCreateDentry(dentry, true); status != proto.OpOk {
240
err = errors.NewErrorf("[loadDentry] createDentry dentry: %v, resp code: %d", dentry, status)
243
if _, err = crcCheck.Write(dentryBuf); err != nil {
250
func (mp *metaPartition) loadExtend(rootDir string, crc uint32) (err error) {
251
filename := path.Join(rootDir, extendFile)
252
if _, err = os.Stat(filename); err != nil {
253
err = errors.NewErrorf("[loadExtend] Stat: %s", err.Error())
256
fp, err := os.OpenFile(filename, os.O_RDONLY, 0o644)
258
err = errors.NewErrorf("[loadExtend] OpenFile: %s", err.Error())
265
if mem, err = mmap.Map(fp, mmap.RDONLY, 0); err != nil {
272
// read number of extends
273
var numExtends uint64
274
numExtends, n = binary.Uvarint(mem)
277
varintTmp := make([]byte, binary.MaxVarintLen64)
278
// write number of extends
279
n = binary.PutUvarint(varintTmp, numExtends)
281
crcCheck := crc32.NewIEEE()
282
if _, err = crcCheck.Write(varintTmp[:n]); err != nil {
285
for i := uint64(0); i < numExtends; i++ {
288
numBytes, n = binary.Uvarint(mem[offset:])
291
if extend, err = NewExtendFromBytes(mem[offset : offset+int(numBytes)]); err != nil {
295
if _, err = crcCheck.Write(mem[offset-n : offset]); err != nil {
298
// log.LogDebugf("loadExtend: new extend from bytes: partitionID (%v) volume(%v) inode[%v]",
299
// mp.config.PartitionId, mp.config.VolName, extend.inode)
300
_ = mp.fsmSetXAttr(extend)
302
if _, err = crcCheck.Write(mem[offset : offset+int(numBytes)]); err != nil {
305
offset += int(numBytes)
306
mp.statisticExtendByLoad(extend)
309
log.LogInfof("loadExtend: load complete: partitionID(%v) volume(%v) numExtends(%v) filename(%v)",
310
mp.config.PartitionId, mp.config.VolName, numExtends, filename)
311
if res := crcCheck.Sum32(); res != crc {
312
log.LogErrorf("loadExtend: check crc mismatch, expected[%d], actual[%d]", crc, res)
313
return ErrSnapshotCrcMismatch
318
func (mp *metaPartition) loadMultipart(rootDir string, crc uint32) (err error) {
319
filename := path.Join(rootDir, multipartFile)
320
if _, err = os.Stat(filename); err != nil {
321
err = errors.NewErrorf("[loadMultipart] Stat: %s", err.Error())
324
fp, err := os.OpenFile(filename, os.O_RDONLY, 0o644)
326
err = errors.NewErrorf("[loadMultipart] OpenFile: %s", err.Error())
333
if mem, err = mmap.Map(fp, mmap.RDONLY, 0); err != nil {
340
// read number of multipart
341
var numMultiparts uint64
342
numMultiparts, n = binary.Uvarint(mem)
343
varintTmp := make([]byte, binary.MaxVarintLen64)
344
// write number of multipart
345
n = binary.PutUvarint(varintTmp, numMultiparts)
346
crcCheck := crc32.NewIEEE()
347
if _, err = crcCheck.Write(varintTmp[:n]); err != nil {
351
for i := uint64(0); i < numMultiparts; i++ {
354
numBytes, n = binary.Uvarint(mem[offset:])
356
if _, err = crcCheck.Write(mem[offset-n : offset]); err != nil {
359
var multipart *Multipart
360
multipart = MultipartFromBytes(mem[offset : offset+int(numBytes)])
361
log.LogDebugf("loadMultipart: create multipart from bytes: partitionID(%v) multipartID(%v)", mp.config.PartitionId, multipart.id)
362
mp.fsmCreateMultipart(multipart)
363
offset += int(numBytes)
364
if _, err = crcCheck.Write(mem[offset-int(numBytes) : offset]); err != nil {
368
log.LogInfof("loadMultipart: load complete: partitionID(%v) numMultiparts(%v) filename(%v)",
369
mp.config.PartitionId, numMultiparts, filename)
370
if res := crcCheck.Sum32(); res != crc {
371
log.LogErrorf("[loadMultipart] check crc mismatch, expected[%d], actual[%d]", crc, res)
372
return ErrSnapshotCrcMismatch
377
func (mp *metaPartition) loadApplyID(rootDir string) (err error) {
378
filename := path.Join(rootDir, applyIDFile)
379
if _, err = os.Stat(filename); err != nil {
380
err = errors.NewErrorf("[loadApplyID]: Stat %s", err.Error())
383
data, err := os.ReadFile(filename)
385
err = errors.NewErrorf("[loadApplyID] ReadFile: %s", err.Error())
389
err = errors.NewErrorf("[loadApplyID]: ApplyID is empty")
393
if strings.Contains(string(data), "|") {
394
_, err = fmt.Sscanf(string(data), "%d|%d", &mp.applyID, &cursor)
396
_, err = fmt.Sscanf(string(data), "%d", &mp.applyID)
399
err = errors.NewErrorf("[loadApplyID] ReadApplyID: %s", err.Error())
403
mp.storedApplyId = mp.applyID
405
if cursor > mp.GetCursor() {
406
atomic.StoreUint64(&mp.config.Cursor, cursor)
409
log.LogInfof("loadApplyID: load complete: partitionID(%v) volume(%v) applyID(%v) cursor(%v) filename(%v)",
410
mp.config.PartitionId, mp.config.VolName, mp.applyID, mp.config.Cursor, filename)
414
func (mp *metaPartition) loadTxRbDentry(rootDir string, crc uint32) (err error) {
415
var numTxRbDentry uint64
418
log.LogInfof("loadTxRbDentry: load complete: partitonID(%v) volume(%v) numInodes(%v)",
419
mp.config.PartitionId, mp.config.VolName, numTxRbDentry)
422
filename := path.Join(rootDir, txRbDentryFile)
423
if _, err = os.Stat(filename); err != nil {
424
err = errors.NewErrorf("[loadTxRbDentry] Stat: %s", err.Error())
427
fp, err := os.OpenFile(filename, os.O_RDONLY, 0o644)
429
err = errors.NewErrorf("[loadTxRbDentry] OpenFile: %s", err.Error())
433
reader := bufio.NewReaderSize(fp, 4*1024*1024)
434
txBuf := make([]byte, 4)
435
crcCheck := crc32.NewIEEE()
440
_, err = io.ReadFull(reader, txBuf)
444
if res := crcCheck.Sum32(); res != crc {
445
log.LogErrorf("[loadTxRbDentry]: check crc mismatch, expected[%d], actual[%d]", crc, res)
446
return ErrSnapshotCrcMismatch
450
err = errors.NewErrorf("[loadTxRbDentry] ReadHeader: %s", err.Error())
454
if _, err = crcCheck.Write(txBuf); err != nil {
458
length := binary.BigEndian.Uint32(txBuf)
461
if uint32(cap(txBuf)) >= length {
462
txBuf = txBuf[:length]
464
txBuf = make([]byte, length)
466
_, err = io.ReadFull(reader, txBuf)
468
err = errors.NewErrorf("[loadTxRbDentry] ReadBody: %s", err.Error())
472
txRbDentry := NewTxRollbackDentry(nil, nil, 0)
473
if err = txRbDentry.Unmarshal(txBuf); err != nil {
474
err = errors.NewErrorf("[loadTxRbDentry] Unmarshal: %s", err.Error())
479
if _, err = crcCheck.Write(txBuf); err != nil {
483
// mp.txProcessor.txResource.txRollbackDentries[txRbDentry.txDentryInfo.GetKey()] = txRbDentry
484
mp.txProcessor.txResource.txRbDentryTree.ReplaceOrInsert(txRbDentry, true)
489
func (mp *metaPartition) loadTxRbInode(rootDir string, crc uint32) (err error) {
490
var numTxRbInode uint64
493
log.LogInfof("loadTxRbInode: load complete: partitonID(%v) volume(%v) numInodes(%v)",
494
mp.config.PartitionId, mp.config.VolName, numTxRbInode)
497
filename := path.Join(rootDir, txRbInodeFile)
498
if _, err = os.Stat(filename); err != nil {
499
err = errors.NewErrorf("[loadTxRbInode] Stat: %s", err.Error())
502
fp, err := os.OpenFile(filename, os.O_RDONLY, 0o644)
504
err = errors.NewErrorf("[loadTxRbInode] OpenFile: %s", err.Error())
508
reader := bufio.NewReaderSize(fp, 4*1024*1024)
509
txBuf := make([]byte, 4)
510
crcCheck := crc32.NewIEEE()
515
_, err = io.ReadFull(reader, txBuf)
521
err = errors.NewErrorf("[loadTxRbInode] ReadHeader: %s", err.Error())
525
if _, err = crcCheck.Write(txBuf); err != nil {
529
length := binary.BigEndian.Uint32(txBuf)
532
if uint32(cap(txBuf)) >= length {
533
txBuf = txBuf[:length]
535
txBuf = make([]byte, length)
537
_, err = io.ReadFull(reader, txBuf)
539
err = errors.NewErrorf("[loadTxRbInode] ReadBody: %s", err.Error())
543
txRbInode := NewTxRollbackInode(nil, []uint32{}, nil, 0)
544
if err = txRbInode.Unmarshal(txBuf); err != nil {
545
err = errors.NewErrorf("[loadTxRbInode] Unmarshal: %s", err.Error())
549
if _, err = crcCheck.Write(txBuf); err != nil {
553
mp.txProcessor.txResource.txRbInodeTree.ReplaceOrInsert(txRbInode, true)
558
func (mp *metaPartition) loadTxInfo(rootDir string, crc uint32) (err error) {
559
var numTxInfos uint64
562
log.LogInfof("loadTxInfo: load complete: partitonID(%v) volume(%v) numInodes(%v)",
563
mp.config.PartitionId, mp.config.VolName, numTxInfos)
566
filename := path.Join(rootDir, txInfoFile)
567
if _, err = os.Stat(filename); err != nil {
568
err = errors.NewErrorf("[loadTxInfo] Stat: %s", err.Error())
571
fp, err := os.OpenFile(filename, os.O_RDONLY, 0o644)
573
err = errors.NewErrorf("[loadTxInfo] OpenFile: %s", err.Error())
577
reader := bufio.NewReaderSize(fp, 4*1024*1024)
578
txBuf := make([]byte, 4)
579
crcCheck := crc32.NewIEEE()
584
_, err = io.ReadFull(reader, txBuf)
588
if res := crcCheck.Sum32(); res != crc {
589
log.LogErrorf("[loadTxInfo]: check crc mismatch, expected[%d], actual[%d]", crc, res)
590
return ErrSnapshotCrcMismatch
594
err = errors.NewErrorf("[loadTxInfo] ReadHeader: %s", err.Error())
598
if _, err = crcCheck.Write(txBuf); err != nil {
602
length := binary.BigEndian.Uint32(txBuf)
605
if uint32(cap(txBuf)) >= length {
606
txBuf = txBuf[:length]
608
txBuf = make([]byte, length)
610
_, err = io.ReadFull(reader, txBuf)
612
err = errors.NewErrorf("[loadTxInfo] ReadBody: %s", err.Error())
616
txInfo := proto.NewTransactionInfo(0, proto.TxTypeUndefined)
617
if err = txInfo.Unmarshal(txBuf); err != nil {
618
err = errors.NewErrorf("[loadTxInfo] Unmarshal: %s", err.Error())
623
if _, err = crcCheck.Write(txBuf); err != nil {
627
mp.txProcessor.txManager.addTxInfo(txInfo)
632
func (mp *metaPartition) loadTxID(rootDir string) (err error) {
633
filename := path.Join(rootDir, TxIDFile)
634
if _, err = os.Stat(filename); err != nil {
638
data, err := os.ReadFile(filename)
640
err = errors.NewErrorf("[loadTxID] OpenFile: %s", err.Error())
644
err = errors.NewErrorf("[loadTxID]: TxID is empty")
648
_, err = fmt.Sscanf(string(data), "%d", &txId)
650
err = errors.NewErrorf("[loadTxID] ReadTxID: %s", err.Error())
654
if txId > mp.txProcessor.txManager.txIdAlloc.getTransactionID() {
655
mp.txProcessor.txManager.txIdAlloc.setTransactionID(txId)
657
log.LogInfof("loadTxID: load complete: partitionID(%v) volume(%v) txId(%v) filename(%v)",
658
mp.config.PartitionId, mp.config.VolName, mp.txProcessor.txManager.txIdAlloc.getTransactionID(), filename)
662
func (mp *metaPartition) loadUniqID(rootDir string) (err error) {
663
filename := path.Join(rootDir, uniqIDFile)
664
if _, err = os.Stat(filename); err != nil {
668
data, err := os.ReadFile(filename)
670
err = errors.NewErrorf("[loadUniqID] OpenFile: %s", err.Error())
674
err = errors.NewErrorf("[loadUniqID]: uniqID is empty")
678
_, err = fmt.Sscanf(string(data), "%d", &uniqId)
680
err = errors.NewErrorf("[loadUniqID] Read uniqID: %s", err.Error())
684
if uniqId > mp.GetUniqId() {
685
atomic.StoreUint64(&mp.config.UniqId, uniqId)
688
log.LogInfof("loadUniqID: load complete: partitionID(%v) volume(%v) uniqID(%v) filename(%v)",
689
mp.config.PartitionId, mp.config.VolName, mp.GetUniqId(), filename)
693
func (mp *metaPartition) loadUniqChecker(rootDir string, crc uint32) (err error) {
694
log.LogInfof("loadUniqChecker partition(%v) begin", mp.config.PartitionId)
695
filename := path.Join(rootDir, uniqCheckerFile)
696
if _, err = os.Stat(filename); err != nil {
697
log.LogErrorf("loadUniqChecker get file %s err(%s)", filename, err)
702
data, err := os.ReadFile(filename)
704
log.LogErrorf("loadUniqChecker read file %s err(%s)", filename, err)
705
err = errors.NewErrorf("[loadUniqChecker] OpenFile: %v", err.Error())
708
if err = mp.uniqChecker.UnMarshal(data); err != nil {
709
log.LogErrorf("loadUniqChecker UnMarshal err(%s)", err)
710
err = errors.NewErrorf("[loadUniqChecker] Unmarshal: %v", err.Error())
714
crcCheck := crc32.NewIEEE()
715
if _, err = crcCheck.Write(data); err != nil {
716
log.LogErrorf("loadUniqChecker write to crcCheck failed: %s", err)
719
if res := crcCheck.Sum32(); res != crc {
720
log.LogErrorf("[loadUniqChecker]: check crc mismatch, expected[%d], actual[%d]", crc, res)
721
return ErrSnapshotCrcMismatch
724
log.LogInfof("loadUniqChecker partition(%v) complete", mp.config.PartitionId)
728
func (mp *metaPartition) loadMultiVer(rootDir string, crc uint32) (err error) {
729
filename := path.Join(rootDir, verdataFile)
730
if _, err = os.Stat(filename); err != nil {
736
data, err := os.ReadFile(filename)
738
if err == os.ErrNotExist {
743
err = errors.NewErrorf("[loadMultiVer] OpenFile: %s", err.Error())
748
err = errors.NewErrorf("[loadMultiVer]: ApplyID is empty")
756
if strings.Contains(string(data), "|") {
757
_, err = fmt.Sscanf(string(data), "%d|%s", &applyId, &verData)
759
_, err = fmt.Sscanf(string(data), "%d", &applyId)
763
err = errors.NewErrorf("[loadMultiVer] ReadVerList: %s", err.Error())
767
var verList []*proto.VolVersionInfo
768
if err = json.Unmarshal([]byte(verData), &verList); err != nil {
769
err = errors.NewErrorf("[loadMultiVer] ReadVerList: %s verData(%v) applyId %v", verList, verData, applyId)
774
if byteData, err = json.Marshal(verList); err != nil {
777
sign := crc32.NewIEEE()
778
if _, err = sign.Write(byteData); err != nil {
782
if crc != sign.Sum32() {
783
return fmt.Errorf("partitionID(%v) volume(%v) calc crc %v not equal with disk %v", mp.config.PartitionId, mp.config.VolName, sign.Sum32(), crc)
786
mp.multiVersionList.VerList = verList
787
mp.verSeq = mp.multiVersionList.GetLastVer()
789
log.LogInfof("loadMultiVer: updateVerList load complete: partitionID(%v) volume(%v) applyID(%v) filename(%v) verlist (%v) crc (%v) mp Ver(%v)",
790
mp.config.PartitionId, mp.config.VolName, mp.applyID, filename, mp.multiVersionList.VerList, crc, mp.verSeq)
794
func (mp *metaPartition) storeMultiVersion(rootDir string, sm *storeMsg) (crc uint32, err error) {
795
filename := path.Join(rootDir, verdataFile)
796
fp, err := os.OpenFile(filename, os.O_RDWR|os.O_APPEND|os.O_TRUNC|os.
806
if verData, err = json.Marshal(sm.multiVerList); err != nil {
809
sign := crc32.NewIEEE()
810
if _, err = sign.Write(verData); err != nil {
815
if _, err = fp.WriteString(fmt.Sprintf("%d|%s", sm.applyIndex, string(verData))); err != nil {
818
log.LogInfof("storeMultiVersion: store complete: partitionID(%v) volume(%v) applyID(%v) verData(%v) crc(%v)",
819
mp.config.PartitionId, mp.config.VolName, sm.applyIndex, string(verData), crc)
823
func (mp *metaPartition) renameStaleMetadata() (err error) {
824
if _, err = os.Stat(mp.config.RootDir); err != nil {
825
if os.IsNotExist(err) {
830
curTime := time.Now().Format(StaleMetadataTimeFormat)
831
staleMetaDirName := mp.config.RootDir + "_" + curTime + StaleMetadataSuffix
832
if err = os.Rename(mp.config.RootDir, staleMetaDirName); err != nil {
838
func (mp *metaPartition) persistMetadata() (err error) {
839
if err = mp.config.checkMeta(); err != nil {
840
err = errors.NewErrorf("[persistMetadata]->%s", err.Error())
844
// TODO Unhandled errors
845
os.MkdirAll(mp.config.RootDir, 0o755)
846
filename := path.Join(mp.config.RootDir, metadataFileTmp)
847
fp, err := os.OpenFile(filename, os.O_RDWR|os.O_TRUNC|os.O_APPEND|os.O_CREATE, 0o755)
852
// TODO Unhandled errors
858
data, err := json.Marshal(mp.config)
862
if _, err = fp.Write(data); err != nil {
865
if err = os.Rename(filename, path.Join(mp.config.RootDir, metadataFile)); err != nil {
868
log.LogInfof("persistMetata: persist complete: partitionID(%v) volume(%v) range(%v,%v) cursor(%v)",
869
mp.config.PartitionId, mp.config.VolName, mp.config.Start, mp.config.End, mp.config.Cursor)
873
func (mp *metaPartition) storeApplyID(rootDir string, sm *storeMsg) (err error) {
874
filename := path.Join(rootDir, applyIDFile)
875
fp, err := os.OpenFile(filename, os.O_RDWR|os.O_APPEND|os.O_TRUNC|os.
885
cursor := mp.GetCursor()
886
if _, err = fp.WriteString(fmt.Sprintf("%d|%d", sm.applyIndex, cursor)); err != nil {
889
log.LogWarnf("storeApplyID: store complete: partitionID(%v) volume(%v) applyID(%v) cursor(%v)",
890
mp.config.PartitionId, mp.config.VolName, sm.applyIndex, cursor)
894
func (mp *metaPartition) storeTxID(rootDir string, sm *storeMsg) (err error) {
895
filename := path.Join(rootDir, TxIDFile)
896
fp, err := os.OpenFile(filename, os.O_RDWR|os.O_APPEND|os.O_TRUNC|os.
905
if _, err = fp.WriteString(fmt.Sprintf("%d", sm.txId)); err != nil {
908
log.LogInfof("storeTxID: store complete: partitionID(%v) volume(%v) txId(%v)",
909
mp.config.PartitionId, mp.config.VolName, sm.txId)
913
func (mp *metaPartition) storeTxRbDentry(rootDir string, sm *storeMsg) (crc uint32, err error) {
914
filename := path.Join(rootDir, txRbDentryFile)
915
fp, err := os.OpenFile(filename, os.O_RDWR|os.O_TRUNC|os.O_APPEND|os.O_CREATE, 0o755)
921
// TODO Unhandled errors
926
lenBuf := make([]byte, 4)
927
sign := crc32.NewIEEE()
929
sm.txRbDentryTree.Ascend(func(i BtreeItem) bool {
930
rbDentry := i.(*TxRollbackDentry)
931
if data, err = rbDentry.Marshal(); err != nil {
934
binary.BigEndian.PutUint32(lenBuf, uint32(len(data)))
935
if _, err = fp.Write(lenBuf); err != nil {
938
if _, err = sign.Write(lenBuf); err != nil {
942
if _, err = fp.Write(data); err != nil {
945
if _, err = sign.Write(data); err != nil {
952
log.LogInfof("storeTxRbDentry: store complete: partitoinID(%v) volume(%v) numRbDentry(%v) crc(%v)",
953
mp.config.PartitionId, mp.config.VolName, sm.txRbDentryTree.Len(), crc)
957
func (mp *metaPartition) storeTxRbInode(rootDir string, sm *storeMsg) (crc uint32, err error) {
958
filename := path.Join(rootDir, txRbInodeFile)
959
fp, err := os.OpenFile(filename, os.O_RDWR|os.O_TRUNC|os.O_APPEND|os.O_CREATE, 0o755)
965
// TODO Unhandled errors
970
lenBuf := make([]byte, 4)
971
sign := crc32.NewIEEE()
973
sm.txRbInodeTree.Ascend(func(i BtreeItem) bool {
974
rbInode := i.(*TxRollbackInode)
975
if data, err = rbInode.Marshal(); err != nil {
978
binary.BigEndian.PutUint32(lenBuf, uint32(len(data)))
979
if _, err = fp.Write(lenBuf); err != nil {
982
if _, err = sign.Write(lenBuf); err != nil {
986
if _, err = fp.Write(data); err != nil {
989
if _, err = sign.Write(data); err != nil {
996
log.LogInfof("storeTxRbInode: store complete: partitoinID(%v) volume(%v) numRbinode[%v] crc(%v)",
997
mp.config.PartitionId, mp.config.VolName, sm.txRbInodeTree.Len(), crc)
1001
func (mp *metaPartition) storeTxInfo(rootDir string, sm *storeMsg) (crc uint32, err error) {
1002
filename := path.Join(rootDir, txInfoFile)
1003
fp, err := os.OpenFile(filename, os.O_RDWR|os.O_TRUNC|os.O_APPEND|os.O_CREATE, 0o755)
1009
// TODO Unhandled errors
1014
lenBuf := make([]byte, 4)
1015
sign := crc32.NewIEEE()
1017
sm.txTree.Ascend(func(i BtreeItem) bool {
1018
tx := i.(*proto.TransactionInfo)
1019
if data, err = tx.Marshal(); err != nil {
1023
binary.BigEndian.PutUint32(lenBuf, uint32(len(data)))
1024
if _, err = fp.Write(lenBuf); err != nil {
1027
if _, err = sign.Write(lenBuf); err != nil {
1031
if _, err = fp.Write(data); err != nil {
1034
if _, err = sign.Write(data); err != nil {
1041
log.LogInfof("storeTxInfo: store complete: partitoinID(%v) volume(%v) numTxs(%v) crc(%v)",
1042
mp.config.PartitionId, mp.config.VolName, sm.txTree.Len(), crc)
1046
func (mp *metaPartition) storeInode(rootDir string,
1047
sm *storeMsg) (crc uint32, err error) {
1048
filename := path.Join(rootDir, inodeFile)
1049
fp, err := os.OpenFile(filename, os.O_RDWR|os.O_TRUNC|os.O_APPEND|os.
1056
// TODO Unhandled errors
1063
lenBuf := make([]byte, 4)
1064
sign := crc32.NewIEEE()
1065
sm.inodeTree.Ascend(func(i BtreeItem) bool {
1068
mp.acucumUidSizeByStore(ino)
1071
if data, err = ino.Marshal(); err != nil {
1079
binary.BigEndian.PutUint32(lenBuf, uint32(len(data)))
1080
if _, err = fp.Write(lenBuf); err != nil {
1083
if _, err = sign.Write(lenBuf); err != nil {
1087
if _, err = fp.Write(data); err != nil {
1090
if _, err = sign.Write(data); err != nil {
1095
mp.acucumRebuildFin(sm.uidRebuild)
1099
log.LogInfof("storeInode: store complete: partitoinID(%v) volume(%v) numInodes(%v) crc(%v), size (%d)",
1100
mp.config.PartitionId, mp.config.VolName, sm.inodeTree.Len(), crc, size)
1105
func (mp *metaPartition) storeDentry(rootDir string,
1106
sm *storeMsg) (crc uint32, err error) {
1107
filename := path.Join(rootDir, dentryFile)
1108
fp, err := os.OpenFile(filename, os.O_RDWR|os.O_TRUNC|os.O_APPEND|os.
1115
// TODO Unhandled errors
1119
lenBuf := make([]byte, 4)
1120
sign := crc32.NewIEEE()
1121
sm.dentryTree.Ascend(func(i BtreeItem) bool {
1122
dentry := i.(*Dentry)
1123
data, err = dentry.Marshal()
1128
binary.BigEndian.PutUint32(lenBuf, uint32(len(data)))
1129
if _, err = fp.Write(lenBuf); err != nil {
1132
if _, err = sign.Write(lenBuf); err != nil {
1135
if _, err = fp.Write(data); err != nil {
1138
if _, err = sign.Write(data); err != nil {
1144
log.LogInfof("storeDentry: store complete: partitoinID(%v) volume(%v) numDentries(%v) crc(%v)",
1145
mp.config.PartitionId, mp.config.VolName, sm.dentryTree.Len(), crc)
1149
func (mp *metaPartition) storeExtend(rootDir string, sm *storeMsg) (crc uint32, err error) {
1150
extendTree := sm.extendTree
1151
fp := path.Join(rootDir, extendFile)
1153
f, err = os.OpenFile(fp, os.O_RDWR|os.O_TRUNC|os.O_APPEND|os.O_CREATE, 0o755)
1157
log.LogDebugf("storeExtend: store start: partitoinID(%v) volume(%v) numInodes(%v) extends(%v)",
1158
mp.config.PartitionId, mp.config.VolName, sm.inodeTree.Len(), sm.extendTree.Len())
1160
closeErr := f.Close()
1161
if err == nil && closeErr != nil {
1165
writer := bufio.NewWriterSize(f, 4*1024*1024)
1166
crc32 := crc32.NewIEEE()
1167
varintTmp := make([]byte, binary.MaxVarintLen64)
1169
// write number of extends
1170
n = binary.PutUvarint(varintTmp, uint64(extendTree.Len()))
1171
if _, err = writer.Write(varintTmp[:n]); err != nil {
1174
if _, err = crc32.Write(varintTmp[:n]); err != nil {
1178
extendTree.Ascend(func(i BtreeItem) bool {
1181
if sm.quotaRebuild {
1182
mp.statisticExtendByStore(e, sm.inodeTree)
1184
if raw, err = e.Bytes(); err != nil {
1188
n = binary.PutUvarint(varintTmp, uint64(len(raw)))
1189
if _, err = writer.Write(varintTmp[:n]); err != nil {
1192
if _, err = crc32.Write(varintTmp[:n]); err != nil {
1196
if _, err = writer.Write(raw); err != nil {
1199
if _, err = crc32.Write(raw); err != nil {
1204
log.LogInfof("storeExtend: write data ok: partitoinID(%v) volume(%v) numInodes(%v) extends(%v) quotaRebuild(%v)",
1205
mp.config.PartitionId, mp.config.VolName, sm.inodeTree.Len(), sm.extendTree.Len(), sm.quotaRebuild)
1206
mp.mqMgr.statisticRebuildFin(sm.quotaRebuild)
1211
if err = writer.Flush(); err != nil {
1214
if err = f.Sync(); err != nil {
1218
log.LogInfof("storeExtend: store complete: partitoinID(%v) volume(%v) numExtends(%v) crc(%v)",
1219
mp.config.PartitionId, mp.config.VolName, extendTree.Len(), crc)
1223
func (mp *metaPartition) storeMultipart(rootDir string, sm *storeMsg) (crc uint32, err error) {
1224
multipartTree := sm.multipartTree
1225
fp := path.Join(rootDir, multipartFile)
1227
f, err = os.OpenFile(fp, os.O_RDWR|os.O_TRUNC|os.O_APPEND|os.O_CREATE, 0o755)
1232
closeErr := f.Close()
1233
if err == nil && closeErr != nil {
1237
writer := bufio.NewWriterSize(f, 4*1024*1024)
1238
crc32 := crc32.NewIEEE()
1239
varintTmp := make([]byte, binary.MaxVarintLen64)
1241
// write number of extends
1242
n = binary.PutUvarint(varintTmp, uint64(multipartTree.Len()))
1243
if _, err = writer.Write(varintTmp[:n]); err != nil {
1246
if _, err = crc32.Write(varintTmp[:n]); err != nil {
1249
multipartTree.Ascend(func(i BtreeItem) bool {
1252
if raw, err = m.Bytes(); err != nil {
1256
n = binary.PutUvarint(varintTmp, uint64(len(raw)))
1257
if _, err = writer.Write(varintTmp[:n]); err != nil {
1260
if _, err = crc32.Write(varintTmp[:n]); err != nil {
1264
if _, err = writer.Write(raw); err != nil {
1267
if _, err = crc32.Write(raw); err != nil {
1276
if err = writer.Flush(); err != nil {
1279
if err = f.Sync(); err != nil {
1283
log.LogInfof("storeMultipart: store complete: partitoinID(%v) volume(%v) numMultiparts(%v) crc(%v)",
1284
mp.config.PartitionId, mp.config.VolName, multipartTree.Len(), crc)
1288
func (mp *metaPartition) storeUniqID(rootDir string, sm *storeMsg) (err error) {
1289
filename := path.Join(rootDir, uniqIDFile)
1290
fp, err := os.OpenFile(filename, os.O_RDWR|os.O_APPEND|os.O_TRUNC|os.
1299
if _, err = fp.WriteString(fmt.Sprintf("%d", sm.uniqId)); err != nil {
1302
log.LogInfof("storeUniqID: store complete: partitionID(%v) volume(%v) uniqID(%v)",
1303
mp.config.PartitionId, mp.config.VolName, sm.uniqId)
1307
func (mp *metaPartition) storeUniqChecker(rootDir string, sm *storeMsg) (crc uint32, err error) {
1308
filename := path.Join(rootDir, uniqCheckerFile)
1309
fp, err := os.OpenFile(filename, os.O_RDWR|os.O_TRUNC|os.O_APPEND|os.
1320
if data, crc, err = sm.uniqChecker.Marshal(); err != nil {
1324
if _, err = fp.Write(data); err != nil {
1328
log.LogInfof("storeUniqChecker: store complete: PartitionID(%v) volume(%v) crc(%v)",
1329
mp.config.UniqId, mp.config.VolName, crc)