cubefs

Форк
0
/
partition_store.go 
1331 строка · 35.2 Кб
1
// Copyright 2018 The CubeFS Authors.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
14

15
package metanode
16

17
import (
18
	"bufio"
19
	"encoding/binary"
20
	"encoding/json"
21
	"fmt"
22
	"hash/crc32"
23
	"io"
24
	"os"
25
	"path"
26
	"strings"
27
	"sync/atomic"
28
	"time"
29

30
	"github.com/cubefs/cubefs/proto"
31
	"github.com/cubefs/cubefs/util/errors"
32
	"github.com/cubefs/cubefs/util/log"
33
	mmap "github.com/edsrzf/mmap-go"
34
)
35

36
const (
37
	snapshotDir             = "snapshot"
38
	snapshotDirTmp          = ".snapshot"
39
	snapshotBackup          = ".snapshot_backup"
40
	inodeFile               = "inode"
41
	dentryFile              = "dentry"
42
	extendFile              = "extend"
43
	multipartFile           = "multipart"
44
	txInfoFile              = "tx_info"
45
	txRbInodeFile           = "tx_rb_inode"
46
	txRbDentryFile          = "tx_rb_dentry"
47
	applyIDFile             = "apply"
48
	TxIDFile                = "transactionID"
49
	SnapshotSign            = ".sign"
50
	metadataFile            = "meta"
51
	metadataFileTmp         = ".meta"
52
	uniqIDFile              = "uniqID"
53
	uniqCheckerFile         = "uniqChecker"
54
	verdataFile             = "multiVer"
55
	StaleMetadataSuffix     = ".old"
56
	StaleMetadataTimeFormat = "20060102150405.000000000"
57
	verdataInitFile         = "multiVerInitFile"
58
)
59

60
func (mp *metaPartition) loadMetadata() (err error) {
61
	metaFile := path.Join(mp.config.RootDir, metadataFile)
62
	fp, err := os.OpenFile(metaFile, os.O_RDONLY, 0o644)
63
	if err != nil {
64
		err = errors.NewErrorf("[loadMetadata]: OpenFile %s", err.Error())
65
		return
66
	}
67
	defer fp.Close()
68
	data, err := io.ReadAll(fp)
69
	if err != nil || len(data) == 0 {
70
		err = errors.NewErrorf("[loadMetadata]: ReadFile %s, data: %s", err.Error(),
71
			string(data))
72
		return
73
	}
74
	mConf := &MetaPartitionConfig{}
75
	if err = json.Unmarshal(data, mConf); err != nil {
76
		err = errors.NewErrorf("[loadMetadata]: Unmarshal MetaPartitionConfig %s",
77
			err.Error())
78
		return
79
	}
80

81
	if mConf.checkMeta() != nil {
82
		return
83
	}
84
	mp.config.PartitionId = mConf.PartitionId
85
	mp.config.VolName = mConf.VolName
86
	mp.config.Start = mConf.Start
87
	mp.config.End = mConf.End
88
	mp.config.Peers = mConf.Peers
89
	mp.config.Cursor = mp.config.Start
90
	mp.config.UniqId = 0
91

92
	mp.uidManager = NewUidMgr(mp.config.VolName, mp.config.PartitionId)
93
	mp.mqMgr = NewQuotaManager(mp.config.VolName, mp.config.PartitionId)
94

95
	log.LogInfof("loadMetadata: load complete: partitionID(%v) volume(%v) range(%v,%v) cursor(%v)",
96
		mp.config.PartitionId, mp.config.VolName, mp.config.Start, mp.config.End, mp.config.Cursor)
97
	return
98
}
99

100
func (mp *metaPartition) loadInode(rootDir string, crc uint32) (err error) {
101
	var numInodes uint64
102
	defer func() {
103
		if err == nil {
104
			log.LogInfof("loadInode: load complete: partitonID(%v) volume(%v) numInodes(%v)",
105
				mp.config.PartitionId, mp.config.VolName, numInodes)
106
		}
107
	}()
108
	filename := path.Join(rootDir, inodeFile)
109
	if _, err = os.Stat(filename); err != nil {
110
		err = errors.NewErrorf("[loadInode] Stat: %s", err.Error())
111
		return
112
	}
113
	fp, err := os.OpenFile(filename, os.O_RDONLY, 0o644)
114
	if err != nil {
115
		err = errors.NewErrorf("[loadInode] OpenFile: %s", err.Error())
116
		return
117
	}
118
	defer fp.Close()
119
	reader := bufio.NewReaderSize(fp, 4*1024*1024)
120
	inoBuf := make([]byte, 4)
121
	crcCheck := crc32.NewIEEE()
122
	for {
123
		inoBuf = inoBuf[:4]
124
		// first read length
125
		_, err = io.ReadFull(reader, inoBuf)
126
		if err != nil {
127
			if err == io.EOF {
128
				err = nil
129
				if res := crcCheck.Sum32(); res != crc {
130
					log.LogErrorf("[loadInode]: check crc mismatch, expected[%d], actual[%d]", crc, res)
131
					return ErrSnapshotCrcMismatch
132
				}
133
				return
134
			}
135
			err = errors.NewErrorf("[loadInode] ReadHeader: %s", err.Error())
136
			return
137
		}
138
		// length crc
139
		if _, err = crcCheck.Write(inoBuf); err != nil {
140
			return err
141
		}
142

143
		length := binary.BigEndian.Uint32(inoBuf)
144

145
		// next read body
146
		if uint32(cap(inoBuf)) >= length {
147
			inoBuf = inoBuf[:length]
148
		} else {
149
			inoBuf = make([]byte, length)
150
		}
151
		_, err = io.ReadFull(reader, inoBuf)
152
		if err != nil {
153
			err = errors.NewErrorf("[loadInode] ReadBody: %s", err.Error())
154
			return
155
		}
156
		ino := NewInode(0, 0)
157
		if err = ino.Unmarshal(inoBuf); err != nil {
158
			err = errors.NewErrorf("[loadInode] Unmarshal: %s", err.Error())
159
			return
160
		}
161
		mp.acucumUidSizeByLoad(ino)
162
		// data crc
163
		if _, err = crcCheck.Write(inoBuf); err != nil {
164
			return err
165
		}
166

167
		mp.size += ino.Size
168

169
		mp.fsmCreateInode(ino)
170
		mp.checkAndInsertFreeList(ino)
171
		if mp.config.Cursor < ino.Inode {
172
			mp.config.Cursor = ino.Inode
173
		}
174
		numInodes += 1
175
	}
176
}
177

