1
// Copyright 2018 The CubeFS Authors.
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
7
// http://www.apache.org/licenses/LICENSE-2.0
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
25
"github.com/cubefs/cubefs/proto"
26
"github.com/cubefs/cubefs/storage"
27
"github.com/cubefs/cubefs/util"
28
"github.com/cubefs/cubefs/util/log"
29
"github.com/cubefs/cubefs/util/timeutil"
32
type InodeResponse struct {
37
// NewInodeResponse returns a pointer to a newly allocated, zero-valued
// InodeResponse.
func NewInodeResponse() *InodeResponse {
38
return &InodeResponse{}
41
// fsmTxCreateInode registers a rollback item for the transaction, then creates the inode and attaches it to the inode tree.
42
func (mp *metaPartition) fsmTxCreateInode(txIno *TxInode, quotaIds []uint32) (status uint8) {
44
if mp.txProcessor.txManager.txInRMDone(txIno.TxInfo.TxID) {
45
log.LogWarnf("fsmTxCreateInode: tx is already finish. txId %s", txIno.TxInfo.TxID)
46
return proto.OpTxInfoNotExistErr
49
// inodeInfo := mp.txProcessor.txManager.getTxInodeInfo(txIno.TxInfo.TxID, txIno.Inode.Inode)
50
inodeInfo, ok := txIno.TxInfo.TxInodeInfos[txIno.Inode.Inode]
52
status = proto.OpTxInodeInfoNotExistErr
56
rbInode := NewTxRollbackInode(txIno.Inode, quotaIds, inodeInfo, TxDelete)
57
status = mp.txProcessor.txResource.addTxRollbackInode(rbInode)
58
if status != proto.OpOk {
63
if status != proto.OpOk {
64
mp.txProcessor.txResource.deleteTxRollbackInode(txIno.Inode.Inode, txIno.TxInfo.TxID)
67
// 3.insert inode in inode tree
68
return mp.fsmCreateInode(txIno.Inode)
71
// fsmCreateInode creates an inode and attaches it to the inode tree.
72
func (mp *metaPartition) fsmCreateInode(ino *Inode) (status uint8) {
73
if status = mp.uidManager.addUidSpace(ino.Uid, ino.Inode, nil); status != proto.OpOk {
78
if _, ok := mp.inodeTree.ReplaceOrInsert(ino, false); !ok {
79
status = proto.OpExistErr
85
func (mp *metaPartition) fsmTxCreateLinkInode(txIno *TxInode) (resp *InodeResponse) {
86
resp = NewInodeResponse()
87
resp.Status = proto.OpOk
88
if mp.txProcessor.txManager.txInRMDone(txIno.TxInfo.TxID) {
89
log.LogWarnf("fsmTxCreateLinkInode: tx is already finish. txId %s", txIno.TxInfo.TxID)
90
resp.Status = proto.OpTxInfoNotExistErr
94
// 2.register rollback item
95
inodeInfo, ok := txIno.TxInfo.TxInodeInfos[txIno.Inode.Inode]
97
resp.Status = proto.OpTxInodeInfoNotExistErr
101
rbInode := NewTxRollbackInode(txIno.Inode, []uint32{}, inodeInfo, TxDelete)
102
resp.Status = mp.txProcessor.txResource.addTxRollbackInode(rbInode)
103
if resp.Status == proto.OpExistErr {
104
resp.Status = proto.OpOk
105
resp.Msg = txIno.Inode
109
if resp.Status != proto.OpOk {
114
if resp.Status != proto.OpOk {
115
mp.txProcessor.txResource.deleteTxRollbackInode(txIno.Inode.Inode, txIno.TxInfo.TxID)
119
return mp.fsmCreateLinkInode(txIno.Inode, 0)
122
func (mp *metaPartition) fsmCreateLinkInode(ino *Inode, uniqID uint64) (resp *InodeResponse) {
123
resp = NewInodeResponse()
124
resp.Status = proto.OpOk
125
item := mp.inodeTree.CopyGet(ino)
127
resp.Status = proto.OpNotExistErr
131
if i.ShouldDelete() {
132
resp.Status = proto.OpNotExistErr
137
if !mp.uniqChecker.legalIn(uniqID) {
138
log.LogWarnf("fsmCreateLinkInode repeated, ino[%v] uniqID %v nlink %v", ino.Inode, uniqID, ino.GetNLink())
141
i.IncNLink(ino.getVer())
145
func (mp *metaPartition) getInodeByVer(ino *Inode) (i *Inode) {
146
item := mp.inodeTree.Get(ino)
148
log.LogDebugf("action[getInodeByVer] not found ino[%v] verseq [%v]", ino.Inode, ino.getVer())
151
i, _ = item.(*Inode).getInoByVer(ino.getVer(), false)
155
func (mp *metaPartition) getInodeTopLayer(ino *Inode) (resp *InodeResponse) {
156
resp = NewInodeResponse()
157
resp.Status = proto.OpOk
159
item := mp.inodeTree.Get(ino)
161
resp.Status = proto.OpNotExistErr
162
log.LogDebugf("action[getInodeTopLayer] not found ino[%v] verseq [%v]", ino.Inode, ino.getVer())
166
ctime := timeutil.GetCurrentTimeUnix()
168
* FIXME: not protected by lock yet, since nothing is depending on atime.
169
* Shall add inode lock in the future.
171
if ctime > i.AccessTime {
179
func (mp *metaPartition) getInode(ino *Inode, listAll bool) (resp *InodeResponse) {
180
resp = NewInodeResponse()
181
resp.Status = proto.OpOk
183
i := mp.getInodeByVer(ino)
184
if i == nil || (listAll == false && i.ShouldDelete()) {
185
log.LogDebugf("action[getInode] ino %v not found", ino)
186
resp.Status = proto.OpNotExistErr
190
ctime := timeutil.GetCurrentTimeUnix()
193
* FIXME: not protected by lock yet, since nothing is depending on atime.
194
* Shall add inode lock in the future.
196
if ctime > i.AccessTime {
204
func (mp *metaPartition) hasInode(ino *Inode) (ok bool) {
205
item := mp.inodeTree.Get(ino)
209
i := mp.getInodeByVer(ino)
210
if i == nil || i.ShouldDelete() {
217
// Ascend is the wrapper of inodeTree.Ascend: it forwards the callback f
// unchanged to mp.inodeTree.Ascend.
218
func (mp *metaPartition) Ascend(f func(i BtreeItem) bool) {
219
mp.inodeTree.Ascend(f)
222
func (mp *metaPartition) fsmTxUnlinkInode(txIno *TxInode) (resp *InodeResponse) {
223
resp = NewInodeResponse()
224
resp.Status = proto.OpOk
226
if proto.IsDir(txIno.Inode.Type) && txIno.TxInfo.TxType == proto.TxTypeRemove && txIno.Inode.NLink > 2 {
227
resp.Status = proto.OpNotEmpty
228
log.LogWarnf("fsmTxUnlinkInode: dir is not empty, can't remove it, txinode[%v]", txIno)
232
if mp.txProcessor.txManager.txInRMDone(txIno.TxInfo.TxID) {
233
log.LogWarnf("fsmTxUnlinkInode: tx is already finish. txId %s", txIno.TxInfo.TxID)
234
resp.Status = proto.OpTxInfoNotExistErr
238
inodeInfo, ok := txIno.TxInfo.TxInodeInfos[txIno.Inode.Inode]
240
resp.Status = proto.OpTxInodeInfoNotExistErr
243
var quotaIds []uint32
244
quotaIds, _ = mp.isExistQuota(txIno.Inode.Inode)
246
rbInode := NewTxRollbackInode(txIno.Inode, quotaIds, inodeInfo, TxAdd)
247
resp.Status = mp.txProcessor.txResource.addTxRollbackInode(rbInode)
248
if resp.Status == proto.OpExistErr {
249
resp.Status = proto.OpOk
250
item := mp.inodeTree.Get(txIno.Inode)
252
resp.Msg = item.(*Inode)
256
if resp.Status != proto.OpOk {
261
if resp.Status != proto.OpOk {
262
mp.txProcessor.txResource.deleteTxRollbackInode(txIno.Inode.Inode, txIno.TxInfo.TxID)
266
resp = mp.fsmUnlinkInode(txIno.Inode, 0)
267
if resp.Status != proto.OpOk {
271
if txIno.TxInfo.TxType == proto.TxTypeRename {
272
mp.fsmEvictInode(txIno.Inode)
278
// normal unlink seq is 0
279
// snapshot unlink seq is snapshotVersion
280
// fsmUnlinkInode deletes the specified inode from the inode tree.
282
func (mp *metaPartition) fsmUnlinkInode(ino *Inode, uniqID uint64) (resp *InodeResponse) {
283
log.LogDebugf("action[fsmUnlinkInode] mp[%v] ino[%v]", mp.config.PartitionId, ino)
284
var ext2Del []proto.ExtentKey
286
resp = NewInodeResponse()
287
resp.Status = proto.OpOk
289
item := mp.inodeTree.CopyGet(ino)
291
log.LogDebugf("action[fsmUnlinkInode] mp[%v] ino[%v]", mp.config.PartitionId, ino)
292
resp.Status = proto.OpNotExistErr
295
inode := item.(*Inode)
296
if ino.getVer() == 0 && inode.ShouldDelete() {
297
log.LogDebugf("action[fsmUnlinkInode] mp[%v] ino[%v]", mp.config.PartitionId, ino)
298
resp.Status = proto.OpNotExistErr
303
if !mp.uniqChecker.legalIn(uniqID) {
304
log.LogWarnf("fsmUnlinkInode repeat, mp[%v] ino[%v] uniqID %v nlink %v", mp.config.PartitionId, ino.Inode, uniqID, ino.GetNLink())
308
log.LogDebugf("action[fsmUnlinkInode] mp[%v] get inode[%v]", mp.config.PartitionId, inode)
314
if ino.getVer() == 0 {
315
ext2Del, doMore, status = inode.unlinkTopLayer(mp.config.PartitionId, ino, mp.verSeq, mp.multiVersionList)
316
} else { // means drop snapshot
317
log.LogDebugf("action[fsmUnlinkInode] mp[%v] req drop assigned snapshot reqseq [%v] inode seq [%v]", mp.config.PartitionId, ino.getVer(), inode.getVer())
318
if ino.getVer() > inode.getVer() && !isInitSnapVer(ino.getVer()) {
319
log.LogDebugf("action[fsmUnlinkInode] mp[%v] inode[%v] unlink not exist snapshot and return do nothing.reqseq [%v] larger than inode seq [%v]",
320
mp.config.PartitionId, ino.Inode, ino.getVer(), inode.getVer())
323
ext2Del, doMore, status = inode.unlinkVerInList(mp.config.PartitionId, ino, mp.verSeq, mp.multiVersionList)
331
if inode.IsEmptyDirAndNoSnapshot() {
332
if ino.NLink < 2 { // snapshot deletion
333
log.LogDebugf("action[fsmUnlinkInode] mp[%v] ino[%v] really be deleted, empty dir", mp.config.PartitionId, inode)
334
mp.inodeTree.Delete(inode)
335
mp.updateUsedInfo(0, -1, inode.Inode)
337
} else if inode.IsTempFile() {
338
// all snapshot between create to last deletion cleaned
339
if inode.NLink == 0 && inode.getLayerLen() == 0 {
340
mp.updateUsedInfo(-1*int64(inode.Size), -1, inode.Inode)
341
log.LogDebugf("action[fsmUnlinkInode] mp[%v] unlink inode[%v] and push to freeList", mp.config.PartitionId, inode)
342
inode.AccessTime = time.Now().Unix()
343
mp.freeList.Push(inode.Inode)
344
mp.uidManager.doMinusUidSpace(inode.Uid, inode.Inode, inode.Size)
345
log.LogDebugf("action[fsmUnlinkInode] mp[%v] ino[%v]", mp.config.PartitionId, inode)
349
if len(ext2Del) > 0 {
350
log.LogDebugf("action[fsmUnlinkInode] mp[%v] ino[%v] DecSplitExts ext2Del %v", mp.config.PartitionId, ino, ext2Del)
351
inode.DecSplitExts(mp.config.PartitionId, ext2Del)
352
mp.extDelCh <- ext2Del
354
log.LogDebugf("action[fsmUnlinkInode] mp[%v] ino[%v] left", mp.config.PartitionId, inode)
358
// fsmUnlinkInodeBatch unlinks each inode in the batch via fsmUnlinkInode and collects the per-inode responses.
359
func (mp *metaPartition) fsmUnlinkInodeBatch(ib InodeBatch) (resp []*InodeResponse) {
360
for _, ino := range ib {
361
status := mp.inodeInTx(ino.Inode)
362
if status != proto.OpOk {
363
resp = append(resp, &InodeResponse{Status: status})
366
resp = append(resp, mp.fsmUnlinkInode(ino, 0))
371
// internalHasInode returns the result of mp.inodeTree.Has for the given inode.
func (mp *metaPartition) internalHasInode(ino *Inode) bool {
372
return mp.inodeTree.Has(ino)
375
func (mp *metaPartition) internalDelete(val []byte) (err error) {
379
buf := bytes.NewBuffer(val)
380
ino := NewInode(0, 0)
382
err = binary.Read(buf, binary.BigEndian, &ino.Inode)
390
log.LogDebugf("internalDelete: received internal delete: partitionID(%v) inode[%v]",
391
mp.config.PartitionId, ino.Inode)
392
mp.internalDeleteInode(ino)
396
func (mp *metaPartition) internalDeleteBatch(val []byte) error {
400
inodes, err := InodeBatchUnmarshal(val)
405
for _, ino := range inodes {
406
log.LogDebugf("internalDelete: received internal delete: partitionID(%v) inode[%v]",
407
mp.config.PartitionId, ino.Inode)
408
mp.internalDeleteInode(ino)
414
// internalDeleteInode logs the deletion at debug level, deletes ino from the
// inode tree, removes ino.Inode from the free list, and deletes the Extend
// entry built from ino.Inode from the extend tree.
func (mp *metaPartition) internalDeleteInode(ino *Inode) {
415
log.LogDebugf("action[internalDeleteInode] ino[%v] really be deleted", ino)
416
mp.inodeTree.Delete(ino)
417
mp.freeList.Remove(ino.Inode)
418
mp.extendTree.Delete(&Extend{inode: ino.Inode}) // Also delete extend attribute.
422
func (mp *metaPartition) fsmAppendExtents(ino *Inode) (status uint8) {
424
item := mp.inodeTree.CopyGet(ino)
426
status = proto.OpNotExistErr
429
ino2 := item.(*Inode)
430
if ino2.ShouldDelete() {
431
status = proto.OpNotExistErr
434
oldSize := int64(ino2.Size)
435
eks := ino.Extents.CopyExtents()
436
if status = mp.uidManager.addUidSpace(ino2.Uid, ino2.Inode, eks); status != proto.OpOk {
439
delExtents := ino2.AppendExtents(eks, ino.ModifyTime, mp.volType)
440
mp.updateUsedInfo(int64(ino2.Size)-oldSize, 0, ino2.Inode)
441
log.LogInfof("fsmAppendExtents mpId[%v].inode[%v] deleteExtents(%v)", mp.config.PartitionId, ino2.Inode, delExtents)
442
mp.uidManager.minusUidSpace(ino2.Uid, ino2.Inode, delExtents)
444
log.LogInfof("fsmAppendExtents mpId[%v].inode[%v] DecSplitExts deleteExtents(%v)", mp.config.PartitionId, ino2.Inode, delExtents)
445
ino2.DecSplitExts(mp.config.PartitionId, delExtents)
446
mp.extDelCh <- delExtents
450
func (mp *metaPartition) fsmAppendExtentsWithCheck(ino *Inode, isSplit bool) (status uint8) {
452
delExtents []proto.ExtentKey
453
discardExtentKey []proto.ExtentKey
456
if mp.verSeq < ino.getVer() {
457
status = proto.OpArgMismatchErr
458
log.LogErrorf("fsmAppendExtentsWithCheck.mp[%v] param ino[%v] mp seq [%v]", mp.config.PartitionId, ino, mp.verSeq)
462
item := mp.inodeTree.CopyGet(ino)
465
status = proto.OpNotExistErr
469
fsmIno := item.(*Inode)
470
if fsmIno.ShouldDelete() {
471
status = proto.OpNotExistErr
475
oldSize := int64(fsmIno.Size)
476
eks := ino.Extents.CopyExtents()
482
discardExtentKey = eks[1:]
485
if status = mp.uidManager.addUidSpace(fsmIno.Uid, fsmIno.Inode, eks[:1]); status != proto.OpOk {
486
log.LogErrorf("fsmAppendExtentsWithCheck.mp[%v] addUidSpace status [%v]", mp.config.PartitionId, status)
490
log.LogDebugf("action[fsmAppendExtentsWithCheck] mp[%v] ver [%v] ino[%v] isSplit %v ek [%v] hist len %v discardExtentKey %v",
491
mp.config.PartitionId, mp.verSeq, fsmIno.Inode, isSplit, eks[0], fsmIno.getLayerLen(), discardExtentKey)
493
appendExtParam := &AppendExtParam{
494
mpId: mp.config.PartitionId,
498
discardExtents: discardExtentKey,
500
multiVersionList: mp.multiVersionList,
504
delExtents, status = fsmIno.AppendExtentWithCheck(appendExtParam)
505
if status == proto.OpOk {
506
log.LogInfof("action[fsmAppendExtentsWithCheck] mp[%v] DecSplitExts delExtents [%v]", mp.config.PartitionId, delExtents)
507
fsmIno.DecSplitExts(appendExtParam.mpId, delExtents)
508
mp.extDelCh <- delExtents
510
// on conflict, eks[0] must be deleted to clear garbage data
511
if status == proto.OpConflictExtentsErr {
512
log.LogInfof("action[fsmAppendExtentsWithCheck] mp[%v] OpConflictExtentsErr [%v]", mp.config.PartitionId, eks[:1])
513
if !storage.IsTinyExtent(eks[0].ExtentId) && eks[0].ExtentOffset >= util.ExtentSize {
514
eks[0].SetSplit(true)
516
mp.extDelCh <- eks[:1]
519
// only the ek itself will be moved to level before
520
// ino verseq be set with mp ver before submit in case other mp be updated while on flight, which will lead to
521
// inconsistent between raft pairs
522
delExtents, status = fsmIno.SplitExtentWithCheck(appendExtParam)
523
log.LogInfof("action[fsmAppendExtentsWithCheck] mp[%v] DecSplitExts delExtents [%v]", mp.config.PartitionId, delExtents)
524
fsmIno.DecSplitExts(mp.config.PartitionId, delExtents)
525
mp.extDelCh <- delExtents
526
mp.uidManager.minusUidSpace(fsmIno.Uid, fsmIno.Inode, delExtents)
529
// on conflict, eks[0] must be deleted to clear garbage data
530
if status == proto.OpConflictExtentsErr {
531
mp.extDelCh <- eks[:1]
532
mp.uidManager.minusUidSpace(fsmIno.Uid, fsmIno.Inode, eks[:1])
533
log.LogDebugf("fsmAppendExtentsWithCheck mp[%v] delExtents inode[%v] ek(%v)", mp.config.PartitionId, fsmIno.Inode, delExtents)
536
mp.updateUsedInfo(int64(fsmIno.Size)-oldSize, 0, fsmIno.Inode)
537
log.LogInfof("fsmAppendExtentWithCheck mp[%v] inode[%v] ek(%v) deleteExtents(%v) discardExtents(%v) status(%v)",
538
mp.config.PartitionId, fsmIno.Inode, eks[0], delExtents, discardExtentKey, status)
543
func (mp *metaPartition) fsmAppendObjExtents(ino *Inode) (status uint8) {
545
item := mp.inodeTree.CopyGet(ino)
547
status = proto.OpNotExistErr
551
inode := item.(*Inode)
552
if inode.ShouldDelete() {
553
status = proto.OpNotExistErr
557
eks := ino.ObjExtents.CopyExtents()
558
err := inode.AppendObjExtents(eks, ino.ModifyTime)
559
// if err is not nil, means obj eks exist overlap.
561
log.LogErrorf("fsmAppendExtents inode[%v] err(%v)", inode.Inode, err)
562
status = proto.OpConflictExtentsErr
567
func (mp *metaPartition) fsmExtentsTruncate(ino *Inode) (resp *InodeResponse) {
569
resp = NewInodeResponse()
570
log.LogDebugf("fsmExtentsTruncate. req ino[%v]", ino)
571
resp.Status = proto.OpOk
572
item := mp.inodeTree.Get(ino)
574
resp.Status = proto.OpNotExistErr
578
if i.ShouldDelete() {
579
resp.Status = proto.OpNotExistErr
582
if proto.IsDir(i.Type) {
583
resp.Status = proto.OpArgMismatchErr
587
doOnLastKey := func(lastKey *proto.ExtentKey) {
588
var eks []proto.ExtentKey
589
eks = append(eks, *lastKey)
590
mp.uidManager.minusUidSpace(i.Uid, i.Inode, eks)
593
insertSplitKey := func(ek *proto.ExtentKey) {
594
i.insertEkRefMap(mp.config.PartitionId, ek)
597
if i.getVer() != mp.verSeq {
598
i.CreateVer(mp.verSeq)
603
if err = i.CreateLowerVersion(i.getVer(), mp.multiVersionList); err != nil {
606
oldSize := int64(i.Size)
607
delExtents := i.ExtentsTruncate(ino.Size, ino.ModifyTime, doOnLastKey, insertSplitKey)
609
if len(delExtents) == 0 {
613
if delExtents, err = i.RestoreExts2NextLayer(mp.config.PartitionId, delExtents, mp.verSeq, 0); err != nil {
614
panic("RestoreExts2NextLayer should not be error")
616
mp.updateUsedInfo(int64(i.Size)-oldSize, 0, i.Inode)
618
// now we should delete the extent
619
log.LogInfof("fsmExtentsTruncate.mp (%v) inode[%v] DecSplitExts exts(%v)", mp.config.PartitionId, i.Inode, delExtents)
620
i.DecSplitExts(mp.config.PartitionId, delExtents)
621
mp.extDelCh <- delExtents
622
mp.uidManager.minusUidSpace(i.Uid, i.Inode, delExtents)
626
func (mp *metaPartition) fsmEvictInode(ino *Inode) (resp *InodeResponse) {
627
resp = NewInodeResponse()
628
log.LogDebugf("action[fsmEvictInode] inode[%v]", ino)
629
resp.Status = proto.OpOk
630
item := mp.inodeTree.CopyGet(ino)
632
resp.Status = proto.OpNotExistErr
636
if i.ShouldDelete() {
637
log.LogDebugf("action[fsmEvictInode] inode[%v] already be mark delete", ino)
640
if proto.IsDir(i.Type) {
641
if i.IsEmptyDirAndNoSnapshot() {
648
log.LogDebugf("action[fsmEvictInode] inode[%v] already linke zero and be set mark delete and be put to freelist", ino)
649
if i.isEmptyVerList() {
651
mp.freeList.Push(i.Inode)
657
func (mp *metaPartition) fsmBatchEvictInode(ib InodeBatch) (resp []*InodeResponse) {
658
for _, ino := range ib {
659
status := mp.inodeInTx(ino.Inode)
660
if status != proto.OpOk {
661
resp = append(resp, &InodeResponse{Status: status})
664
resp = append(resp, mp.fsmEvictInode(ino))
669
func (mp *metaPartition) checkAndInsertFreeList(ino *Inode) {
670
if proto.IsDir(ino.Type) {
673
if ino.ShouldDelete() {
674
mp.freeList.Push(ino.Inode)
675
} else if ino.IsTempFile() {
676
ino.AccessTime = time.Now().Unix()
677
mp.freeList.Push(ino.Inode)
681
func (mp *metaPartition) fsmSetAttr(req *SetattrRequest) (err error) {
682
log.LogDebugf("action[fsmSetAttr] req %v", req)
683
ino := NewInode(req.Inode, req.Mode)
684
item := mp.inodeTree.CopyGet(ino)
689
if ino.ShouldDelete() {
696
// fsmExtentsEmpty is only used in the datalake situation.
697
func (mp *metaPartition) fsmExtentsEmpty(ino *Inode) (status uint8) {
699
item := mp.inodeTree.CopyGet(ino)
701
status = proto.OpNotExistErr
705
if i.ShouldDelete() {
706
status = proto.OpNotExistErr
709
if proto.IsDir(i.Type) {
710
status = proto.OpArgMismatchErr
713
log.LogDebugf("action[fsmExtentsEmpty] mp[%v] ino[%v],eks len [%v]", mp.config.PartitionId, ino.Inode, len(i.Extents.eks))
714
tinyEks := i.CopyTinyExtents()
715
log.LogDebugf("action[fsmExtentsEmpty] mp[%v] ino[%v],eks tiny len [%v]", mp.config.PartitionId, ino.Inode, len(tinyEks))
717
if len(tinyEks) > 0 {
718
mp.extDelCh <- tinyEks
719
mp.uidManager.minusUidSpace(i.Uid, i.Inode, tinyEks)
720
log.LogDebugf("fsmExtentsEmpty mp[%v] inode[%d] tinyEks(%v)", mp.config.PartitionId, ino.Inode, tinyEks)
723
i.EmptyExtents(ino.ModifyTime)
728
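// fsmDelVerExtents: this comment was originally copied from fsmExtentsEmpty ("only use in datalake situation"); presumably the same restriction applies here — TODO confirm.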
729
func (mp *metaPartition) fsmDelVerExtents(ino *Inode) (status uint8) {
731
item := mp.inodeTree.CopyGet(ino)
733
status = proto.OpNotExistErr
737
if i.ShouldDelete() {
738
status = proto.OpNotExistErr
741
if proto.IsDir(i.Type) {
742
status = proto.OpArgMismatchErr
745
log.LogDebugf("action[fsmExtentsEmpty] mp[%v] ino[%v],eks len [%v]", mp.config.PartitionId, ino.Inode, len(i.Extents.eks))
746
tinyEks := i.CopyTinyExtents()
747
log.LogDebugf("action[fsmExtentsEmpty] mp[%v] ino[%v],eks tiny len [%v]", mp.config.PartitionId, ino.Inode, len(tinyEks))
749
if len(tinyEks) > 0 {
750
mp.extDelCh <- tinyEks
751
log.LogDebugf("fsmExtentsEmpty mp[%v] inode[%d] tinyEks(%v)", mp.config.PartitionId, ino.Inode, tinyEks)
754
i.EmptyExtents(ino.ModifyTime)
759
func (mp *metaPartition) fsmClearInodeCache(ino *Inode) (status uint8) {
761
item := mp.inodeTree.Get(ino)
763
status = proto.OpNotExistErr
766
ino2 := item.(*Inode)
767
if ino2.ShouldDelete() {
768
status = proto.OpNotExistErr
771
delExtents := ino2.EmptyExtents(ino.ModifyTime)
772
log.LogInfof("fsmClearInodeCache.mp[%v] inode[%v] DecSplitExts delExtents(%v)", mp.config.PartitionId, ino2.Inode, delExtents)
773
if len(delExtents) > 0 {
774
ino2.DecSplitExts(mp.config.PartitionId, delExtents)
775
mp.extDelCh <- delExtents
780
// attention: an unmarshal error will discard the extent
781
func (mp *metaPartition) fsmSendToChan(val []byte, v3 bool) (status uint8) {
782
sortExtents := NewSortedExtents()
783
// ek for del don't need version info
784
err, _ := sortExtents.UnmarshalBinary(val, v3)
786
panic(fmt.Errorf("[fsmDelExtents] unmarshal sortExtents error, mp[%v], err(%s)", mp.config.PartitionId, err.Error()))
789
log.LogInfof("fsmDelExtents mp[%v] delExtents(%v)", mp.config.PartitionId, len(sortExtents.eks))
790
mp.extDelCh <- sortExtents.eks
794
func (mp *metaPartition) fsmSetInodeQuotaBatch(req *proto.BatchSetMetaserverQuotaReuqest) (resp *proto.BatchSetMetaserverQuotaResponse) {
797
resp = &proto.BatchSetMetaserverQuotaResponse{}
798
resp.InodeRes = make(map[uint64]uint8, 0)
799
for _, ino := range req.Inodes {
803
extend := NewExtend(ino)
804
treeItem := mp.extendTree.Get(extend)
805
inode := NewInode(ino, 0)
806
retMsg := mp.getInode(inode, false)
808
if retMsg.Status != proto.OpOk {
809
log.LogErrorf("fsmSetInodeQuotaBatch get inode[%v] fail.", ino)
810
resp.InodeRes[ino] = retMsg.Status
814
log.LogDebugf("fsmSetInodeQuotaBatch msg [%v] inode[%v]", retMsg, inode)
815
quotaInfos := &proto.MetaQuotaInfos{
816
QuotaInfoMap: make(map[uint32]*proto.MetaQuotaInfo),
818
quotaInfo := &proto.MetaQuotaInfo{
819
RootInode: req.IsRoot,
823
quotaInfos.QuotaInfoMap[req.QuotaId] = quotaInfo
824
mp.extendTree.ReplaceOrInsert(extend, true)
826
extend = treeItem.(*Extend)
827
value, exist := extend.Get([]byte(proto.QuotaKey))
829
if err = json.Unmarshal(value, "aInfos.QuotaInfoMap); err != nil {
830
log.LogErrorf("set quota Unmarshal quotaInfos fail [%v]", err)
831
resp.InodeRes[ino] = proto.OpErr
834
oldQuotaInfo, ok := quotaInfos.QuotaInfoMap[req.QuotaId]
837
quotaInfo = oldQuotaInfo
840
quotaInfos.QuotaInfoMap[req.QuotaId] = quotaInfo
842
value, err := json.Marshal(quotaInfos.QuotaInfoMap)
844
log.LogErrorf("set quota marsha1 quotaInfos [%v] fail [%v]", quotaInfos, err)
845
resp.InodeRes[ino] = proto.OpErr
849
extend.Put([]byte(proto.QuotaKey), value, mp.verSeq)
850
resp.InodeRes[ino] = proto.OpOk
853
bytes += int64(inode.Size)
856
mp.mqMgr.updateUsedInfo(bytes, files, req.QuotaId)
857
log.LogInfof("fsmSetInodeQuotaBatch quotaId [%v] resp [%v] success.", req.QuotaId, resp)
861
func (mp *metaPartition) fsmDeleteInodeQuotaBatch(req *proto.BatchDeleteMetaserverQuotaReuqest) (resp *proto.BatchDeleteMetaserverQuotaResponse) {
864
resp = &proto.BatchDeleteMetaserverQuotaResponse{}
865
resp.InodeRes = make(map[uint64]uint8, 0)
867
for _, ino := range req.Inodes {
869
extend := NewExtend(ino)
870
treeItem := mp.extendTree.Get(extend)
871
inode := NewInode(ino, 0)
872
retMsg := mp.getInode(inode, false)
873
if retMsg.Status != proto.OpOk {
874
log.LogErrorf("fsmDeleteInodeQuotaBatch get inode[%v] fail.", ino)
875
resp.InodeRes[ino] = retMsg.Status
879
log.LogDebugf("fsmDeleteInodeQuotaBatch msg [%v] inode[%v]", retMsg, inode)
880
quotaInfos := &proto.MetaQuotaInfos{
881
QuotaInfoMap: make(map[uint32]*proto.MetaQuotaInfo),
885
log.LogDebugf("fsmDeleteInodeQuotaBatch inode[%v] not has extend ", ino)
886
resp.InodeRes[ino] = proto.OpOk
889
extend = treeItem.(*Extend)
890
value, exist := extend.Get([]byte(proto.QuotaKey))
892
if err = json.Unmarshal(value, "aInfos.QuotaInfoMap); err != nil {
893
log.LogErrorf("fsmDeleteInodeQuotaBatch ino[%v] Unmarshal quotaInfos fail [%v]", ino, err)
894
resp.InodeRes[ino] = proto.OpErr
898
_, ok := quotaInfos.QuotaInfoMap[req.QuotaId]
900
delete(quotaInfos.QuotaInfoMap, req.QuotaId)
901
if len(quotaInfos.QuotaInfoMap) == 0 {
902
extend.Remove([]byte(proto.QuotaKey))
904
value, err = json.Marshal(quotaInfos.QuotaInfoMap)
906
log.LogErrorf("fsmDeleteInodeQuotaBatch marsha1 quotaInfos [%v] fail [%v]", quotaInfos, err)
907
resp.InodeRes[ino] = proto.OpErr
910
extend.Put([]byte(proto.QuotaKey), value, mp.verSeq)
913
log.LogDebugf("fsmDeleteInodeQuotaBatch QuotaInfoMap can not find inode[%v] quota [%v]", ino, req.QuotaId)
914
resp.InodeRes[ino] = proto.OpOk
918
resp.InodeRes[ino] = proto.OpOk
923
bytes -= int64(inode.Size)
925
mp.mqMgr.updateUsedInfo(bytes, files, req.QuotaId)
926
log.LogInfof("fsmDeleteInodeQuotaBatch quotaId [%v] resp [%v] success.", req.QuotaId, resp)