cubefs

Форк
0
/
partition_fsmop_inode.go 
928 строк · 26.5 Кб
1
// Copyright 2018 The CubeFS Authors.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
14

15
package metanode
16

17
import (
18
	"bytes"
19
	"encoding/binary"
20
	"encoding/json"
21
	"fmt"
22
	"io"
23
	"time"
24

25
	"github.com/cubefs/cubefs/proto"
26
	"github.com/cubefs/cubefs/storage"
27
	"github.com/cubefs/cubefs/util"
28
	"github.com/cubefs/cubefs/util/log"
29
	"github.com/cubefs/cubefs/util/timeutil"
30
)
31

32
// InodeResponse carries the outcome of an inode fsm operation.
type InodeResponse struct {
	Status uint8  // proto.Op* result code of the operation
	Msg    *Inode // the inode involved, when the operation yields one
}
36

37
// NewInodeResponse returns a zero-valued InodeResponse ready to be filled in.
func NewInodeResponse() *InodeResponse {
	resp := new(InodeResponse)
	return resp
}
40

41
// fsmTxCreateInode creates an inode inside transaction txIno and attaches it
// to the inode tree. A TxDelete rollback item is registered first so the
// create can be undone if the transaction aborts.
func (mp *metaPartition) fsmTxCreateInode(txIno *TxInode, quotaIds []uint32) (status uint8) {
	status = proto.OpOk
	if mp.txProcessor.txManager.txInRMDone(txIno.TxInfo.TxID) {
		log.LogWarnf("fsmTxCreateInode: tx is already finish. txId %s", txIno.TxInfo.TxID)
		return proto.OpTxInfoNotExistErr
	}

	// inodeInfo := mp.txProcessor.txManager.getTxInodeInfo(txIno.TxInfo.TxID, txIno.Inode.Inode)
	inodeInfo, ok := txIno.TxInfo.TxInodeInfos[txIno.Inode.Inode]
	if !ok {
		status = proto.OpTxInodeInfoNotExistErr
		return
	}

	// Register the rollback record before touching the tree; TxDelete means
	// "delete this inode again" if the transaction has to roll back.
	rbInode := NewTxRollbackInode(txIno.Inode, quotaIds, inodeInfo, TxDelete)
	status = mp.txProcessor.txResource.addTxRollbackInode(rbInode)
	if status != proto.OpOk {
		return
	}

	// Undo the rollback registration if the actual create below fails.
	defer func() {
		if status != proto.OpOk {
			mp.txProcessor.txResource.deleteTxRollbackInode(txIno.Inode.Inode, txIno.TxInfo.TxID)
		}
	}()
	// 3.insert inode in inode tree
	return mp.fsmCreateInode(txIno.Inode)
}
70

71
// Create and inode and attach it to the inode tree.
72
func (mp *metaPartition) fsmCreateInode(ino *Inode) (status uint8) {
73
	if status = mp.uidManager.addUidSpace(ino.Uid, ino.Inode, nil); status != proto.OpOk {
74
		return
75
	}
76

77
	status = proto.OpOk
78
	if _, ok := mp.inodeTree.ReplaceOrInsert(ino, false); !ok {
79
		status = proto.OpExistErr
80
	}
81

82
	return
83
}
84

85
// fsmTxCreateLinkInode increases the link count of an inode inside a
// transaction, registering a TxDelete rollback item (drop the link on abort).
func (mp *metaPartition) fsmTxCreateLinkInode(txIno *TxInode) (resp *InodeResponse) {
	resp = NewInodeResponse()
	resp.Status = proto.OpOk
	if mp.txProcessor.txManager.txInRMDone(txIno.TxInfo.TxID) {
		log.LogWarnf("fsmTxCreateLinkInode: tx is already finish. txId %s", txIno.TxInfo.TxID)
		resp.Status = proto.OpTxInfoNotExistErr
		return
	}

	// 2.register rollback item
	inodeInfo, ok := txIno.TxInfo.TxInodeInfos[txIno.Inode.Inode]
	if !ok {
		resp.Status = proto.OpTxInodeInfoNotExistErr
		return
	}

	rbInode := NewTxRollbackInode(txIno.Inode, []uint32{}, inodeInfo, TxDelete)
	resp.Status = mp.txProcessor.txResource.addTxRollbackInode(rbInode)
	// A rollback item already registered for this tx means the link was
	// applied before (presumably a replay) — report success idempotently.
	if resp.Status == proto.OpExistErr {
		resp.Status = proto.OpOk
		resp.Msg = txIno.Inode
		return
	}

	if resp.Status != proto.OpOk {
		return
	}

	// Drop the rollback registration if the actual link below fails.
	defer func() {
		if resp.Status != proto.OpOk {
			mp.txProcessor.txResource.deleteTxRollbackInode(txIno.Inode.Inode, txIno.TxInfo.TxID)
		}
	}()

	return mp.fsmCreateLinkInode(txIno.Inode, 0)
}
121

122
// fsmCreateLinkInode increments the link count of the inode matching ino.
// uniqID deduplicates retried client requests via the uniqChecker: a repeated
// uniqID returns the inode without bumping NLink again.
func (mp *metaPartition) fsmCreateLinkInode(ino *Inode, uniqID uint64) (resp *InodeResponse) {
	resp = NewInodeResponse()
	resp.Status = proto.OpOk
	item := mp.inodeTree.CopyGet(ino)
	if item == nil {
		resp.Status = proto.OpNotExistErr
		return
	}
	i := item.(*Inode)
	if i.ShouldDelete() {
		resp.Status = proto.OpNotExistErr
		return
	}

	resp.Msg = i
	if !mp.uniqChecker.legalIn(uniqID) {
		log.LogWarnf("fsmCreateLinkInode repeated, ino[%v] uniqID %v nlink %v", ino.Inode, uniqID, ino.GetNLink())
		return
	}
	i.IncNLink(ino.getVer())
	return
}
144