178
// Load dentry from the dentry snapshot.
179
func (mp *metaPartition) loadDentry(rootDir string, crc uint32) (err error) {
180
	var numDentries uint64
181
	defer func() {
182
		if err == nil {
183
			log.LogInfof("loadDentry: load complete: partitonID(%v) volume(%v) numDentries(%v)",
184
				mp.config.PartitionId, mp.config.VolName, numDentries)
185
		}
186
	}()
187
	filename := path.Join(rootDir, dentryFile)
188
	if _, err = os.Stat(filename); err != nil {
189
		err = errors.NewErrorf("[loadDentry] Stat: %s", err.Error())
190
		return
191
	}
192
	fp, err := os.OpenFile(filename, os.O_RDONLY, 0o644)
193
	if err != nil {
194
		err = errors.NewErrorf("[loadDentry] OpenFile: %s", err.Error())
195
		return
196
	}
197

198
	defer fp.Close()
199
	reader := bufio.NewReaderSize(fp, 4*1024*1024)
200
	dentryBuf := make([]byte, 4)
201
	crcCheck := crc32.NewIEEE()
202
	for {
203
		dentryBuf = dentryBuf[:4]
204
		// First Read 4byte header length
205
		_, err = io.ReadFull(reader, dentryBuf)
206
		if err != nil {
207
			if err == io.EOF {
208
				err = nil
209
				if res := crcCheck.Sum32(); res != crc {
210
					log.LogErrorf("[loadDentry]: check crc mismatch, expected[%d], actual[%d]", crc, res)
211
					return ErrSnapshotCrcMismatch
212
				}
213
				return
214
			}
215
			err = errors.NewErrorf("[loadDentry] ReadHeader: %s", err.Error())
216
			return
217
		}
218
		if _, err = crcCheck.Write(dentryBuf); err != nil {
219
			return err
220
		}
221
		length := binary.BigEndian.Uint32(dentryBuf)
222

223
		// next read body
224
		if uint32(cap(dentryBuf)) >= length {
225
			dentryBuf = dentryBuf[:length]
226
		} else {
227
			dentryBuf = make([]byte, length)
228
		}
229
		_, err = io.ReadFull(reader, dentryBuf)
230
		if err != nil {
231
			err = errors.NewErrorf("[loadDentry]: ReadBody: %s", err.Error())
232
			return
233
		}
234
		dentry := &Dentry{}
235
		if err = dentry.Unmarshal(dentryBuf); err != nil {
236
			err = errors.NewErrorf("[loadDentry] Unmarshal: %s", err.Error())
237
			return
238
		}
239
		if status := mp.fsmCreateDentry(dentry, true); status != proto.OpOk {
240
			err = errors.NewErrorf("[loadDentry] createDentry dentry: %v, resp code: %d", dentry, status)
241
			return
242
		}
243
		if _, err = crcCheck.Write(dentryBuf); err != nil {
244
			return err
245
		}
246
		numDentries += 1
247
	}
248
}
249

250
func (mp *metaPartition) loadExtend(rootDir string, crc uint32) (err error) {
251
	filename := path.Join(rootDir, extendFile)
252
	if _, err = os.Stat(filename); err != nil {
253
		err = errors.NewErrorf("[loadExtend] Stat: %s", err.Error())
254
		return err
255
	}
256
	fp, err := os.OpenFile(filename, os.O_RDONLY, 0o644)
257
	if err != nil {
258
		err = errors.NewErrorf("[loadExtend] OpenFile: %s", err.Error())
259
		return err
260
	}
261
	defer func() {
262
		_ = fp.Close()
263
	}()
264
	var mem mmap.MMap
265
	if mem, err = mmap.Map(fp, mmap.RDONLY, 0); err != nil {
266
		return err
267
	}
268
	defer func() {
269
		_ = mem.Unmap()
270
	}()
271
	var offset, n int
272
	// read number of extends
273
	var numExtends uint64
274
	numExtends, n = binary.Uvarint(mem)
275
	offset += n
276

277
	varintTmp := make([]byte, binary.MaxVarintLen64)
278
	// write number of extends
279
	n = binary.PutUvarint(varintTmp, numExtends)
280

281
	crcCheck := crc32.NewIEEE()
282
	if _, err = crcCheck.Write(varintTmp[:n]); err != nil {
283
		return
284
	}
285
	for i := uint64(0); i < numExtends; i++ {
286
		// read length
287
		var numBytes uint64
288
		numBytes, n = binary.Uvarint(mem[offset:])
289
		offset += n
290
		var extend *Extend
291
		if extend, err = NewExtendFromBytes(mem[offset : offset+int(numBytes)]); err != nil {
292
			return err
293
		}
294

295
		if _, err = crcCheck.Write(mem[offset-n : offset]); err != nil {
296
			return err
297
		}
298
		// log.LogDebugf("loadExtend: new extend from bytes: partitionID (%v) volume(%v) inode[%v]",
299
		//	mp.config.PartitionId, mp.config.VolName, extend.inode)
300
		_ = mp.fsmSetXAttr(extend)
301

302
		if _, err = crcCheck.Write(mem[offset : offset+int(numBytes)]); err != nil {
303
			return
304
		}
305
		offset += int(numBytes)
306
		mp.statisticExtendByLoad(extend)
307
	}
308

309
	log.LogInfof("loadExtend: load complete: partitionID(%v) volume(%v) numExtends(%v) filename(%v)",
310
		mp.config.PartitionId, mp.config.VolName, numExtends, filename)
311
	if res := crcCheck.Sum32(); res != crc {
312
		log.LogErrorf("loadExtend: check crc mismatch, expected[%d], actual[%d]", crc, res)
313
		return ErrSnapshotCrcMismatch
314
	}
315
	return nil
316
}
317

