1
// Copyright 2018 The CubeFS Authors.
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
7
// http://www.apache.org/licenses/LICENSE-2.0
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
23
"github.com/cubefs/cubefs/proto"
24
"github.com/cubefs/cubefs/util/auditlog"
25
"github.com/cubefs/cubefs/util/errors"
26
"github.com/cubefs/cubefs/util/log"
29
// TxCreateDentry handles a transactional create-dentry request.
// Failed checks are reported to the client through p.PacketErrorWithBody.
// Otherwise a TxDentry built from the request is marshaled, submitted with
// opFSMTxCreateDentry, and the returned status is stored in p.ResultCode.
// err is the named result and is also what the audit log call receives.
func (mp *metaPartition) TxCreateDentry(req *proto.TxCreateDentryRequest, p *Packet, remoteAddr string) (err error) {
31
if mp.IsEnableAuditLog() {
33
// NOTE(review): the last argument is a literal 0 here, while the other
// dentry handlers in this file pass req.ParentID in that position —
// confirm whether omitting the parent id is intentional.
auditlog.LogDentryOp(remoteAddr, mp.GetVolName(), p.GetOpMsg(), req.Name, req.GetFullPath(), err, time.Since(start).Milliseconds(), req.Inode, 0)
36
// A dentry whose inode equals its parent id is rejected with OpExistErr.
if req.ParentID == req.Inode {
37
err = fmt.Errorf("parentId is equal inodeId")
38
p.PacketErrorWithBody(proto.OpExistErr, []byte(err.Error()))
42
// Every quota id carried by the request is checked; the status returned by
// IsOverQuota is reused as the result code of the error reply.
for _, quotaId := range req.QuotaIds {
43
status := mp.mqMgr.IsOverQuota(false, true, quotaId)
45
err = errors.New("create dentry is over quota")
46
reply := []byte(err.Error())
47
p.PacketErrorWithBody(status, reply)
53
// Look up the parent inode; a missing parent is answered with OpNotExistErr.
// NOTE(review): this handler uses inodeTree.Get while CreateDentry and
// QuotaCreateDentry use inodeTree.CopyGet — confirm the difference is intended.
item := mp.inodeTree.Get(NewInode(req.ParentID, 0))
55
err = fmt.Errorf("parent inode not exists")
56
p.PacketErrorWithBody(proto.OpNotExistErr, []byte(err.Error()))
60
parIno = item.(*Inode)
61
// dirChildrenNumLimit is loaded atomically and compared against the parent's
// NLink; reaching the limit is answered with OpDirQuota.
quota := atomic.LoadUint32(&dirChildrenNumLimit)
62
if parIno.NLink >= quota {
63
err = fmt.Errorf("parent dir quota limitation reached")
64
p.PacketErrorWithBody(proto.OpDirQuota, []byte(err.Error()))
68
// The request's TxInfo is copied before being embedded in the TxDentry.
txInfo := req.TxInfo.GetCopy()
69
txDentry := NewTxDentry(req.ParentID, req.Name, req.Inode, req.Mode, parIno, txInfo)
70
val, err := txDentry.Marshal()
75
status, err := mp.submit(opFSMTxCreateDentry, val)
77
// A submit error is reported to the client as OpAgain.
p.PacketErrorWithBody(proto.OpAgain, []byte(err.Error()))
81
// The value returned by submit is asserted to uint8 and used as the result code.
p.ResultCode = status.(uint8)
85
// CreateDentry handles a create-dentry request.
// Failed checks are reported to the client through p.PacketErrorWithBody.
// Otherwise the dentry is marshaled, submitted with opFSMCreateDentry, and the
// returned status is stored in p.ResultCode. The only Go return value is err.
86
func (mp *metaPartition) CreateDentry(req *CreateDentryReq, p *Packet, remoteAddr string) (err error) {
88
if mp.IsEnableAuditLog() {
90
auditlog.LogDentryOp(remoteAddr, mp.GetVolName(), p.GetOpMsg(), req.Name, req.GetFullPath(), err, time.Since(start).Milliseconds(), req.Inode, req.ParentID)
93
// A dentry whose inode equals its parent id is rejected with OpExistErr.
if req.ParentID == req.Inode {
94
err = fmt.Errorf("parentId is equal inodeId")
95
p.PacketErrorWithBody(proto.OpExistErr, []byte(err.Error()))
99
// Look up the parent inode; a missing parent is answered with OpNotExistErr.
item := mp.inodeTree.CopyGet(NewInode(req.ParentID, 0))
101
err = fmt.Errorf("parent inode not exists")
102
p.PacketErrorWithBody(proto.OpNotExistErr, []byte(err.Error()))
105
parIno := item.(*Inode)
106
// dirChildrenNumLimit is loaded atomically and compared against the parent's
// NLink; reaching the limit is answered with OpDirQuota.
quota := atomic.LoadUint32(&dirChildrenNumLimit)
107
if parIno.NLink >= quota {
108
err = fmt.Errorf("parent dir quota limitation reached")
109
p.PacketErrorWithBody(proto.OpDirQuota, []byte(err.Error()))
115
ParentId: req.ParentID,
119
// The snapshot part of the dentry is initialised from mp.GetVerSeq().
multiSnap: NewDentrySnap(mp.GetVerSeq()),
121
val, err := dentry.Marshal()
125
resp, err := mp.submit(opFSMCreateDentry, val)
127
// A submit error is reported to the client as OpAgain.
p.PacketErrorWithBody(proto.OpAgain, []byte(err.Error()))
130
// The value returned by submit is asserted to uint8 and used as the result code.
p.ResultCode = resp.(uint8)
134
// QuotaCreateDentry handles a create-dentry request that carries quota ids.
// It runs the same parent checks as CreateDentry and additionally checks every
// id in req.QuotaIds with mp.mqMgr.IsOverQuota. The dentry is submitted with
// opFSMCreateDentry and the returned status is stored in p.ResultCode.
func (mp *metaPartition) QuotaCreateDentry(req *proto.QuotaCreateDentryRequest, p *Packet, remoteAddr string) (err error) {
136
if mp.IsEnableAuditLog() {
138
auditlog.LogDentryOp(remoteAddr, mp.GetVolName(), p.GetOpMsg(), req.Name, req.GetFullPath(), err, time.Since(start).Milliseconds(), req.Inode, req.ParentID)
141
// A dentry whose inode equals its parent id is rejected with OpExistErr.
if req.ParentID == req.Inode {
142
err = fmt.Errorf("parentId is equal inodeId")
143
p.PacketErrorWithBody(proto.OpExistErr, []byte(err.Error()))
146
// The status returned by IsOverQuota is reused as the result code of the
// error reply.
for _, quotaId := range req.QuotaIds {
147
status := mp.mqMgr.IsOverQuota(false, true, quotaId)
149
err = errors.New("create dentry is over quota")
150
reply := []byte(err.Error())
151
p.PacketErrorWithBody(status, reply)
155
// Look up the parent inode; a missing parent is answered with OpNotExistErr.
item := mp.inodeTree.CopyGet(NewInode(req.ParentID, 0))
157
err = fmt.Errorf("parent inode not exists")
158
p.PacketErrorWithBody(proto.OpNotExistErr, []byte(err.Error()))
161
parIno := item.(*Inode)
162
// dirChildrenNumLimit is loaded atomically and compared against the parent's
// NLink; reaching the limit is answered with OpDirQuota.
quota := atomic.LoadUint32(&dirChildrenNumLimit)
163
if parIno.NLink >= quota {
164
err = fmt.Errorf("parent dir quota limitation reached")
165
p.PacketErrorWithBody(proto.OpDirQuota, []byte(err.Error()))
171
ParentId: req.ParentID,
176
// NOTE(review): mp.verSeq is read directly here, whereas CreateDentry obtains
// the sequence through mp.GetVerSeq() — confirm the direct field read is safe.
dentry.setVerSeq(mp.verSeq)
177
// NOTE(review): the log tag says "CreateDentry" although this is
// QuotaCreateDentry — confirm whether the shared tag is deliberate.
log.LogDebugf("action[CreateDentry] mp[%v] with seq [%v],dentry [%v]", mp.config.PartitionId, mp.verSeq, dentry)
178
val, err := dentry.Marshal()
182
resp, err := mp.submit(opFSMCreateDentry, val)
184
// A submit error is reported to the client as OpAgain.
p.PacketErrorWithBody(proto.OpAgain, []byte(err.Error()))
187
// The value returned by submit is asserted to uint8 and used as the result code.
p.ResultCode = resp.(uint8)
191
// TxDeleteDentry handles a transactional delete-dentry request.
// It looks up the target dentry, checks that it refers to req.Ino and that the
// parent inode can be fetched, then marshals a TxDentry and submits it with
// opFSMTxDeleteDentry. The Status of the returned *DentryResponse becomes
// p.ResultCode; when the result code is OpOk a TxDeleteDentryResponse is
// JSON-encoded into the reply body.
func (mp *metaPartition) TxDeleteDentry(req *proto.TxDeleteDentryRequest, p *Packet, remoteAddr string) (err error) {
193
if mp.IsEnableAuditLog() {
195
auditlog.LogDentryOp(remoteAddr, mp.GetVolName(), p.GetOpMsg(), req.Name, req.GetFullPath(), err, time.Since(start).Milliseconds(), req.Ino, req.ParentID)
198
// The request's TxInfo is copied rather than used in place.
txInfo := req.TxInfo.GetCopy()
200
ParentId: req.ParentID,
205
// The success reply is only built when the result code ended up as OpOk.
if p.ResultCode == proto.OpOk {
207
resp := &proto.TxDeleteDentryResponse{
210
reply, err = json.Marshal(resp)
211
p.PacketOkWithBody(reply)
215
dentry, status := mp.getDentry(den)
216
if status != proto.OpOk {
217
// If txDentryInRb reports the entry for this transaction id, the missing
// dentry is treated as already deleted and the request succeeds.
if mp.txDentryInRb(req.ParentID, req.Name, req.TxInfo.TxID) {
218
p.ResultCode = proto.OpOk
219
log.LogWarnf("TxDeleteDentry: dentry is already been deleted before, req %v", req)
223
err = fmt.Errorf("dentry[%v] not exists", den)
225
p.PacketErrorWithBody(status, []byte(err.Error()))
229
// The stored dentry must point at the inode the client expects to delete.
if dentry.Inode != req.Ino {
230
// NOTE(review): the value labelled "par" in this message is
// req.PartitionID, not req.ParentID — the parent inode id was probably
// intended; confirm before changing the message arguments.
err = fmt.Errorf("target name ino is not right, par %d, name %s, want %d, got %d",
231
req.PartitionID, req.Name, req.Ino, dentry.Inode)
233
p.PacketErrorWithBody(proto.OpExistErr, []byte(err.Error()))
236
// The parent inode must be retrievable; its lookup status is forwarded on failure.
parIno := NewInode(req.ParentID, 0)
237
inoResp := mp.getInode(parIno, false)
238
if inoResp.Status != proto.OpOk {
239
err = fmt.Errorf("parIno[%v] not exists", parIno.Inode)
240
p.PacketErrorWithBody(inoResp.Status, []byte(err.Error()))
244
txDentry := &TxDentry{
245
// ParInode: inoResp.Msg,
250
val, err := txDentry.Marshal()
252
// NOTE(review): a Marshal failure is reported as OpAgain here, while
// DeleteDentry and TxUpdateDentry report Marshal failures as OpErr — confirm
// which code is intended.
p.PacketErrorWithBody(proto.OpAgain, []byte(err.Error()))
256
r, err := mp.submit(opFSMTxDeleteDentry, val)
258
// A submit error is reported to the client as OpAgain.
p.PacketErrorWithBody(proto.OpAgain, []byte(err.Error()))
262
// submit's result is asserted to *DentryResponse; its Status is the result code.
retMsg := r.(*DentryResponse)
263
p.ResultCode = retMsg.Status
267
// DeleteDentry handles a delete-dentry request.
// After the deletion-lock and snapshot checks below, the dentry is marshaled
// and submitted with opFSMDeleteDentry. The Status of the returned
// *DentryResponse becomes p.ResultCode; when it is OpOk a DeleteDentryResp is
// JSON-encoded into the reply body.
268
func (mp *metaPartition) DeleteDentry(req *DeleteDentryReq, p *Packet, remoteAddr string) (err error) {
270
if mp.IsEnableAuditLog() {
272
auditlog.LogDentryOp(remoteAddr, mp.GetVolName(), p.GetOpMsg(), req.Name, req.GetFullPath(), err, time.Since(start).Milliseconds(), 0, req.ParentID)
275
// The deletion lock is only evaluated when the request carries a create time.
if req.InodeCreateTime > 0 {
276
// volDeleteLockTime is multiplied by 60*60 before being added to the create
// time and compared with time.Now().Unix(), i.e. it is treated as hours
// (assumes InodeCreateTime is in Unix seconds — TODO confirm).
if mp.vol.volDeleteLockTime > 0 && req.InodeCreateTime+mp.vol.volDeleteLockTime*60*60 > time.Now().Unix() {
277
// NOTE(review): the message says "Inode[...]" but formats req.Name —
// confirm whether the name or an inode id should be printed.
err = errors.NewErrorf("the current Inode[%v] is still locked for deletion", req.Name)
278
log.LogDebugf("DeleteDentry: the current Inode is still locked for deletion, inode[%v] createTime(%v) mw.volDeleteLockTime(%v) now(%v)", req.Name, req.InodeCreateTime, mp.vol.volDeleteLockTime, time.Now().Unix())
279
p.PacketErrorWithBody(proto.OpNotPerm, []byte(err.Error()))
284
ParentId: req.ParentID,
287
// The version sequence comes from the request, not from the partition.
dentry.setVerSeq(req.Verseq)
288
log.LogDebugf("action[DeleteDentry] den param(%v)", dentry)
290
val, err := dentry.Marshal()
292
p.PacketErrorWithBody(proto.OpErr, []byte(err.Error()))
295
// A request that names a version sequence is refused when the partition's
// own verSeq is 0.
if mp.verSeq == 0 && dentry.getSeqFiled() > 0 {
296
err = fmt.Errorf("snapshot not enabled")
297
p.PacketErrorWithBody(proto.OpErr, []byte(err.Error()))
300
log.LogDebugf("action[DeleteDentry] submit!")
301
r, err := mp.submit(opFSMDeleteDentry, val)
303
// A submit error is reported to the client as OpAgain.
p.PacketErrorWithBody(proto.OpAgain, []byte(err.Error()))
306
// submit's result is asserted to *DentryResponse; its Status is the result code.
retMsg := r.(*DentryResponse)
307
p.ResultCode = retMsg.Status
309
if p.ResultCode == proto.OpOk {
311
resp := &DeleteDentryResp{
314
reply, err = json.Marshal(resp)
315
p.PacketOkWithBody(reply)
320
// DeleteDentryBatch handles a batch delete-dentry request.
// One Dentry per element of req.Dens is collected into a DentryBatch, which is
// marshaled and submitted with opFSMDeleteDentryBatch. The per-entry
// *DentryResponse results are folded into a BatchDeleteDentryResp that is
// JSON-encoded into the reply body.
321
func (mp *metaPartition) DeleteDentryBatch(req *BatchDeleteDentryReq, p *Packet, remoteAddr string) (err error) {
322
// The batch is pre-sized to the number of dentries in the request.
db := make(DentryBatch, 0, len(req.Dens))
324
for i, d := range req.Dens {
325
db = append(db, &Dentry{
326
ParentId: req.ParentID,
333
// req.FullPaths may be shorter than req.Dens, so the index is bounds-checked.
if len(req.FullPaths) > i {
334
fullPath = req.FullPaths[i]
336
if mp.IsEnableAuditLog() {
338
auditlog.LogDentryOp(remoteAddr, mp.GetVolName(), p.GetOpMsg(), den.Name, fullPath, err, time.Since(start).Milliseconds(), den.Inode, req.ParentID)
343
val, err := db.Marshal()
345
p.PacketErrorWithBody(proto.OpErr, []byte(err.Error()))
348
r, err := mp.submit(opFSMDeleteDentryBatch, val)
350
// A submit error is reported to the client as OpAgain.
p.PacketErrorWithBody(proto.OpAgain, []byte(err.Error()))
354
// submit's result is asserted to a slice of per-entry responses.
retMsg := r.([]*DentryResponse)
355
// Start from OpOk; any entry with a non-OK status downgrades it to OpErr.
p.ResultCode = proto.OpOk
357
bddr := &BatchDeleteDentryResp{}
359
for _, m := range retMsg {
360
if m.Status != proto.OpOk {
361
p.ResultCode = proto.OpErr
364
// Two item shapes are appended, depending on whether the response
// carries a dentry in m.Msg.
if dentry := m.Msg; dentry != nil {
365
bddr.Items = append(bddr.Items, &struct {
366
Inode uint64 `json:"ino"`
367
Status uint8 `json:"status"`
373
bddr.Items = append(bddr.Items, &struct {
374
Inode uint64 `json:"ino"`
375
Status uint8 `json:"status"`
383
reply, err := json.Marshal(bddr)
385
// NOTE(review): a JSON encoding failure is reported as OpAgain here, while the
// other handlers in this file report reply-encoding failures as OpErr — confirm.
p.PacketErrorWithBody(proto.OpAgain, []byte(err.Error()))
388
// NOTE(review): if PacketOkWithBody resets p.ResultCode, the OpErr set in the
// loop above is lost — confirm against the Packet implementation.
p.PacketOkWithBody(reply)
393
// TxUpdateDentry handles a transactional update-dentry request.
// It looks up the existing dentry for (req.ParentID, name), checks that it
// still refers to req.OldIno, then marshals a TxUpdateDentry holding the old
// and new dentries and submits it with opFSMTxUpdateDentry. The Status of the
// returned *DentryResponse becomes p.ResultCode; when the result code is OpOk
// a TxUpdateDentryResponse is JSON-encoded into the reply body.
func (mp *metaPartition) TxUpdateDentry(req *proto.TxUpdateDentryRequest, p *Packet, remoteAddr string) (err error) {
395
if mp.IsEnableAuditLog() {
397
auditlog.LogDentryOp(remoteAddr, mp.GetVolName(), p.GetOpMsg(), req.Name, req.GetFullPath(), err, time.Since(start).Milliseconds(), req.Inode, req.ParentID)
400
// A dentry whose inode equals its parent id is rejected with OpExistErr.
if req.ParentID == req.Inode {
401
err = fmt.Errorf("parentId is equal inodeId")
402
p.PacketErrorWithBody(proto.OpExistErr, []byte(err.Error()))
406
// The request's TxInfo is copied rather than used in place.
txInfo := req.TxInfo.GetCopy()
409
// The success reply is only built when the result code ended up as OpOk.
if p.ResultCode == proto.OpOk {
411
m := &proto.TxUpdateDentryResponse{
414
// NOTE(review): the json.Marshal error is discarded here — confirm that
// encoding this response cannot fail, or handle the error.
reply, _ = json.Marshal(m)
415
p.PacketOkWithBody(reply)
419
newDentry := &Dentry{
420
ParentId: req.ParentID,
424
// newDentry doubles as the lookup key for the dentry being replaced.
oldDentry, status := mp.getDentry(newDentry)
425
if status != proto.OpOk {
426
// If txDentryInRb reports the entry for this transaction id, the request
// is answered with OpOk.
if mp.txDentryInRb(req.ParentID, req.Name, req.TxInfo.TxID) {
427
p.ResultCode = proto.OpOk
428
// NOTE(review): this warning is tagged "TxDeleteDentry" and speaks of a
// deletion although it is emitted from TxUpdateDentry — looks like a
// copy-paste slip; confirm and correct the message.
log.LogWarnf("TxDeleteDentry: dentry is already been deleted before, req %v", req)
431
// NOTE(review): oldDentry comes from the failed lookup above and is
// presumably nil here; newDentry would identify the missing entry — confirm.
err = fmt.Errorf("oldDentry[%v] not exists", oldDentry)
432
p.PacketErrorWithBody(status, []byte(err.Error()))
436
// The stored dentry must still point at the inode the client saw.
if oldDentry.Inode != req.OldIno {
437
err = fmt.Errorf("oldDentry is alredy updated, req %v, old [%v]", req, oldDentry)
438
p.PacketErrorWithBody(proto.OpNotExistErr, []byte(err.Error()))
442
txDentry := &TxUpdateDentry{
443
OldDentry: oldDentry,
444
NewDentry: newDentry,
447
val, err := txDentry.Marshal()
449
p.PacketErrorWithBody(proto.OpErr, []byte(err.Error()))
452
resp, err := mp.submit(opFSMTxUpdateDentry, val)
454
// A submit error is reported to the client as OpAgain.
p.PacketErrorWithBody(proto.OpAgain, []byte(err.Error()))
458
// submit's result is asserted to *DentryResponse; its Status is the result code.
msg := resp.(*DentryResponse)
459
p.ResultCode = msg.Status
463
// UpdateDentry handles an update-dentry request.
// The dentry is stamped with mp.verSeq, marshaled and submitted with
// opFSMUpdateDentry. The Status of the returned *DentryResponse becomes
// p.ResultCode; on OpOk an UpdateDentryResp carrying msg.Msg.Inode is
// JSON-encoded into the reply body.
464
func (mp *metaPartition) UpdateDentry(req *UpdateDentryReq, p *Packet, remoteAddr string) (err error) {
466
if mp.IsEnableAuditLog() {
468
auditlog.LogDentryOp(remoteAddr, mp.GetVolName(), p.GetOpMsg(), req.Name, req.GetFullPath(), err, time.Since(start).Milliseconds(), req.Inode, req.ParentID)
471
// A dentry whose inode equals its parent id is rejected with OpExistErr.
if req.ParentID == req.Inode {
472
err = fmt.Errorf("parentId is equal inodeId")
473
p.PacketErrorWithBody(proto.OpExistErr, []byte(err.Error()))
478
ParentId: req.ParentID,
482
dentry.setVerSeq(mp.verSeq)
483
val, err := dentry.Marshal()
485
p.PacketErrorWithBody(proto.OpErr, []byte(err.Error()))
488
resp, err := mp.submit(opFSMUpdateDentry, val)
490
// A submit error is reported to the client as OpAgain.
p.PacketErrorWithBody(proto.OpAgain, []byte(err.Error()))
493
// submit's result is asserted to *DentryResponse; its Status is the result code.
msg := resp.(*DentryResponse)
494
p.ResultCode = msg.Status
495
if msg.Status == proto.OpOk {
497
m := &UpdateDentryResp{
498
// The reply's Inode is taken from the dentry carried in the response message.
Inode: msg.Msg.Inode,
500
reply, err = json.Marshal(m)
501
p.PacketOkWithBody(reply)
506
// ReadDirOnly answers a ReadDirOnlyReq: the result of mp.readDirOnly is
// JSON-encoded and written to p with PacketOkWithBody. An encoding failure is
// reported to the client as OpErr.
func (mp *metaPartition) ReadDirOnly(req *ReadDirOnlyReq, p *Packet) (err error) {
507
resp := mp.readDirOnly(req)
508
reply, err := json.Marshal(resp)
510
p.PacketErrorWithBody(proto.OpErr, []byte(err.Error()))
513
p.PacketOkWithBody(reply)
517
// ReadDir reads the directory based on the given request.
// The result of mp.readDir is JSON-encoded and written to p with
// PacketOkWithBody; an encoding failure is reported to the client as OpErr.
518
func (mp *metaPartition) ReadDir(req *ReadDirReq, p *Packet) (err error) {
519
resp := mp.readDir(req)
520
reply, err := json.Marshal(resp)
522
p.PacketErrorWithBody(proto.OpErr, []byte(err.Error()))
525
p.PacketOkWithBody(reply)
529
// ReadDirLimit answers a ReadDirLimitReq: the result of mp.readDirLimit is
// JSON-encoded and written to p with PacketOkWithBody. An encoding failure is
// reported to the client as OpErr.
func (mp *metaPartition) ReadDirLimit(req *ReadDirLimitReq, p *Packet) (err error) {
530
// NOTE(review): this logs at Info level on every call, unlike ReadDir and
// ReadDirOnly which do not log — confirm the level is intended.
log.LogInfof("action[ReadDirLimit] read seq [%v], request[%v]", req.VerSeq, req)
531
resp := mp.readDirLimit(req)
532
reply, err := json.Marshal(resp)
534
p.PacketErrorWithBody(proto.OpErr, []byte(err.Error()))
537
p.PacketOkWithBody(reply)
541
// Lookup looks up the given dentry from the request.
// The lookup key is stamped with req.VerSeq. A reply is built when the lookup
// status is OpOk or when req.VerAll is set; the final status and reply bytes
// are handed to p.PacketErrorWithBody.
542
func (mp *metaPartition) Lookup(req *LookupReq, p *Packet) (err error) {
544
ParentId: req.ParentID,
547
dentry.setVerSeq(req.VerSeq)
548
// denList holds the result of mp.getDentryList for the lookup key.
var denList []proto.DetryInfo
550
denList = mp.getDentryList(dentry)
552
// From here on dentry refers to the entry returned by getDentry, not the key.
dentry, status := mp.getDentry(dentry)
555
if status == proto.OpOk || req.VerAll {
557
// Fields taken from the found dentry are only filled in on OpOk.
if status == proto.OpOk {
561
VerSeq: dentry.getSeqFiled(),
572
reply, err = json.Marshal(resp)
575
// On an encoding error, the error text becomes the reply body.
reply = []byte(err.Error())
579
p.PacketErrorWithBody(status, reply)
583
// GetDentryTree returns the dentry tree stored in the meta partition, as
// produced by mp.dentryTree.GetTree().
584
func (mp *metaPartition) GetDentryTree() *BTree {
585
return mp.dentryTree.GetTree()
588
// GetDentryTreeLen returns the dentry tree length, i.e. mp.dentryTree.Len().
// A nil dentryTree is handled separately before Len is called.
589
func (mp *metaPartition) GetDentryTreeLen() int {
590
if mp.dentryTree == nil {
593
return mp.dentryTree.Len()