145
// getInodeByVer looks ino up in the inode tree and resolves the layer that
// matches the requested version sequence; nil when absent.
func (mp *metaPartition) getInodeByVer(ino *Inode) *Inode {
	item := mp.inodeTree.Get(ino)
	if item == nil {
		log.LogDebugf("action[getInodeByVer] not found ino[%v] verseq [%v]", ino.Inode, ino.getVer())
		return nil
	}
	found, _ := item.(*Inode).getInoByVer(ino.getVer(), false)
	return found
}
154

155
// getInodeTopLayer returns the top (newest) layer of the inode without
// resolving snapshot versions, refreshing its access time as a side effect.
func (mp *metaPartition) getInodeTopLayer(ino *Inode) (resp *InodeResponse) {
	resp = NewInodeResponse()
	resp.Status = proto.OpOk

	item := mp.inodeTree.Get(ino)
	if item == nil {
		resp.Status = proto.OpNotExistErr
		log.LogDebugf("action[getInodeTopLayer] not found ino[%v] verseq [%v]", ino.Inode, ino.getVer())
		return
	}
	i := item.(*Inode)
	ctime := timeutil.GetCurrentTimeUnix()
	/*
	 * FIXME: not protected by lock yet, since nothing is depending on atime.
	 * Shall add inode lock in the future.
	 */
	if ctime > i.AccessTime {
		i.AccessTime = ctime
	}

	resp.Msg = i
	return
}
178

179
// getInode resolves ino (honoring its requested version sequence) and returns
// it. When listAll is false, inodes marked for deletion are reported as
// missing; listAll=true returns them anyway. Refreshes the access time as a
// side effect.
func (mp *metaPartition) getInode(ino *Inode, listAll bool) (resp *InodeResponse) {
	resp = NewInodeResponse()
	resp.Status = proto.OpOk

	i := mp.getInodeByVer(ino)
	// Idiomatic negation instead of the former "listAll == false" comparison.
	if i == nil || (!listAll && i.ShouldDelete()) {
		log.LogDebugf("action[getInode] ino  %v not found", ino)
		resp.Status = proto.OpNotExistErr
		return
	}

	ctime := timeutil.GetCurrentTimeUnix()

	/*
	 * FIXME: not protected by lock yet, since nothing is depending on atime.
	 * Shall add inode lock in the future.
	 */
	if ctime > i.AccessTime {
		i.AccessTime = ctime
	}

	resp.Msg = i
	return
}
203

204
// hasInode reports whether ino exists (for its requested version) and is not
// marked for deletion. The previous implementation performed a redundant
// inodeTree.Get before getInodeByVer, which does the same lookup itself and
// returns nil when the item is absent — one lookup suffices.
func (mp *metaPartition) hasInode(ino *Inode) (ok bool) {
	i := mp.getInodeByVer(ino)
	if i == nil || i.ShouldDelete() {
		return
	}
	ok = true
	return
}
216

217
// Ascend is the wrapper of inodeTree.Ascend: it visits every inode in
// ascending key order until f returns false.
func (mp *metaPartition) Ascend(f func(i BtreeItem) bool) {
	mp.inodeTree.Ascend(f)
}
221

222
// fsmTxUnlinkInode unlinks an inode inside transaction txIno. A TxAdd
// rollback item is registered first (restore the link on abort). For a
// rename transaction the inode is additionally evicted after a successful
// unlink.
func (mp *metaPartition) fsmTxUnlinkInode(txIno *TxInode) (resp *InodeResponse) {
	resp = NewInodeResponse()
	resp.Status = proto.OpOk

	// A directory removed in a transaction must be empty (NLink <= 2).
	if proto.IsDir(txIno.Inode.Type) && txIno.TxInfo.TxType == proto.TxTypeRemove && txIno.Inode.NLink > 2 {
		resp.Status = proto.OpNotEmpty
		log.LogWarnf("fsmTxUnlinkInode: dir is not empty, can't remove it, txinode[%v]", txIno)
		return
	}

	if mp.txProcessor.txManager.txInRMDone(txIno.TxInfo.TxID) {
		log.LogWarnf("fsmTxUnlinkInode: tx is already finish. txId %s", txIno.TxInfo.TxID)
		resp.Status = proto.OpTxInfoNotExistErr
		return
	}

	inodeInfo, ok := txIno.TxInfo.TxInodeInfos[txIno.Inode.Inode]
	if !ok {
		resp.Status = proto.OpTxInodeInfoNotExistErr
		return
	}
	var quotaIds []uint32
	quotaIds, _ = mp.isExistQuota(txIno.Inode.Inode)

	rbInode := NewTxRollbackInode(txIno.Inode, quotaIds, inodeInfo, TxAdd)
	resp.Status = mp.txProcessor.txResource.addTxRollbackInode(rbInode)
	// An existing rollback item means this unlink was applied before
	// (presumably a replay) — report success with the current inode.
	if resp.Status == proto.OpExistErr {
		resp.Status = proto.OpOk
		item := mp.inodeTree.Get(txIno.Inode)
		if item != nil {
			resp.Msg = item.(*Inode)
		}
		return
	}
	if resp.Status != proto.OpOk {
		return
	}

	// Drop the rollback registration if the actual unlink below fails.
	defer func() {
		if resp.Status != proto.OpOk {
			mp.txProcessor.txResource.deleteTxRollbackInode(txIno.Inode.Inode, txIno.TxInfo.TxID)
		}
	}()

	resp = mp.fsmUnlinkInode(txIno.Inode, 0)
	if resp.Status != proto.OpOk {
		return
	}

	if txIno.TxInfo.TxType == proto.TxTypeRename {
		mp.fsmEvictInode(txIno.Inode)
	}

	return
}
277