318
func (mp *metaPartition) loadMultipart(rootDir string, crc uint32) (err error) {
319
	filename := path.Join(rootDir, multipartFile)
320
	if _, err = os.Stat(filename); err != nil {
321
		err = errors.NewErrorf("[loadMultipart] Stat: %s", err.Error())
322
		return err
323
	}
324
	fp, err := os.OpenFile(filename, os.O_RDONLY, 0o644)
325
	if err != nil {
326
		err = errors.NewErrorf("[loadMultipart] OpenFile: %s", err.Error())
327
		return err
328
	}
329
	defer func() {
330
		_ = fp.Close()
331
	}()
332
	var mem mmap.MMap
333
	if mem, err = mmap.Map(fp, mmap.RDONLY, 0); err != nil {
334
		return err
335
	}
336
	defer func() {
337
		_ = mem.Unmap()
338
	}()
339
	var offset, n int
340
	// read number of multipart
341
	var numMultiparts uint64
342
	numMultiparts, n = binary.Uvarint(mem)
343
	varintTmp := make([]byte, binary.MaxVarintLen64)
344
	// write number of multipart
345
	n = binary.PutUvarint(varintTmp, numMultiparts)
346
	crcCheck := crc32.NewIEEE()
347
	if _, err = crcCheck.Write(varintTmp[:n]); err != nil {
348
		return
349
	}
350
	offset += n
351
	for i := uint64(0); i < numMultiparts; i++ {
352
		// read length
353
		var numBytes uint64
354
		numBytes, n = binary.Uvarint(mem[offset:])
355
		offset += n
356
		if _, err = crcCheck.Write(mem[offset-n : offset]); err != nil {
357
			return err
358
		}
359
		var multipart *Multipart
360
		multipart = MultipartFromBytes(mem[offset : offset+int(numBytes)])
361
		log.LogDebugf("loadMultipart: create multipart from bytes: partitionID(%v) multipartID(%v)", mp.config.PartitionId, multipart.id)
362
		mp.fsmCreateMultipart(multipart)
363
		offset += int(numBytes)
364
		if _, err = crcCheck.Write(mem[offset-int(numBytes) : offset]); err != nil {
365
			return err
366
		}
367
	}
368
	log.LogInfof("loadMultipart: load complete: partitionID(%v) numMultiparts(%v) filename(%v)",
369
		mp.config.PartitionId, numMultiparts, filename)
370
	if res := crcCheck.Sum32(); res != crc {
371
		log.LogErrorf("[loadMultipart] check crc mismatch, expected[%d], actual[%d]", crc, res)
372
		return ErrSnapshotCrcMismatch
373
	}
374
	return nil
375
}
376

377
func (mp *metaPartition) loadApplyID(rootDir string) (err error) {
378
	filename := path.Join(rootDir, applyIDFile)
379
	if _, err = os.Stat(filename); err != nil {
380
		err = errors.NewErrorf("[loadApplyID]: Stat %s", err.Error())
381
		return
382
	}
383
	data, err := os.ReadFile(filename)
384
	if err != nil {
385
		err = errors.NewErrorf("[loadApplyID] ReadFile: %s", err.Error())
386
		return
387
	}
388
	if len(data) == 0 {
389
		err = errors.NewErrorf("[loadApplyID]: ApplyID is empty")
390
		return
391
	}
392
	var cursor uint64
393
	if strings.Contains(string(data), "|") {
394
		_, err = fmt.Sscanf(string(data), "%d|%d", &mp.applyID, &cursor)
395
	} else {
396
		_, err = fmt.Sscanf(string(data), "%d", &mp.applyID)
397
	}
398
	if err != nil {
399
		err = errors.NewErrorf("[loadApplyID] ReadApplyID: %s", err.Error())
400
		return
401
	}
402

403
	mp.storedApplyId = mp.applyID
404

405
	if cursor > mp.GetCursor() {
406
		atomic.StoreUint64(&mp.config.Cursor, cursor)
407
	}
408

409
	log.LogInfof("loadApplyID: load complete: partitionID(%v) volume(%v) applyID(%v) cursor(%v) filename(%v)",
410
		mp.config.PartitionId, mp.config.VolName, mp.applyID, mp.config.Cursor, filename)
411
	return
412
}
413

414
func (mp *metaPartition) loadTxRbDentry(rootDir string, crc uint32) (err error) {
415
	var numTxRbDentry uint64
416
	defer func() {
417
		if err == nil {
418
			log.LogInfof("loadTxRbDentry: load complete: partitonID(%v) volume(%v) numInodes(%v)",
419
				mp.config.PartitionId, mp.config.VolName, numTxRbDentry)
420
		}
421
	}()
422
	filename := path.Join(rootDir, txRbDentryFile)
423
	if _, err = os.Stat(filename); err != nil {
424
		err = errors.NewErrorf("[loadTxRbDentry] Stat: %s", err.Error())
425
		return
426
	}
427
	fp, err := os.OpenFile(filename, os.O_RDONLY, 0o644)
428
	if err != nil {
429
		err = errors.NewErrorf("[loadTxRbDentry] OpenFile: %s", err.Error())
430
		return
431
	}
432
	defer fp.Close()
433
	reader := bufio.NewReaderSize(fp, 4*1024*1024)
434
	txBuf := make([]byte, 4)
435
	crcCheck := crc32.NewIEEE()
436

437
	for {
438
		txBuf = txBuf[:4]
439
		// first read length
440
		_, err = io.ReadFull(reader, txBuf)
441
		if err != nil {
442
			if err == io.EOF {
443
				err = nil
444
				if res := crcCheck.Sum32(); res != crc {
445
					log.LogErrorf("[loadTxRbDentry]: check crc mismatch, expected[%d], actual[%d]", crc, res)
446
					return ErrSnapshotCrcMismatch
447
				}
448
				return
449
			}
450
			err = errors.NewErrorf("[loadTxRbDentry] ReadHeader: %s", err.Error())
451
			return
452
		}
453
		// length crc
454
		if _, err = crcCheck.Write(txBuf); err != nil {
455
			return err
456
		}
457

458
		length := binary.BigEndian.Uint32(txBuf)
459

460
		// next read body
461
		if uint32(cap(txBuf)) >= length {
462
			txBuf = txBuf[:length]
463
		} else {
464
			txBuf = make([]byte, length)
465
		}
466
		_, err = io.ReadFull(reader, txBuf)
467
		if err != nil {
468
			err = errors.NewErrorf("[loadTxRbDentry] ReadBody: %s", err.Error())
469
			return
470
		}
471

472
		txRbDentry := NewTxRollbackDentry(nil, nil, 0)
473
		if err = txRbDentry.Unmarshal(txBuf); err != nil {
474
			err = errors.NewErrorf("[loadTxRbDentry] Unmarshal: %s", err.Error())
475
			return
476
		}
477

478
		// data crc
479
		if _, err = crcCheck.Write(txBuf); err != nil {
480
			return err
481
		}
482

483
		// mp.txProcessor.txResource.txRollbackDentries[txRbDentry.txDentryInfo.GetKey()] = txRbDentry
484
		mp.txProcessor.txResource.txRbDentryTree.ReplaceOrInsert(txRbDentry, true)
485
		numTxRbDentry++
486
	}
487
}
488