278
// normal unlink seq is 0
// snapshot unlink seq is snapshotVersion
// fsmUnlinkInode delete the specified inode from inode tree.

// fsmUnlinkInode decrements a link of the inode matching ino. A zero request
// version unlinks the top layer; a non-zero version drops the matching
// snapshot layer instead. Fully unlinked temp files are pushed to the free
// list for physical deletion; replaced split extents are queued on extDelCh.
// uniqID deduplicates retried client requests via the uniqChecker.
func (mp *metaPartition) fsmUnlinkInode(ino *Inode, uniqID uint64) (resp *InodeResponse) {
	log.LogDebugf("action[fsmUnlinkInode] mp[%v] ino[%v]", mp.config.PartitionId, ino)
	var ext2Del []proto.ExtentKey

	resp = NewInodeResponse()
	resp.Status = proto.OpOk

	item := mp.inodeTree.CopyGet(ino)
	if item == nil {
		log.LogDebugf("action[fsmUnlinkInode] mp[%v] ino[%v]", mp.config.PartitionId, ino)
		resp.Status = proto.OpNotExistErr
		return
	}
	inode := item.(*Inode)
	// A delete-marked inode only blocks a normal (top-layer) unlink;
	// snapshot-version unlinks may still need to drop their layer.
	if ino.getVer() == 0 && inode.ShouldDelete() {
		log.LogDebugf("action[fsmUnlinkInode] mp[%v] ino[%v]", mp.config.PartitionId, ino)
		resp.Status = proto.OpNotExistErr
		return
	}

	resp.Msg = inode
	if !mp.uniqChecker.legalIn(uniqID) {
		log.LogWarnf("fsmUnlinkInode repeat, mp[%v] ino[%v] uniqID %v nlink %v", mp.config.PartitionId, ino.Inode, uniqID, ino.GetNLink())
		return
	}

	log.LogDebugf("action[fsmUnlinkInode] mp[%v] get inode[%v]", mp.config.PartitionId, inode)
	var (
		doMore bool
		status = proto.OpOk
	)

	if ino.getVer() == 0 {
		ext2Del, doMore, status = inode.unlinkTopLayer(mp.config.PartitionId, ino, mp.verSeq, mp.multiVersionList)
	} else { // means drop snapshot
		log.LogDebugf("action[fsmUnlinkInode] mp[%v] req drop assigned snapshot reqseq [%v] inode seq [%v]", mp.config.PartitionId, ino.getVer(), inode.getVer())
		if ino.getVer() > inode.getVer() && !isInitSnapVer(ino.getVer()) {
			log.LogDebugf("action[fsmUnlinkInode] mp[%v] inode[%v] unlink not exist snapshot and return do nothing.reqseq [%v] larger than inode seq [%v]",
				mp.config.PartitionId, ino.Inode, ino.getVer(), inode.getVer())
			return
		} else {
			ext2Del, doMore, status = inode.unlinkVerInList(mp.config.PartitionId, ino, mp.verSeq, mp.multiVersionList)
		}
	}
	if !doMore {
		resp.Status = status
		return
	}

	if inode.IsEmptyDirAndNoSnapshot() {
		if ino.NLink < 2 { // snapshot deletion
			log.LogDebugf("action[fsmUnlinkInode] mp[%v] ino[%v] really be deleted, empty dir", mp.config.PartitionId, inode)
			mp.inodeTree.Delete(inode)
			mp.updateUsedInfo(0, -1, inode.Inode)
		}
	} else if inode.IsTempFile() {
		// all snapshot between create to last deletion cleaned
		if inode.NLink == 0 && inode.getLayerLen() == 0 {
			mp.updateUsedInfo(-1*int64(inode.Size), -1, inode.Inode)
			log.LogDebugf("action[fsmUnlinkInode] mp[%v] unlink inode[%v] and push to freeList", mp.config.PartitionId, inode)
			inode.AccessTime = time.Now().Unix()
			mp.freeList.Push(inode.Inode)
			mp.uidManager.doMinusUidSpace(inode.Uid, inode.Inode, inode.Size)
			log.LogDebugf("action[fsmUnlinkInode] mp[%v] ino[%v]", mp.config.PartitionId, inode)
		}
	}

	if len(ext2Del) > 0 {
		log.LogDebugf("action[fsmUnlinkInode] mp[%v] ino[%v] DecSplitExts ext2Del %v", mp.config.PartitionId, ino, ext2Del)
		inode.DecSplitExts(mp.config.PartitionId, ext2Del)
		mp.extDelCh <- ext2Del
	}
	log.LogDebugf("action[fsmUnlinkInode] mp[%v] ino[%v] left", mp.config.PartitionId, inode)
	return
}
357

358
// fsmUnlinkInode delete the specified inode from inode tree.
359
func (mp *metaPartition) fsmUnlinkInodeBatch(ib InodeBatch) (resp []*InodeResponse) {
360
	for _, ino := range ib {
361
		status := mp.inodeInTx(ino.Inode)
362
		if status != proto.OpOk {
363
			resp = append(resp, &InodeResponse{Status: status})
364
			continue
365
		}
366
		resp = append(resp, mp.fsmUnlinkInode(ino, 0))
367
	}
368
	return
369
}
370

371
// internalHasInode reports whether ino exists in the inode tree, ignoring
// delete marks and snapshot versions.
func (mp *metaPartition) internalHasInode(ino *Inode) bool {
	return mp.inodeTree.Has(ino)
}
374

375
func (mp *metaPartition) internalDelete(val []byte) (err error) {
376
	if len(val) == 0 {
377
		return
378
	}
379
	buf := bytes.NewBuffer(val)
380
	ino := NewInode(0, 0)
381
	for {
382
		err = binary.Read(buf, binary.BigEndian, &ino.Inode)
383
		if err != nil {
384
			if err == io.EOF {
385
				err = nil
386
				return
387
			}
388
			return
389
		}
390
		log.LogDebugf("internalDelete: received internal delete: partitionID(%v) inode[%v]",
391
			mp.config.PartitionId, ino.Inode)
392
		mp.internalDeleteInode(ino)
393
	}
394
}
395

396
func (mp *metaPartition) internalDeleteBatch(val []byte) error {
397
	if len(val) == 0 {
398
		return nil
399
	}
400
	inodes, err := InodeBatchUnmarshal(val)
401
	if err != nil {
402
		return nil
403
	}
404

405
	for _, ino := range inodes {
406
		log.LogDebugf("internalDelete: received internal delete: partitionID(%v) inode[%v]",
407
			mp.config.PartitionId, ino.Inode)
408
		mp.internalDeleteInode(ino)
409
	}
410

411
	return nil
412
}
413

414
// internalDeleteInode removes ino from the inode tree and the free list, and
// drops its extended attributes from the extend tree.
func (mp *metaPartition) internalDeleteInode(ino *Inode) {
	log.LogDebugf("action[internalDeleteInode] ino[%v] really be deleted", ino)
	mp.inodeTree.Delete(ino)
	mp.freeList.Remove(ino.Inode)
	// Also delete extend attribute.
	mp.extendTree.Delete(&Extend{inode: ino.Inode})
}
421

422
// fsmAppendExtents appends the extent keys carried by ino to the stored
// inode, accounting UID space and queueing replaced extents on extDelCh for
// physical deletion.
func (mp *metaPartition) fsmAppendExtents(ino *Inode) (status uint8) {
	status = proto.OpOk
	item := mp.inodeTree.CopyGet(ino)
	if item == nil {
		status = proto.OpNotExistErr
		return
	}
	ino2 := item.(*Inode)
	if ino2.ShouldDelete() {
		status = proto.OpNotExistErr
		return
	}
	oldSize := int64(ino2.Size)
	eks := ino.Extents.CopyExtents()
	if status = mp.uidManager.addUidSpace(ino2.Uid, ino2.Inode, eks); status != proto.OpOk {
		return
	}
	delExtents := ino2.AppendExtents(eks, ino.ModifyTime, mp.volType)
	// Account the size delta caused by the append.
	mp.updateUsedInfo(int64(ino2.Size)-oldSize, 0, ino2.Inode)
	log.LogInfof("fsmAppendExtents mpId[%v].inode[%v] deleteExtents(%v)", mp.config.PartitionId, ino2.Inode, delExtents)
	mp.uidManager.minusUidSpace(ino2.Uid, ino2.Inode, delExtents)

	log.LogInfof("fsmAppendExtents mpId[%v].inode[%v] DecSplitExts deleteExtents(%v)", mp.config.PartitionId, ino2.Inode, delExtents)
	ino2.DecSplitExts(mp.config.PartitionId, delExtents)
	mp.extDelCh <- delExtents
	return
}
449