489
func (mp *metaPartition) loadTxRbInode(rootDir string, crc uint32) (err error) {
490
	var numTxRbInode uint64
491
	defer func() {
492
		if err == nil {
493
			log.LogInfof("loadTxRbInode: load complete: partitonID(%v) volume(%v) numInodes(%v)",
494
				mp.config.PartitionId, mp.config.VolName, numTxRbInode)
495
		}
496
	}()
497
	filename := path.Join(rootDir, txRbInodeFile)
498
	if _, err = os.Stat(filename); err != nil {
499
		err = errors.NewErrorf("[loadTxRbInode] Stat: %s", err.Error())
500
		return
501
	}
502
	fp, err := os.OpenFile(filename, os.O_RDONLY, 0o644)
503
	if err != nil {
504
		err = errors.NewErrorf("[loadTxRbInode] OpenFile: %s", err.Error())
505
		return
506
	}
507
	defer fp.Close()
508
	reader := bufio.NewReaderSize(fp, 4*1024*1024)
509
	txBuf := make([]byte, 4)
510
	crcCheck := crc32.NewIEEE()
511

512
	for {
513
		txBuf = txBuf[:4]
514
		// first read length
515
		_, err = io.ReadFull(reader, txBuf)
516
		if err != nil {
517
			if err == io.EOF {
518
				err = nil
519
				return
520
			}
521
			err = errors.NewErrorf("[loadTxRbInode] ReadHeader: %s", err.Error())
522
			return
523
		}
524
		// length crc
525
		if _, err = crcCheck.Write(txBuf); err != nil {
526
			return err
527
		}
528

529
		length := binary.BigEndian.Uint32(txBuf)
530

531
		// next read body
532
		if uint32(cap(txBuf)) >= length {
533
			txBuf = txBuf[:length]
534
		} else {
535
			txBuf = make([]byte, length)
536
		}
537
		_, err = io.ReadFull(reader, txBuf)
538
		if err != nil {
539
			err = errors.NewErrorf("[loadTxRbInode] ReadBody: %s", err.Error())
540
			return
541
		}
542

543
		txRbInode := NewTxRollbackInode(nil, []uint32{}, nil, 0)
544
		if err = txRbInode.Unmarshal(txBuf); err != nil {
545
			err = errors.NewErrorf("[loadTxRbInode] Unmarshal: %s", err.Error())
546
			return
547
		}
548
		// data crc
549
		if _, err = crcCheck.Write(txBuf); err != nil {
550
			return err
551
		}
552

553
		mp.txProcessor.txResource.txRbInodeTree.ReplaceOrInsert(txRbInode, true)
554
		numTxRbInode++
555
	}
556
}
557

558
func (mp *metaPartition) loadTxInfo(rootDir string, crc uint32) (err error) {
559
	var numTxInfos uint64
560
	defer func() {
561
		if err == nil {
562
			log.LogInfof("loadTxInfo: load complete: partitonID(%v) volume(%v) numInodes(%v)",
563
				mp.config.PartitionId, mp.config.VolName, numTxInfos)
564
		}
565
	}()
566
	filename := path.Join(rootDir, txInfoFile)
567
	if _, err = os.Stat(filename); err != nil {
568
		err = errors.NewErrorf("[loadTxInfo] Stat: %s", err.Error())
569
		return
570
	}
571
	fp, err := os.OpenFile(filename, os.O_RDONLY, 0o644)
572
	if err != nil {
573
		err = errors.NewErrorf("[loadTxInfo] OpenFile: %s", err.Error())
574
		return
575
	}
576
	defer fp.Close()
577
	reader := bufio.NewReaderSize(fp, 4*1024*1024)
578
	txBuf := make([]byte, 4)
579
	crcCheck := crc32.NewIEEE()
580

581
	for {
582
		txBuf = txBuf[:4]
583
		// first read length
584
		_, err = io.ReadFull(reader, txBuf)
585
		if err != nil {
586
			if err == io.EOF {
587
				err = nil
588
				if res := crcCheck.Sum32(); res != crc {
589
					log.LogErrorf("[loadTxInfo]: check crc mismatch, expected[%d], actual[%d]", crc, res)
590
					return ErrSnapshotCrcMismatch
591
				}
592
				return
593
			}
594
			err = errors.NewErrorf("[loadTxInfo] ReadHeader: %s", err.Error())
595
			return
596
		}
597
		// length crc
598
		if _, err = crcCheck.Write(txBuf); err != nil {
599
			return err
600
		}
601

602
		length := binary.BigEndian.Uint32(txBuf)
603

604
		// next read body
605
		if uint32(cap(txBuf)) >= length {
606
			txBuf = txBuf[:length]
607
		} else {
608
			txBuf = make([]byte, length)
609
		}
610
		_, err = io.ReadFull(reader, txBuf)
611
		if err != nil {
612
			err = errors.NewErrorf("[loadTxInfo] ReadBody: %s", err.Error())
613
			return
614
		}
615

616
		txInfo := proto.NewTransactionInfo(0, proto.TxTypeUndefined)
617
		if err = txInfo.Unmarshal(txBuf); err != nil {
618
			err = errors.NewErrorf("[loadTxInfo] Unmarshal: %s", err.Error())
619
			return
620
		}
621

622
		// data crc
623
		if _, err = crcCheck.Write(txBuf); err != nil {
624
			return err
625
		}
626

627
		mp.txProcessor.txManager.addTxInfo(txInfo)
628
		numTxInfos++
629
	}
630
}
631

632
func (mp *metaPartition) loadTxID(rootDir string) (err error) {
633
	filename := path.Join(rootDir, TxIDFile)
634
	if _, err = os.Stat(filename); err != nil {
635
		err = nil
636
		return
637
	}
638
	data, err := os.ReadFile(filename)
639
	if err != nil {
640
		err = errors.NewErrorf("[loadTxID] OpenFile: %s", err.Error())
641
		return
642
	}
643
	if len(data) == 0 {
644
		err = errors.NewErrorf("[loadTxID]: TxID is empty")
645
		return
646
	}
647
	var txId uint64
648
	_, err = fmt.Sscanf(string(data), "%d", &txId)
649
	if err != nil {
650
		err = errors.NewErrorf("[loadTxID] ReadTxID: %s", err.Error())
651
		return
652
	}
653

654
	if txId > mp.txProcessor.txManager.txIdAlloc.getTransactionID() {
655
		mp.txProcessor.txManager.txIdAlloc.setTransactionID(txId)
656
	}
657
	log.LogInfof("loadTxID: load complete: partitionID(%v) volume(%v) txId(%v) filename(%v)",
658
		mp.config.PartitionId, mp.config.VolName, mp.txProcessor.txManager.txIdAlloc.getTransactionID(), filename)
659
	return
660
}
661