450
// fsmAppendExtentsWithCheck appends (or, with isSplit, splits in) the first
// extent key carried by ino, validating version sequences. Any additional
// keys (eks[1:]) are treated as discard candidates that the append must
// replace; replaced extents are queued on extDelCh. On a conflict the
// incoming key itself is queued for deletion to clear garbage data.
func (mp *metaPartition) fsmAppendExtentsWithCheck(ino *Inode, isSplit bool) (status uint8) {
	var (
		delExtents       []proto.ExtentKey
		discardExtentKey []proto.ExtentKey
	)

	// The request must not carry a newer version than this partition knows.
	if mp.verSeq < ino.getVer() {
		status = proto.OpArgMismatchErr
		log.LogErrorf("fsmAppendExtentsWithCheck.mp[%v] param ino[%v] mp seq [%v]", mp.config.PartitionId, ino, mp.verSeq)
		return
	}
	status = proto.OpOk
	item := mp.inodeTree.CopyGet(ino)

	if item == nil {
		status = proto.OpNotExistErr
		return
	}

	fsmIno := item.(*Inode)
	if fsmIno.ShouldDelete() {
		status = proto.OpNotExistErr
		return
	}

	oldSize := int64(fsmIno.Size)
	eks := ino.Extents.CopyExtents()

	if len(eks) < 1 {
		return
	}
	if len(eks) > 1 {
		discardExtentKey = eks[1:]
	}

	if status = mp.uidManager.addUidSpace(fsmIno.Uid, fsmIno.Inode, eks[:1]); status != proto.OpOk {
		log.LogErrorf("fsmAppendExtentsWithCheck.mp[%v] addUidSpace status [%v]", mp.config.PartitionId, status)
		return
	}

	log.LogDebugf("action[fsmAppendExtentsWithCheck] mp[%v] ver [%v] ino[%v] isSplit %v ek [%v] hist len %v discardExtentKey %v",
		mp.config.PartitionId, mp.verSeq, fsmIno.Inode, isSplit, eks[0], fsmIno.getLayerLen(), discardExtentKey)

	appendExtParam := &AppendExtParam{
		mpId:             mp.config.PartitionId,
		mpVer:            mp.verSeq,
		ek:               eks[0],
		ct:               ino.ModifyTime,
		discardExtents:   discardExtentKey,
		volType:          mp.volType,
		multiVersionList: mp.multiVersionList,
	}

	if !isSplit {
		delExtents, status = fsmIno.AppendExtentWithCheck(appendExtParam)
		if status == proto.OpOk {
			log.LogInfof("action[fsmAppendExtentsWithCheck] mp[%v] DecSplitExts delExtents [%v]", mp.config.PartitionId, delExtents)
			fsmIno.DecSplitExts(appendExtParam.mpId, delExtents)
			mp.extDelCh <- delExtents
		}
		// conflict need delete eks[0], to clear garbage data
		// NOTE(review): on conflict this branch sends eks[:1] to extDelCh and
		// the common conflict block below sends it again — confirm the double
		// enqueue is intended.
		if status == proto.OpConflictExtentsErr {
			log.LogInfof("action[fsmAppendExtentsWithCheck] mp[%v] OpConflictExtentsErr [%v]", mp.config.PartitionId, eks[:1])
			if !storage.IsTinyExtent(eks[0].ExtentId) && eks[0].ExtentOffset >= util.ExtentSize {
				eks[0].SetSplit(true)
			}
			mp.extDelCh <- eks[:1]
		}
	} else {
		// only the ek itself will be moved to level before
		// ino verseq be set with mp ver before submit in case other mp be updated while on flight, which will lead to
		// inconsistent between raft pairs
		delExtents, status = fsmIno.SplitExtentWithCheck(appendExtParam)
		log.LogInfof("action[fsmAppendExtentsWithCheck] mp[%v] DecSplitExts delExtents [%v]", mp.config.PartitionId, delExtents)
		fsmIno.DecSplitExts(mp.config.PartitionId, delExtents)
		mp.extDelCh <- delExtents
		mp.uidManager.minusUidSpace(fsmIno.Uid, fsmIno.Inode, delExtents)
	}

	// conflict need delete eks[0], to clear garbage data
	if status == proto.OpConflictExtentsErr {
		mp.extDelCh <- eks[:1]
		mp.uidManager.minusUidSpace(fsmIno.Uid, fsmIno.Inode, eks[:1])
		log.LogDebugf("fsmAppendExtentsWithCheck mp[%v] delExtents inode[%v] ek(%v)", mp.config.PartitionId, fsmIno.Inode, delExtents)
	}

	mp.updateUsedInfo(int64(fsmIno.Size)-oldSize, 0, fsmIno.Inode)
	log.LogInfof("fsmAppendExtentWithCheck mp[%v] inode[%v] ek(%v) deleteExtents(%v) discardExtents(%v) status(%v)",
		mp.config.PartitionId, fsmIno.Inode, eks[0], delExtents, discardExtentKey, status)

	return
}
542

543
// fsmAppendObjExtents appends object-store extent keys carried by ino to the
// stored inode. Overlapping keys are rejected with OpConflictExtentsErr.
func (mp *metaPartition) fsmAppendObjExtents(ino *Inode) (status uint8) {
	status = proto.OpOk
	item := mp.inodeTree.CopyGet(ino)
	if item == nil {
		status = proto.OpNotExistErr
		return
	}

	inode := item.(*Inode)
	if inode.ShouldDelete() {
		status = proto.OpNotExistErr
		return
	}

	eks := ino.ObjExtents.CopyExtents()
	err := inode.AppendObjExtents(eks, ino.ModifyTime)
	// if err is not nil, means obj eks exist overlap.
	if err != nil {
		// Log under this function's real name; the previous tag said
		// "fsmAppendExtents" and pointed investigations at the wrong path.
		log.LogErrorf("fsmAppendObjExtents inode[%v] err(%v)", inode.Inode, err)
		status = proto.OpConflictExtentsErr
	}
	return
}
566