662
func (mp *metaPartition) loadUniqID(rootDir string) (err error) {
663
	filename := path.Join(rootDir, uniqIDFile)
664
	if _, err = os.Stat(filename); err != nil {
665
		err = nil
666
		return
667
	}
668
	data, err := os.ReadFile(filename)
669
	if err != nil {
670
		err = errors.NewErrorf("[loadUniqID] OpenFile: %s", err.Error())
671
		return
672
	}
673
	if len(data) == 0 {
674
		err = errors.NewErrorf("[loadUniqID]: uniqID is empty")
675
		return
676
	}
677
	var uniqId uint64
678
	_, err = fmt.Sscanf(string(data), "%d", &uniqId)
679
	if err != nil {
680
		err = errors.NewErrorf("[loadUniqID] Read uniqID: %s", err.Error())
681
		return
682
	}
683

684
	if uniqId > mp.GetUniqId() {
685
		atomic.StoreUint64(&mp.config.UniqId, uniqId)
686
	}
687

688
	log.LogInfof("loadUniqID: load complete: partitionID(%v) volume(%v) uniqID(%v) filename(%v)",
689
		mp.config.PartitionId, mp.config.VolName, mp.GetUniqId(), filename)
690
	return
691
}
692

693
func (mp *metaPartition) loadUniqChecker(rootDir string, crc uint32) (err error) {
694
	log.LogInfof("loadUniqChecker partition(%v) begin", mp.config.PartitionId)
695
	filename := path.Join(rootDir, uniqCheckerFile)
696
	if _, err = os.Stat(filename); err != nil {
697
		log.LogErrorf("loadUniqChecker get file %s err(%s)", filename, err)
698
		err = nil
699
		return
700
	}
701

702
	data, err := os.ReadFile(filename)
703
	if err != nil {
704
		log.LogErrorf("loadUniqChecker read file %s err(%s)", filename, err)
705
		err = errors.NewErrorf("[loadUniqChecker] OpenFile: %v", err.Error())
706
		return
707
	}
708
	if err = mp.uniqChecker.UnMarshal(data); err != nil {
709
		log.LogErrorf("loadUniqChecker UnMarshal err(%s)", err)
710
		err = errors.NewErrorf("[loadUniqChecker] Unmarshal: %v", err.Error())
711
		return
712
	}
713

714
	crcCheck := crc32.NewIEEE()
715
	if _, err = crcCheck.Write(data); err != nil {
716
		log.LogErrorf("loadUniqChecker write to  crcCheck failed: %s", err)
717
		return err
718
	}
719
	if res := crcCheck.Sum32(); res != crc {
720
		log.LogErrorf("[loadUniqChecker]: check crc mismatch, expected[%d], actual[%d]", crc, res)
721
		return ErrSnapshotCrcMismatch
722
	}
723

724
	log.LogInfof("loadUniqChecker partition(%v) complete", mp.config.PartitionId)
725
	return
726
}
727

728
func (mp *metaPartition) loadMultiVer(rootDir string, crc uint32) (err error) {
729
	filename := path.Join(rootDir, verdataFile)
730
	if _, err = os.Stat(filename); err != nil {
731

732
		err = nil
733
		return
734
	}
735

736
	data, err := os.ReadFile(filename)
737
	if err != nil {
738
		if err == os.ErrNotExist {
739
			err = nil
740
			return
741
		}
742

743
		err = errors.NewErrorf("[loadMultiVer] OpenFile: %s", err.Error())
744
		return
745
	}
746

747
	if len(data) == 0 {
748
		err = errors.NewErrorf("[loadMultiVer]: ApplyID is empty")
749
		return
750
	}
751

752
	var (
753
		verData string
754
		applyId uint64
755
	)
756
	if strings.Contains(string(data), "|") {
757
		_, err = fmt.Sscanf(string(data), "%d|%s", &applyId, &verData)
758
	} else {
759
		_, err = fmt.Sscanf(string(data), "%d", &applyId)
760
	}
761

762
	if err != nil {
763
		err = errors.NewErrorf("[loadMultiVer] ReadVerList: %s", err.Error())
764
		return
765
	}
766

767
	var verList []*proto.VolVersionInfo
768
	if err = json.Unmarshal([]byte(verData), &verList); err != nil {
769
		err = errors.NewErrorf("[loadMultiVer] ReadVerList: %s verData(%v) applyId %v", verList, verData, applyId)
770
		return
771
	}
772

773
	var byteData []byte
774
	if byteData, err = json.Marshal(verList); err != nil {
775
		return
776
	}
777
	sign := crc32.NewIEEE()
778
	if _, err = sign.Write(byteData); err != nil {
779
		return
780
	}
781

782
	if crc != sign.Sum32() {
783
		return fmt.Errorf("partitionID(%v) volume(%v) calc crc %v not equal with disk %v", mp.config.PartitionId, mp.config.VolName, sign.Sum32(), crc)
784
	}
785

786
	mp.multiVersionList.VerList = verList
787
	mp.verSeq = mp.multiVersionList.GetLastVer()
788

789
	log.LogInfof("loadMultiVer: updateVerList load complete: partitionID(%v) volume(%v) applyID(%v) filename(%v) verlist (%v) crc (%v) mp Ver(%v)",
790
		mp.config.PartitionId, mp.config.VolName, mp.applyID, filename, mp.multiVersionList.VerList, crc, mp.verSeq)
791
	return
792
}
793

794
func (mp *metaPartition) storeMultiVersion(rootDir string, sm *storeMsg) (crc uint32, err error) {
795
	filename := path.Join(rootDir, verdataFile)
796
	fp, err := os.OpenFile(filename, os.O_RDWR|os.O_APPEND|os.O_TRUNC|os.
797
		O_CREATE, 0o755)
798
	if err != nil {
799
		return
800
	}
801
	defer func() {
802
		err = fp.Sync()
803
		fp.Close()
804
	}()
805
	var verData []byte
806
	if verData, err = json.Marshal(sm.multiVerList); err != nil {
807
		return
808
	}
809
	sign := crc32.NewIEEE()
810
	if _, err = sign.Write(verData); err != nil {
811
		return
812
	}
813
	crc = sign.Sum32()
814

815
	if _, err = fp.WriteString(fmt.Sprintf("%d|%s", sm.applyIndex, string(verData))); err != nil {
816
		return
817
	}
818
	log.LogInfof("storeMultiVersion: store complete: partitionID(%v) volume(%v) applyID(%v) verData(%v) crc(%v)",
819
		mp.config.PartitionId, mp.config.VolName, sm.applyIndex, string(verData), crc)
820
	return
821
}
822

823
func (mp *metaPartition) renameStaleMetadata() (err error) {
824
	if _, err = os.Stat(mp.config.RootDir); err != nil {
825
		if os.IsNotExist(err) {
826
			return nil
827
		}
828
	}
829

830
	curTime := time.Now().Format(StaleMetadataTimeFormat)
831
	staleMetaDirName := mp.config.RootDir + "_" + curTime + StaleMetadataSuffix
832
	if err = os.Rename(mp.config.RootDir, staleMetaDirName); err != nil {
833
		return err
834
	}
835
	return nil
836
}
837