567
// fsmExtentsTruncate truncates a regular file to ino.Size. Extents cut off
// the top layer are moved to the next (older) snapshot layer when snapshots
// exist, then queued on extDelCh for physical deletion.
func (mp *metaPartition) fsmExtentsTruncate(ino *Inode) (resp *InodeResponse) {
	var err error
	resp = NewInodeResponse()
	log.LogDebugf("fsmExtentsTruncate. req ino[%v]", ino)
	resp.Status = proto.OpOk
	item := mp.inodeTree.Get(ino)
	if item == nil {
		resp.Status = proto.OpNotExistErr
		return
	}
	i := item.(*Inode)
	if i.ShouldDelete() {
		resp.Status = proto.OpNotExistErr
		return
	}
	if proto.IsDir(i.Type) {
		resp.Status = proto.OpArgMismatchErr
		return
	}

	// Release UID space for the last (possibly partially cut) extent key.
	doOnLastKey := func(lastKey *proto.ExtentKey) {
		var eks []proto.ExtentKey
		eks = append(eks, *lastKey)
		mp.uidManager.minusUidSpace(i.Uid, i.Inode, eks)
	}

	// Record split extents produced by the truncate in the ek ref map.
	insertSplitKey := func(ek *proto.ExtentKey) {
		i.insertEkRefMap(mp.config.PartitionId, ek)
	}

	if i.getVer() != mp.verSeq {
		i.CreateVer(mp.verSeq)
	}
	i.Lock()
	defer i.Unlock()

	if err = i.CreateLowerVersion(i.getVer(), mp.multiVersionList); err != nil {
		// NOTE(review): err is swallowed here — resp.Status stays OpOk even
		// though nothing was truncated; confirm callers expect this.
		return
	}
	oldSize := int64(i.Size)
	delExtents := i.ExtentsTruncate(ino.Size, ino.ModifyTime, doOnLastKey, insertSplitKey)

	if len(delExtents) == 0 {
		return
	}

	if delExtents, err = i.RestoreExts2NextLayer(mp.config.PartitionId, delExtents, mp.verSeq, 0); err != nil {
		panic("RestoreExts2NextLayer should not be error")
	}
	mp.updateUsedInfo(int64(i.Size)-oldSize, 0, i.Inode)

	// now we should delete the extent
	log.LogInfof("fsmExtentsTruncate.mp (%v) inode[%v] DecSplitExts exts(%v)", mp.config.PartitionId, i.Inode, delExtents)
	i.DecSplitExts(mp.config.PartitionId, delExtents)
	mp.extDelCh <- delExtents
	mp.uidManager.minusUidSpace(i.Uid, i.Inode, delExtents)
	return
}
625

626
// fsmEvictInode marks ino for deletion once it is no longer referenced: an
// empty directory (with no snapshots) gets the delete mark; a temp file with
// no snapshot layers is marked and pushed to the free list for physical
// deletion.
func (mp *metaPartition) fsmEvictInode(ino *Inode) (resp *InodeResponse) {
	resp = NewInodeResponse()
	log.LogDebugf("action[fsmEvictInode] inode[%v]", ino)
	resp.Status = proto.OpOk
	item := mp.inodeTree.CopyGet(ino)
	if item == nil {
		resp.Status = proto.OpNotExistErr
		return
	}
	i := item.(*Inode)
	if i.ShouldDelete() {
		log.LogDebugf("action[fsmEvictInode] inode[%v] already be mark delete", ino)
		return
	}
	if proto.IsDir(i.Type) {
		if i.IsEmptyDirAndNoSnapshot() {
			i.SetDeleteMark()
		}
		return
	}

	if i.IsTempFile() {
		log.LogDebugf("action[fsmEvictInode] inode[%v] already linke zero and be set mark delete and be put to freelist", ino)
		if i.isEmptyVerList() {
			i.SetDeleteMark()
			mp.freeList.Push(i.Inode)
		}
	}
	return
}
656

657
// fsmBatchEvictInode evicts each inode in ib and collects the responses.
// NOTE(review): unlike fsmUnlinkInodeBatch (which skips with `continue`),
// this loop stops at the first inode busy in a transaction — confirm the
// early return is intended.
func (mp *metaPartition) fsmBatchEvictInode(ib InodeBatch) (resp []*InodeResponse) {
	for _, ino := range ib {
		if status := mp.inodeInTx(ino.Inode); status != proto.OpOk {
			resp = append(resp, &InodeResponse{Status: status})
			return
		}
		resp = append(resp, mp.fsmEvictInode(ino))
	}
	return
}
668

669
// checkAndInsertFreeList pushes a non-directory inode onto the free list when
// it is delete-marked or has become a temp file (refreshing atime for the
// latter).
func (mp *metaPartition) checkAndInsertFreeList(ino *Inode) {
	switch {
	case proto.IsDir(ino.Type):
		// Directories are never queued for physical deletion here.
	case ino.ShouldDelete():
		mp.freeList.Push(ino.Inode)
	case ino.IsTempFile():
		ino.AccessTime = time.Now().Unix()
		mp.freeList.Push(ino.Inode)
	}
}
680

681
// fsmSetAttr applies the attribute changes in req to the matching inode.
// Missing or delete-marked inodes are ignored; always returns nil.
func (mp *metaPartition) fsmSetAttr(req *SetattrRequest) (err error) {
	log.LogDebugf("action[fsmSetAttr] req %v", req)
	item := mp.inodeTree.CopyGet(NewInode(req.Inode, req.Mode))
	if item == nil {
		return
	}
	target := item.(*Inode)
	if target.ShouldDelete() {
		return
	}
	target.SetAttr(req)
	return
}
695

696
// fsmExtentsEmpty only use in datalake situation
// It clears all extent keys of a regular file: tiny extents are queued on
// extDelCh for physical deletion (and their UID space released), the rest
// are dropped by EmptyExtents.
func (mp *metaPartition) fsmExtentsEmpty(ino *Inode) (status uint8) {
	status = proto.OpOk
	item := mp.inodeTree.CopyGet(ino)
	if item == nil {
		status = proto.OpNotExistErr
		return
	}
	i := item.(*Inode)
	if i.ShouldDelete() {
		status = proto.OpNotExistErr
		return
	}
	if proto.IsDir(i.Type) {
		status = proto.OpArgMismatchErr
		return
	}
	log.LogDebugf("action[fsmExtentsEmpty] mp[%v] ino[%v],eks len [%v]", mp.config.PartitionId, ino.Inode, len(i.Extents.eks))
	tinyEks := i.CopyTinyExtents()
	log.LogDebugf("action[fsmExtentsEmpty] mp[%v] ino[%v],eks tiny len [%v]", mp.config.PartitionId, ino.Inode, len(tinyEks))

	if len(tinyEks) > 0 {
		mp.extDelCh <- tinyEks
		mp.uidManager.minusUidSpace(i.Uid, i.Inode, tinyEks)
		log.LogDebugf("fsmExtentsEmpty mp[%v] inode[%d] tinyEks(%v)", mp.config.PartitionId, ino.Inode, tinyEks)
	}

	i.EmptyExtents(ino.ModifyTime)

	return
}
727

728
// fsmExtentsEmpty only use in datalake situation
729
func (mp *metaPartition) fsmDelVerExtents(ino *Inode) (status uint8) {
730
	status = proto.OpOk
731
	item := mp.inodeTree.CopyGet(ino)
732
	if item == nil {
733
		status = proto.OpNotExistErr
734
		return
735
	}
736
	i := item.(*Inode)
737
	if i.ShouldDelete() {
738
		status = proto.OpNotExistErr
739
		return
740
	}
741
	if proto.IsDir(i.Type) {
742
		status = proto.OpArgMismatchErr
743
		return
744
	}
745
	log.LogDebugf("action[fsmExtentsEmpty] mp[%v] ino[%v],eks len [%v]", mp.config.PartitionId, ino.Inode, len(i.Extents.eks))
746
	tinyEks := i.CopyTinyExtents()
747
	log.LogDebugf("action[fsmExtentsEmpty] mp[%v] ino[%v],eks tiny len [%v]", mp.config.PartitionId, ino.Inode, len(tinyEks))
748

749
	if len(tinyEks) > 0 {
750
		mp.extDelCh <- tinyEks
751
		log.LogDebugf("fsmExtentsEmpty mp[%v] inode[%d] tinyEks(%v)", mp.config.PartitionId, ino.Inode, tinyEks)
752
	}
753

754
	i.EmptyExtents(ino.ModifyTime)
755

756
	return
757
}
758

759
// fsmClearInodeCache empties the cached extents of the inode and queues the
// dropped extent keys on extDelCh for physical deletion.
func (mp *metaPartition) fsmClearInodeCache(ino *Inode) (status uint8) {
	status = proto.OpOk
	item := mp.inodeTree.Get(ino)
	if item == nil {
		status = proto.OpNotExistErr
		return
	}
	ino2 := item.(*Inode)
	if ino2.ShouldDelete() {
		status = proto.OpNotExistErr
		return
	}
	delExtents := ino2.EmptyExtents(ino.ModifyTime)
	log.LogInfof("fsmClearInodeCache.mp[%v] inode[%v] DecSplitExts delExtents(%v)", mp.config.PartitionId, ino2.Inode, delExtents)
	if len(delExtents) > 0 {
		ino2.DecSplitExts(mp.config.PartitionId, delExtents)
		mp.extDelCh <- delExtents
	}
	return
}
779

780
// attention: an unmarshal error panics, discarding the extents.
// fsmSendToChan decodes a batch of extent keys from val (v3 selects the
// marshal format) and pushes them onto the deletion channel.
func (mp *metaPartition) fsmSendToChan(val []byte, v3 bool) (status uint8) {
	sortExtents := NewSortedExtents()
	// ek for del don't need version info
	err, _ := sortExtents.UnmarshalBinary(val, v3)
	if err != nil {
		panic(fmt.Errorf("[fsmDelExtents] unmarshal sortExtents error, mp[%v], err(%s)", mp.config.PartitionId, err.Error()))
	}

	log.LogInfof("fsmDelExtents mp[%v] delExtents(%v)", mp.config.PartitionId, len(sortExtents.eks))
	mp.extDelCh <- sortExtents.eks
	return
}
793