838
func (mp *metaPartition) persistMetadata() (err error) {
839
	if err = mp.config.checkMeta(); err != nil {
840
		err = errors.NewErrorf("[persistMetadata]->%s", err.Error())
841
		return
842
	}
843

844
	// TODO Unhandled errors
845
	os.MkdirAll(mp.config.RootDir, 0o755)
846
	filename := path.Join(mp.config.RootDir, metadataFileTmp)
847
	fp, err := os.OpenFile(filename, os.O_RDWR|os.O_TRUNC|os.O_APPEND|os.O_CREATE, 0o755)
848
	if err != nil {
849
		return
850
	}
851
	defer func() {
852
		// TODO Unhandled errors
853
		fp.Sync()
854
		fp.Close()
855
		os.Remove(filename)
856
	}()
857

858
	data, err := json.Marshal(mp.config)
859
	if err != nil {
860
		return
861
	}
862
	if _, err = fp.Write(data); err != nil {
863
		return
864
	}
865
	if err = os.Rename(filename, path.Join(mp.config.RootDir, metadataFile)); err != nil {
866
		return
867
	}
868
	log.LogInfof("persistMetata: persist complete: partitionID(%v) volume(%v) range(%v,%v) cursor(%v)",
869
		mp.config.PartitionId, mp.config.VolName, mp.config.Start, mp.config.End, mp.config.Cursor)
870
	return
871
}
872

873
func (mp *metaPartition) storeApplyID(rootDir string, sm *storeMsg) (err error) {
874
	filename := path.Join(rootDir, applyIDFile)
875
	fp, err := os.OpenFile(filename, os.O_RDWR|os.O_APPEND|os.O_TRUNC|os.
876
		O_CREATE, 0o755)
877
	if err != nil {
878
		return
879
	}
880
	defer func() {
881
		err = fp.Sync()
882
		fp.Close()
883
	}()
884

885
	cursor := mp.GetCursor()
886
	if _, err = fp.WriteString(fmt.Sprintf("%d|%d", sm.applyIndex, cursor)); err != nil {
887
		return
888
	}
889
	log.LogWarnf("storeApplyID: store complete: partitionID(%v) volume(%v) applyID(%v) cursor(%v)",
890
		mp.config.PartitionId, mp.config.VolName, sm.applyIndex, cursor)
891
	return
892
}
893

894
func (mp *metaPartition) storeTxID(rootDir string, sm *storeMsg) (err error) {
895
	filename := path.Join(rootDir, TxIDFile)
896
	fp, err := os.OpenFile(filename, os.O_RDWR|os.O_APPEND|os.O_TRUNC|os.
897
		O_CREATE, 0o755)
898
	if err != nil {
899
		return
900
	}
901
	defer func() {
902
		err = fp.Sync()
903
		fp.Close()
904
	}()
905
	if _, err = fp.WriteString(fmt.Sprintf("%d", sm.txId)); err != nil {
906
		return
907
	}
908
	log.LogInfof("storeTxID: store complete: partitionID(%v) volume(%v) txId(%v)",
909
		mp.config.PartitionId, mp.config.VolName, sm.txId)
910
	return
911
}
912

913
func (mp *metaPartition) storeTxRbDentry(rootDir string, sm *storeMsg) (crc uint32, err error) {
914
	filename := path.Join(rootDir, txRbDentryFile)
915
	fp, err := os.OpenFile(filename, os.O_RDWR|os.O_TRUNC|os.O_APPEND|os.O_CREATE, 0o755)
916
	if err != nil {
917
		return
918
	}
919
	defer func() {
920
		err = fp.Sync()
921
		// TODO Unhandled errors
922
		fp.Close()
923
	}()
924

925
	var data []byte
926
	lenBuf := make([]byte, 4)
927
	sign := crc32.NewIEEE()
928

929
	sm.txRbDentryTree.Ascend(func(i BtreeItem) bool {
930
		rbDentry := i.(*TxRollbackDentry)
931
		if data, err = rbDentry.Marshal(); err != nil {
932
			return false
933
		}
934
		binary.BigEndian.PutUint32(lenBuf, uint32(len(data)))
935
		if _, err = fp.Write(lenBuf); err != nil {
936
			return false
937
		}
938
		if _, err = sign.Write(lenBuf); err != nil {
939
			return false
940
		}
941

942
		if _, err = fp.Write(data); err != nil {
943
			return false
944
		}
945
		if _, err = sign.Write(data); err != nil {
946
			return false
947
		}
948
		return true
949
	})
950

951
	crc = sign.Sum32()
952
	log.LogInfof("storeTxRbDentry: store complete: partitoinID(%v) volume(%v) numRbDentry(%v) crc(%v)",
953
		mp.config.PartitionId, mp.config.VolName, sm.txRbDentryTree.Len(), crc)
954
	return
955
}
956

957
func (mp *metaPartition) storeTxRbInode(rootDir string, sm *storeMsg) (crc uint32, err error) {
958
	filename := path.Join(rootDir, txRbInodeFile)
959
	fp, err := os.OpenFile(filename, os.O_RDWR|os.O_TRUNC|os.O_APPEND|os.O_CREATE, 0o755)
960
	if err != nil {
961
		return
962
	}
963
	defer func() {
964
		err = fp.Sync()
965
		// TODO Unhandled errors
966
		fp.Close()
967
	}()
968

969
	var data []byte
970
	lenBuf := make([]byte, 4)
971
	sign := crc32.NewIEEE()
972

973
	sm.txRbInodeTree.Ascend(func(i BtreeItem) bool {
974
		rbInode := i.(*TxRollbackInode)
975
		if data, err = rbInode.Marshal(); err != nil {
976
			return false
977
		}
978
		binary.BigEndian.PutUint32(lenBuf, uint32(len(data)))
979
		if _, err = fp.Write(lenBuf); err != nil {
980
			return false
981
		}
982
		if _, err = sign.Write(lenBuf); err != nil {
983
			return false
984
		}
985

986
		if _, err = fp.Write(data); err != nil {
987
			return false
988
		}
989
		if _, err = sign.Write(data); err != nil {
990
			return false
991
		}
992
		return true
993
	})
994

995
	crc = sign.Sum32()
996
	log.LogInfof("storeTxRbInode: store complete: partitoinID(%v) volume(%v) numRbinode[%v] crc(%v)",
997
		mp.config.PartitionId, mp.config.VolName, sm.txRbInodeTree.Len(), crc)
998
	return
999
}
1000