794
// fsmSetInodeQuotaBatch attaches quota req.QuotaId to every inode in
// req.Inodes by storing/merging a quota map in each inode's extended
// attributes, reporting a per-inode status. Inodes newly associated with the
// quota are counted into its file/byte usage.
func (mp *metaPartition) fsmSetInodeQuotaBatch(req *proto.BatchSetMetaserverQuotaReuqest) (resp *proto.BatchSetMetaserverQuotaResponse) {
	var files int64
	var bytes int64
	resp = &proto.BatchSetMetaserverQuotaResponse{}
	resp.InodeRes = make(map[uint64]uint8, 0)
	for _, ino := range req.Inodes {
		var isExist bool
		var err error

		extend := NewExtend(ino)
		treeItem := mp.extendTree.Get(extend)
		inode := NewInode(ino, 0)
		retMsg := mp.getInode(inode, false)

		if retMsg.Status != proto.OpOk {
			log.LogErrorf("fsmSetInodeQuotaBatch get inode[%v] fail.", ino)
			resp.InodeRes[ino] = retMsg.Status
			continue
		}
		inode = retMsg.Msg
		log.LogDebugf("fsmSetInodeQuotaBatch msg [%v] inode[%v]", retMsg, inode)
		quotaInfos := &proto.MetaQuotaInfos{
			QuotaInfoMap: make(map[uint32]*proto.MetaQuotaInfo),
		}
		quotaInfo := &proto.MetaQuotaInfo{
			RootInode: req.IsRoot,
		}

		if treeItem == nil {
			// No extended attributes yet: create them with this quota only.
			quotaInfos.QuotaInfoMap[req.QuotaId] = quotaInfo
			mp.extendTree.ReplaceOrInsert(extend, true)
		} else {
			// Merge into the existing quota map; keep an already-present
			// entry for this quota id (isExist avoids double counting).
			extend = treeItem.(*Extend)
			value, exist := extend.Get([]byte(proto.QuotaKey))
			if exist {
				if err = json.Unmarshal(value, &quotaInfos.QuotaInfoMap); err != nil {
					log.LogErrorf("set quota Unmarshal quotaInfos fail [%v]", err)
					resp.InodeRes[ino] = proto.OpErr
					continue
				}
				oldQuotaInfo, ok := quotaInfos.QuotaInfoMap[req.QuotaId]
				if ok {
					isExist = true
					quotaInfo = oldQuotaInfo
				}
			}
			quotaInfos.QuotaInfoMap[req.QuotaId] = quotaInfo
		}
		value, err := json.Marshal(quotaInfos.QuotaInfoMap)
		if err != nil {
			log.LogErrorf("set quota marsha1 quotaInfos [%v] fail [%v]", quotaInfos, err)
			resp.InodeRes[ino] = proto.OpErr
			continue
		}

		extend.Put([]byte(proto.QuotaKey), value, mp.verSeq)
		resp.InodeRes[ino] = proto.OpOk
		if !isExist {
			files += 1
			bytes += int64(inode.Size)
		}
	}
	mp.mqMgr.updateUsedInfo(bytes, files, req.QuotaId)
	log.LogInfof("fsmSetInodeQuotaBatch quotaId [%v] resp [%v] success.", req.QuotaId, resp)
	return
}
860

861
// fsmDeleteInodeQuotaBatch detaches quota req.QuotaId from every inode in
// req.Inodes by removing it from the quota map stored in the inode's extended
// attributes, reporting a per-inode status. Each inode actually detached is
// subtracted from the quota's file/byte usage.
func (mp *metaPartition) fsmDeleteInodeQuotaBatch(req *proto.BatchDeleteMetaserverQuotaReuqest) (resp *proto.BatchDeleteMetaserverQuotaResponse) {
	var files int64
	var bytes int64
	resp = &proto.BatchDeleteMetaserverQuotaResponse{}
	resp.InodeRes = make(map[uint64]uint8, 0)

	for _, ino := range req.Inodes {
		var err error
		extend := NewExtend(ino)
		treeItem := mp.extendTree.Get(extend)
		inode := NewInode(ino, 0)
		retMsg := mp.getInode(inode, false)
		if retMsg.Status != proto.OpOk {
			log.LogErrorf("fsmDeleteInodeQuotaBatch get inode[%v] fail.", ino)
			resp.InodeRes[ino] = retMsg.Status
			continue
		}
		inode = retMsg.Msg
		log.LogDebugf("fsmDeleteInodeQuotaBatch msg [%v] inode[%v]", retMsg, inode)
		quotaInfos := &proto.MetaQuotaInfos{
			QuotaInfoMap: make(map[uint32]*proto.MetaQuotaInfo),
		}

		if treeItem == nil {
			// No extended attributes: nothing to detach, treat as success.
			log.LogDebugf("fsmDeleteInodeQuotaBatch inode[%v] not has extend ", ino)
			resp.InodeRes[ino] = proto.OpOk
			continue
		} else {
			extend = treeItem.(*Extend)
			value, exist := extend.Get([]byte(proto.QuotaKey))
			if exist {
				if err = json.Unmarshal(value, &quotaInfos.QuotaInfoMap); err != nil {
					log.LogErrorf("fsmDeleteInodeQuotaBatch ino[%v] Unmarshal quotaInfos fail [%v]", ino, err)
					resp.InodeRes[ino] = proto.OpErr
					continue
				}

				_, ok := quotaInfos.QuotaInfoMap[req.QuotaId]
				if ok {
					// Remove this quota; drop the whole key when it was the
					// last one, otherwise rewrite the remaining map.
					delete(quotaInfos.QuotaInfoMap, req.QuotaId)
					if len(quotaInfos.QuotaInfoMap) == 0 {
						extend.Remove([]byte(proto.QuotaKey))
					} else {
						value, err = json.Marshal(quotaInfos.QuotaInfoMap)
						if err != nil {
							log.LogErrorf("fsmDeleteInodeQuotaBatch marsha1 quotaInfos [%v] fail [%v]", quotaInfos, err)
							resp.InodeRes[ino] = proto.OpErr
							continue
						}
						extend.Put([]byte(proto.QuotaKey), value, mp.verSeq)
					}
				} else {
					log.LogDebugf("fsmDeleteInodeQuotaBatch QuotaInfoMap can not find inode[%v] quota [%v]", ino, req.QuotaId)
					resp.InodeRes[ino] = proto.OpOk
					continue
				}
			} else {
				resp.InodeRes[ino] = proto.OpOk
				continue
			}
		}
		// Only reached when the quota was actually detached above.
		files -= 1
		bytes -= int64(inode.Size)
	}
	mp.mqMgr.updateUsedInfo(bytes, files, req.QuotaId)
	log.LogInfof("fsmDeleteInodeQuotaBatch quotaId [%v] resp [%v] success.", req.QuotaId, resp)
	return
}
929

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.