1001
func (mp *metaPartition) storeTxInfo(rootDir string, sm *storeMsg) (crc uint32, err error) {
1002
	filename := path.Join(rootDir, txInfoFile)
1003
	fp, err := os.OpenFile(filename, os.O_RDWR|os.O_TRUNC|os.O_APPEND|os.O_CREATE, 0o755)
1004
	if err != nil {
1005
		return
1006
	}
1007
	defer func() {
1008
		err = fp.Sync()
1009
		// TODO Unhandled errors
1010
		fp.Close()
1011
	}()
1012

1013
	var data []byte
1014
	lenBuf := make([]byte, 4)
1015
	sign := crc32.NewIEEE()
1016

1017
	sm.txTree.Ascend(func(i BtreeItem) bool {
1018
		tx := i.(*proto.TransactionInfo)
1019
		if data, err = tx.Marshal(); err != nil {
1020
			return false
1021
		}
1022

1023
		binary.BigEndian.PutUint32(lenBuf, uint32(len(data)))
1024
		if _, err = fp.Write(lenBuf); err != nil {
1025
			return false
1026
		}
1027
		if _, err = sign.Write(lenBuf); err != nil {
1028
			return false
1029
		}
1030

1031
		if _, err = fp.Write(data); err != nil {
1032
			return false
1033
		}
1034
		if _, err = sign.Write(data); err != nil {
1035
			return false
1036
		}
1037
		return true
1038
	})
1039

1040
	crc = sign.Sum32()
1041
	log.LogInfof("storeTxInfo: store complete: partitoinID(%v) volume(%v) numTxs(%v) crc(%v)",
1042
		mp.config.PartitionId, mp.config.VolName, sm.txTree.Len(), crc)
1043
	return
1044
}
1045

1046
func (mp *metaPartition) storeInode(rootDir string,
1047
	sm *storeMsg) (crc uint32, err error) {
1048
	filename := path.Join(rootDir, inodeFile)
1049
	fp, err := os.OpenFile(filename, os.O_RDWR|os.O_TRUNC|os.O_APPEND|os.
1050
		O_CREATE, 0o755)
1051
	if err != nil {
1052
		return
1053
	}
1054
	defer func() {
1055
		err = fp.Sync()
1056
		// TODO Unhandled errors
1057
		fp.Close()
1058
	}()
1059

1060
	size := uint64(0)
1061

1062
	var data []byte
1063
	lenBuf := make([]byte, 4)
1064
	sign := crc32.NewIEEE()
1065
	sm.inodeTree.Ascend(func(i BtreeItem) bool {
1066
		ino := i.(*Inode)
1067
		if sm.uidRebuild {
1068
			mp.acucumUidSizeByStore(ino)
1069
		}
1070

1071
		if data, err = ino.Marshal(); err != nil {
1072
			return false
1073
		}
1074

1075
		size += ino.Size
1076
		mp.fileStats(ino)
1077

1078
		// set length
1079
		binary.BigEndian.PutUint32(lenBuf, uint32(len(data)))
1080
		if _, err = fp.Write(lenBuf); err != nil {
1081
			return false
1082
		}
1083
		if _, err = sign.Write(lenBuf); err != nil {
1084
			return false
1085
		}
1086
		// set body
1087
		if _, err = fp.Write(data); err != nil {
1088
			return false
1089
		}
1090
		if _, err = sign.Write(data); err != nil {
1091
			return false
1092
		}
1093
		return true
1094
	})
1095
	mp.acucumRebuildFin(sm.uidRebuild)
1096
	crc = sign.Sum32()
1097
	mp.size = size
1098

1099
	log.LogInfof("storeInode: store complete: partitoinID(%v) volume(%v) numInodes(%v) crc(%v), size (%d)",
1100
		mp.config.PartitionId, mp.config.VolName, sm.inodeTree.Len(), crc, size)
1101

1102
	return
1103
}
1104

1105
func (mp *metaPartition) storeDentry(rootDir string,
1106
	sm *storeMsg) (crc uint32, err error) {
1107
	filename := path.Join(rootDir, dentryFile)
1108
	fp, err := os.OpenFile(filename, os.O_RDWR|os.O_TRUNC|os.O_APPEND|os.
1109
		O_CREATE, 0o755)
1110
	if err != nil {
1111
		return
1112
	}
1113
	defer func() {
1114
		err = fp.Sync()
1115
		// TODO Unhandled errors
1116
		fp.Close()
1117
	}()
1118
	var data []byte
1119
	lenBuf := make([]byte, 4)
1120
	sign := crc32.NewIEEE()
1121
	sm.dentryTree.Ascend(func(i BtreeItem) bool {
1122
		dentry := i.(*Dentry)
1123
		data, err = dentry.Marshal()
1124
		if err != nil {
1125
			return false
1126
		}
1127
		// set length
1128
		binary.BigEndian.PutUint32(lenBuf, uint32(len(data)))
1129
		if _, err = fp.Write(lenBuf); err != nil {
1130
			return false
1131
		}
1132
		if _, err = sign.Write(lenBuf); err != nil {
1133
			return false
1134
		}
1135
		if _, err = fp.Write(data); err != nil {
1136
			return false
1137
		}
1138
		if _, err = sign.Write(data); err != nil {
1139
			return false
1140
		}
1141
		return true
1142
	})
1143
	crc = sign.Sum32()
1144
	log.LogInfof("storeDentry: store complete: partitoinID(%v) volume(%v) numDentries(%v) crc(%v)",
1145
		mp.config.PartitionId, mp.config.VolName, sm.dentryTree.Len(), crc)
1146
	return
1147
}
1148

1149
func (mp *metaPartition) storeExtend(rootDir string, sm *storeMsg) (crc uint32, err error) {
1150
	extendTree := sm.extendTree
1151
	fp := path.Join(rootDir, extendFile)
1152
	var f *os.File
1153
	f, err = os.OpenFile(fp, os.O_RDWR|os.O_TRUNC|os.O_APPEND|os.O_CREATE, 0o755)
1154
	if err != nil {
1155
		return
1156
	}
1157
	log.LogDebugf("storeExtend: store start: partitoinID(%v) volume(%v) numInodes(%v) extends(%v)",
1158
		mp.config.PartitionId, mp.config.VolName, sm.inodeTree.Len(), sm.extendTree.Len())
1159
	defer func() {
1160
		closeErr := f.Close()
1161
		if err == nil && closeErr != nil {
1162
			err = closeErr
1163
		}
1164
	}()
1165
	writer := bufio.NewWriterSize(f, 4*1024*1024)
1166
	crc32 := crc32.NewIEEE()
1167
	varintTmp := make([]byte, binary.MaxVarintLen64)
1168
	var n int
1169
	// write number of extends
1170
	n = binary.PutUvarint(varintTmp, uint64(extendTree.Len()))
1171
	if _, err = writer.Write(varintTmp[:n]); err != nil {
1172
		return
1173
	}
1174
	if _, err = crc32.Write(varintTmp[:n]); err != nil {
1175
		return
1176
	}
1177

1178
	extendTree.Ascend(func(i BtreeItem) bool {
1179
		e := i.(*Extend)
1180
		var raw []byte
1181
		if sm.quotaRebuild {
1182
			mp.statisticExtendByStore(e, sm.inodeTree)
1183
		}
1184
		if raw, err = e.Bytes(); err != nil {
1185
			return false
1186
		}
1187
		// write length
1188
		n = binary.PutUvarint(varintTmp, uint64(len(raw)))
1189
		if _, err = writer.Write(varintTmp[:n]); err != nil {
1190
			return false
1191
		}
1192
		if _, err = crc32.Write(varintTmp[:n]); err != nil {
1193
			return false
1194
		}
1195
		// write raw
1196
		if _, err = writer.Write(raw); err != nil {
1197
			return false
1198
		}
1199
		if _, err = crc32.Write(raw); err != nil {
1200
			return false
1201
		}
1202
		return true
1203
	})
1204
	log.LogInfof("storeExtend: write data ok: partitoinID(%v) volume(%v) numInodes(%v) extends(%v) quotaRebuild(%v)",
1205
		mp.config.PartitionId, mp.config.VolName, sm.inodeTree.Len(), sm.extendTree.Len(), sm.quotaRebuild)
1206
	mp.mqMgr.statisticRebuildFin(sm.quotaRebuild)
1207
	if err != nil {
1208
		return
1209
	}
1210

1211
	if err = writer.Flush(); err != nil {
1212
		return
1213
	}
1214
	if err = f.Sync(); err != nil {
1215
		return
1216
	}
1217
	crc = crc32.Sum32()
1218
	log.LogInfof("storeExtend: store complete: partitoinID(%v) volume(%v) numExtends(%v) crc(%v)",
1219
		mp.config.PartitionId, mp.config.VolName, extendTree.Len(), crc)
1220
	return
1221
}
1222

1223
func (mp *metaPartition) storeMultipart(rootDir string, sm *storeMsg) (crc uint32, err error) {
1224
	multipartTree := sm.multipartTree
1225
	fp := path.Join(rootDir, multipartFile)
1226
	var f *os.File
1227
	f, err = os.OpenFile(fp, os.O_RDWR|os.O_TRUNC|os.O_APPEND|os.O_CREATE, 0o755)
1228
	if err != nil {
1229
		return
1230
	}
1231
	defer func() {
1232
		closeErr := f.Close()
1233
		if err == nil && closeErr != nil {
1234
			err = closeErr
1235
		}
1236
	}()
1237
	writer := bufio.NewWriterSize(f, 4*1024*1024)
1238
	crc32 := crc32.NewIEEE()
1239
	varintTmp := make([]byte, binary.MaxVarintLen64)
1240
	var n int
1241
	// write number of extends
1242
	n = binary.PutUvarint(varintTmp, uint64(multipartTree.Len()))
1243
	if _, err = writer.Write(varintTmp[:n]); err != nil {
1244
		return
1245
	}
1246
	if _, err = crc32.Write(varintTmp[:n]); err != nil {
1247
		return
1248
	}
1249
	multipartTree.Ascend(func(i BtreeItem) bool {
1250
		m := i.(*Multipart)
1251
		var raw []byte
1252
		if raw, err = m.Bytes(); err != nil {
1253
			return false
1254
		}
1255
		// write length
1256
		n = binary.PutUvarint(varintTmp, uint64(len(raw)))
1257
		if _, err = writer.Write(varintTmp[:n]); err != nil {
1258
			return false
1259
		}
1260
		if _, err = crc32.Write(varintTmp[:n]); err != nil {
1261
			return false
1262
		}
1263
		// write raw
1264
		if _, err = writer.Write(raw); err != nil {
1265
			return false
1266
		}
1267
		if _, err = crc32.Write(raw); err != nil {
1268
			return false
1269
		}
1270
		return true
1271
	})
1272
	if err != nil {
1273
		return
1274
	}
1275

1276
	if err = writer.Flush(); err != nil {
1277
		return
1278
	}
1279
	if err = f.Sync(); err != nil {
1280
		return
1281
	}
1282
	crc = crc32.Sum32()
1283
	log.LogInfof("storeMultipart: store complete: partitoinID(%v) volume(%v) numMultiparts(%v) crc(%v)",
1284
		mp.config.PartitionId, mp.config.VolName, multipartTree.Len(), crc)
1285
	return
1286
}
1287

1288
func (mp *metaPartition) storeUniqID(rootDir string, sm *storeMsg) (err error) {
1289
	filename := path.Join(rootDir, uniqIDFile)
1290
	fp, err := os.OpenFile(filename, os.O_RDWR|os.O_APPEND|os.O_TRUNC|os.
1291
		O_CREATE, 0o755)
1292
	if err != nil {
1293
		return
1294
	}
1295
	defer func() {
1296
		err = fp.Sync()
1297
		fp.Close()
1298
	}()
1299
	if _, err = fp.WriteString(fmt.Sprintf("%d", sm.uniqId)); err != nil {
1300
		return
1301
	}
1302
	log.LogInfof("storeUniqID: store complete: partitionID(%v) volume(%v) uniqID(%v)",
1303
		mp.config.PartitionId, mp.config.VolName, sm.uniqId)
1304
	return
1305
}
1306

1307
func (mp *metaPartition) storeUniqChecker(rootDir string, sm *storeMsg) (crc uint32, err error) {
1308
	filename := path.Join(rootDir, uniqCheckerFile)
1309
	fp, err := os.OpenFile(filename, os.O_RDWR|os.O_TRUNC|os.O_APPEND|os.
1310
		O_CREATE, 0o755)
1311
	if err != nil {
1312
		return
1313
	}
1314
	defer func() {
1315
		err = fp.Sync()
1316
		fp.Close()
1317
	}()
1318

1319
	var data []byte
1320
	if data, crc, err = sm.uniqChecker.Marshal(); err != nil {
1321
		return
1322
	}
1323

1324
	if _, err = fp.Write(data); err != nil {
1325
		return
1326
	}
1327

1328
	log.LogInfof("storeUniqChecker: store complete: PartitionID(%v) volume(%v) crc(%v)",
1329
		mp.config.UniqId, mp.config.VolName, crc)
1330
	return
1331
}
1332

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.