cubefs

Форк
0
/
partition_op_dentry.go 
594 строки · 15.7 Кб
1
// Copyright 2018 The CubeFS Authors.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
14

15
package metanode
16

17
import (
18
	"encoding/json"
19
	"fmt"
20
	"sync/atomic"
21
	"time"
22

23
	"github.com/cubefs/cubefs/proto"
24
	"github.com/cubefs/cubefs/util/auditlog"
25
	"github.com/cubefs/cubefs/util/errors"
26
	"github.com/cubefs/cubefs/util/log"
27
)
28

29
func (mp *metaPartition) TxCreateDentry(req *proto.TxCreateDentryRequest, p *Packet, remoteAddr string) (err error) {
30
	start := time.Now()
31
	if mp.IsEnableAuditLog() {
32
		defer func() {
33
			auditlog.LogDentryOp(remoteAddr, mp.GetVolName(), p.GetOpMsg(), req.Name, req.GetFullPath(), err, time.Since(start).Milliseconds(), req.Inode, 0)
34
		}()
35
	}
36
	if req.ParentID == req.Inode {
37
		err = fmt.Errorf("parentId is equal inodeId")
38
		p.PacketErrorWithBody(proto.OpExistErr, []byte(err.Error()))
39
		return
40
	}
41

42
	for _, quotaId := range req.QuotaIds {
43
		status := mp.mqMgr.IsOverQuota(false, true, quotaId)
44
		if status != 0 {
45
			err = errors.New("create dentry is over quota")
46
			reply := []byte(err.Error())
47
			p.PacketErrorWithBody(status, reply)
48
			return
49
		}
50
	}
51

52
	var parIno *Inode
53
	item := mp.inodeTree.Get(NewInode(req.ParentID, 0))
54
	if item == nil {
55
		err = fmt.Errorf("parent inode not exists")
56
		p.PacketErrorWithBody(proto.OpNotExistErr, []byte(err.Error()))
57
		return
58
	}
59

60
	parIno = item.(*Inode)
61
	quota := atomic.LoadUint32(&dirChildrenNumLimit)
62
	if parIno.NLink >= quota {
63
		err = fmt.Errorf("parent dir quota limitation reached")
64
		p.PacketErrorWithBody(proto.OpDirQuota, []byte(err.Error()))
65
		return
66
	}
67

68
	txInfo := req.TxInfo.GetCopy()
69
	txDentry := NewTxDentry(req.ParentID, req.Name, req.Inode, req.Mode, parIno, txInfo)
70
	val, err := txDentry.Marshal()
71
	if err != nil {
72
		return
73
	}
74

75
	status, err := mp.submit(opFSMTxCreateDentry, val)
76
	if err != nil {
77
		p.PacketErrorWithBody(proto.OpAgain, []byte(err.Error()))
78
		return
79
	}
80

81
	p.ResultCode = status.(uint8)
82
	return
83
}
84

85
// CreateDentry returns a new dentry.
86
func (mp *metaPartition) CreateDentry(req *CreateDentryReq, p *Packet, remoteAddr string) (err error) {
87
	start := time.Now()
88
	if mp.IsEnableAuditLog() {
89
		defer func() {
90
			auditlog.LogDentryOp(remoteAddr, mp.GetVolName(), p.GetOpMsg(), req.Name, req.GetFullPath(), err, time.Since(start).Milliseconds(), req.Inode, req.ParentID)
91
		}()
92
	}
93
	if req.ParentID == req.Inode {
94
		err = fmt.Errorf("parentId is equal inodeId")
95
		p.PacketErrorWithBody(proto.OpExistErr, []byte(err.Error()))
96
		return
97
	}
98

99
	item := mp.inodeTree.CopyGet(NewInode(req.ParentID, 0))
100
	if item == nil {
101
		err = fmt.Errorf("parent inode not exists")
102
		p.PacketErrorWithBody(proto.OpNotExistErr, []byte(err.Error()))
103
		return
104
	} else {
105
		parIno := item.(*Inode)
106
		quota := atomic.LoadUint32(&dirChildrenNumLimit)
107
		if parIno.NLink >= quota {
108
			err = fmt.Errorf("parent dir quota limitation reached")
109
			p.PacketErrorWithBody(proto.OpDirQuota, []byte(err.Error()))
110
			return
111
		}
112
	}
113

114
	dentry := &Dentry{
115
		ParentId:  req.ParentID,
116
		Name:      req.Name,
117
		Inode:     req.Inode,
118
		Type:      req.Mode,
119
		multiSnap: NewDentrySnap(mp.GetVerSeq()),
120
	}
121
	val, err := dentry.Marshal()
122
	if err != nil {
123
		return
124
	}
125
	resp, err := mp.submit(opFSMCreateDentry, val)
126
	if err != nil {
127
		p.PacketErrorWithBody(proto.OpAgain, []byte(err.Error()))
128
		return
129
	}
130
	p.ResultCode = resp.(uint8)
131
	return
132
}
133

134
func (mp *metaPartition) QuotaCreateDentry(req *proto.QuotaCreateDentryRequest, p *Packet, remoteAddr string) (err error) {
135
	start := time.Now()
136
	if mp.IsEnableAuditLog() {
137
		defer func() {
138
			auditlog.LogDentryOp(remoteAddr, mp.GetVolName(), p.GetOpMsg(), req.Name, req.GetFullPath(), err, time.Since(start).Milliseconds(), req.Inode, req.ParentID)
139
		}()
140
	}
141
	if req.ParentID == req.Inode {
142
		err = fmt.Errorf("parentId is equal inodeId")
143
		p.PacketErrorWithBody(proto.OpExistErr, []byte(err.Error()))
144
		return
145
	}
146
	for _, quotaId := range req.QuotaIds {
147
		status := mp.mqMgr.IsOverQuota(false, true, quotaId)
148
		if status != 0 {
149
			err = errors.New("create dentry is over quota")
150
			reply := []byte(err.Error())
151
			p.PacketErrorWithBody(status, reply)
152
			return
153
		}
154
	}
155
	item := mp.inodeTree.CopyGet(NewInode(req.ParentID, 0))
156
	if item == nil {
157
		err = fmt.Errorf("parent inode not exists")
158
		p.PacketErrorWithBody(proto.OpNotExistErr, []byte(err.Error()))
159
		return
160
	} else {
161
		parIno := item.(*Inode)
162
		quota := atomic.LoadUint32(&dirChildrenNumLimit)
163
		if parIno.NLink >= quota {
164
			err = fmt.Errorf("parent dir quota limitation reached")
165
			p.PacketErrorWithBody(proto.OpDirQuota, []byte(err.Error()))
166
			return
167
		}
168
	}
169

170
	dentry := &Dentry{
171
		ParentId: req.ParentID,
172
		Name:     req.Name,
173
		Inode:    req.Inode,
174
		Type:     req.Mode,
175
	}
176
	dentry.setVerSeq(mp.verSeq)
177
	log.LogDebugf("action[CreateDentry] mp[%v] with seq [%v],dentry [%v]", mp.config.PartitionId, mp.verSeq, dentry)
178
	val, err := dentry.Marshal()
179
	if err != nil {
180
		return
181
	}
182
	resp, err := mp.submit(opFSMCreateDentry, val)
183
	if err != nil {
184
		p.PacketErrorWithBody(proto.OpAgain, []byte(err.Error()))
185
		return
186
	}
187
	p.ResultCode = resp.(uint8)
188
	return
189
}
190

191
func (mp *metaPartition) TxDeleteDentry(req *proto.TxDeleteDentryRequest, p *Packet, remoteAddr string) (err error) {
192
	start := time.Now()
193
	if mp.IsEnableAuditLog() {
194
		defer func() {
195
			auditlog.LogDentryOp(remoteAddr, mp.GetVolName(), p.GetOpMsg(), req.Name, req.GetFullPath(), err, time.Since(start).Milliseconds(), req.Ino, req.ParentID)
196
		}()
197
	}
198
	txInfo := req.TxInfo.GetCopy()
199
	den := &Dentry{
200
		ParentId: req.ParentID,
201
		Name:     req.Name,
202
	}
203

204
	defer func() {
205
		if p.ResultCode == proto.OpOk {
206
			var reply []byte
207
			resp := &proto.TxDeleteDentryResponse{
208
				Inode: req.Ino,
209
			}
210
			reply, err = json.Marshal(resp)
211
			p.PacketOkWithBody(reply)
212
		}
213
	}()
214

215
	dentry, status := mp.getDentry(den)
216
	if status != proto.OpOk {
217
		if mp.txDentryInRb(req.ParentID, req.Name, req.TxInfo.TxID) {
218
			p.ResultCode = proto.OpOk
219
			log.LogWarnf("TxDeleteDentry: dentry is already been deleted before, req %v", req)
220
			return
221
		}
222

223
		err = fmt.Errorf("dentry[%v] not exists", den)
224
		log.LogWarn(err)
225
		p.PacketErrorWithBody(status, []byte(err.Error()))
226
		return
227
	}
228

229
	if dentry.Inode != req.Ino {
230
		err = fmt.Errorf("target name ino is not right, par %d, name %s, want %d, got %d",
231
			req.PartitionID, req.Name, req.Ino, dentry.Inode)
232
		log.LogWarn(err)
233
		p.PacketErrorWithBody(proto.OpExistErr, []byte(err.Error()))
234
		return
235
	}
236
	parIno := NewInode(req.ParentID, 0)
237
	inoResp := mp.getInode(parIno, false)
238
	if inoResp.Status != proto.OpOk {
239
		err = fmt.Errorf("parIno[%v] not exists", parIno.Inode)
240
		p.PacketErrorWithBody(inoResp.Status, []byte(err.Error()))
241
		return
242
	}
243

244
	txDentry := &TxDentry{
245
		// ParInode: inoResp.Msg,
246
		Dentry: dentry,
247
		TxInfo: txInfo,
248
	}
249

250
	val, err := txDentry.Marshal()
251
	if err != nil {
252
		p.PacketErrorWithBody(proto.OpAgain, []byte(err.Error()))
253
		return
254
	}
255

256
	r, err := mp.submit(opFSMTxDeleteDentry, val)
257
	if err != nil {
258
		p.PacketErrorWithBody(proto.OpAgain, []byte(err.Error()))
259
		return
260
	}
261

262
	retMsg := r.(*DentryResponse)
263
	p.ResultCode = retMsg.Status
264
	return
265
}
266

267
// DeleteDentry deletes a dentry.
268
func (mp *metaPartition) DeleteDentry(req *DeleteDentryReq, p *Packet, remoteAddr string) (err error) {
269
	start := time.Now()
270
	if mp.IsEnableAuditLog() {
271
		defer func() {
272
			auditlog.LogDentryOp(remoteAddr, mp.GetVolName(), p.GetOpMsg(), req.Name, req.GetFullPath(), err, time.Since(start).Milliseconds(), 0, req.ParentID)
273
		}()
274
	}
275
	if req.InodeCreateTime > 0 {
276
		if mp.vol.volDeleteLockTime > 0 && req.InodeCreateTime+mp.vol.volDeleteLockTime*60*60 > time.Now().Unix() {
277
			err = errors.NewErrorf("the current Inode[%v] is still locked for deletion", req.Name)
278
			log.LogDebugf("DeleteDentry: the current Inode is still locked for deletion, inode[%v] createTime(%v) mw.volDeleteLockTime(%v) now(%v)", req.Name, req.InodeCreateTime, mp.vol.volDeleteLockTime, time.Now().Unix())
279
			p.PacketErrorWithBody(proto.OpNotPerm, []byte(err.Error()))
280
			return
281
		}
282
	}
283
	dentry := &Dentry{
284
		ParentId: req.ParentID,
285
		Name:     req.Name,
286
	}
287
	dentry.setVerSeq(req.Verseq)
288
	log.LogDebugf("action[DeleteDentry] den param(%v)", dentry)
289

290
	val, err := dentry.Marshal()
291
	if err != nil {
292
		p.PacketErrorWithBody(proto.OpErr, []byte(err.Error()))
293
		return
294
	}
295
	if mp.verSeq == 0 && dentry.getSeqFiled() > 0 {
296
		err = fmt.Errorf("snapshot not enabled")
297
		p.PacketErrorWithBody(proto.OpErr, []byte(err.Error()))
298
		return
299
	}
300
	log.LogDebugf("action[DeleteDentry] submit!")
301
	r, err := mp.submit(opFSMDeleteDentry, val)
302
	if err != nil {
303
		p.PacketErrorWithBody(proto.OpAgain, []byte(err.Error()))
304
		return
305
	}
306
	retMsg := r.(*DentryResponse)
307
	p.ResultCode = retMsg.Status
308
	dentry = retMsg.Msg
309
	if p.ResultCode == proto.OpOk {
310
		var reply []byte
311
		resp := &DeleteDentryResp{
312
			Inode: dentry.Inode,
313
		}
314
		reply, err = json.Marshal(resp)
315
		p.PacketOkWithBody(reply)
316
	}
317
	return
318
}
319

320
// DeleteDentry deletes a dentry.
321
func (mp *metaPartition) DeleteDentryBatch(req *BatchDeleteDentryReq, p *Packet, remoteAddr string) (err error) {
322
	db := make(DentryBatch, 0, len(req.Dens))
323
	start := time.Now()
324
	for i, d := range req.Dens {
325
		db = append(db, &Dentry{
326
			ParentId: req.ParentID,
327
			Name:     d.Name,
328
			Inode:    d.Inode,
329
			Type:     d.Type,
330
		})
331
		den := &d
332
		fullPath := ""
333
		if len(req.FullPaths) > i {
334
			fullPath = req.FullPaths[i]
335
		}
336
		if mp.IsEnableAuditLog() {
337
			defer func() {
338
				auditlog.LogDentryOp(remoteAddr, mp.GetVolName(), p.GetOpMsg(), den.Name, fullPath, err, time.Since(start).Milliseconds(), den.Inode, req.ParentID)
339
			}()
340
		}
341
	}
342

343
	val, err := db.Marshal()
344
	if err != nil {
345
		p.PacketErrorWithBody(proto.OpErr, []byte(err.Error()))
346
		return
347
	}
348
	r, err := mp.submit(opFSMDeleteDentryBatch, val)
349
	if err != nil {
350
		p.PacketErrorWithBody(proto.OpAgain, []byte(err.Error()))
351
		return err
352
	}
353

354
	retMsg := r.([]*DentryResponse)
355
	p.ResultCode = proto.OpOk
356

357
	bddr := &BatchDeleteDentryResp{}
358

359
	for _, m := range retMsg {
360
		if m.Status != proto.OpOk {
361
			p.ResultCode = proto.OpErr
362
		}
363

364
		if dentry := m.Msg; dentry != nil {
365
			bddr.Items = append(bddr.Items, &struct {
366
				Inode  uint64 `json:"ino"`
367
				Status uint8  `json:"status"`
368
			}{
369
				Inode:  dentry.Inode,
370
				Status: m.Status,
371
			})
372
		} else {
373
			bddr.Items = append(bddr.Items, &struct {
374
				Inode  uint64 `json:"ino"`
375
				Status uint8  `json:"status"`
376
			}{
377
				Status: m.Status,
378
			})
379
		}
380

381
	}
382

383
	reply, err := json.Marshal(bddr)
384
	if err != nil {
385
		p.PacketErrorWithBody(proto.OpAgain, []byte(err.Error()))
386
		return err
387
	}
388
	p.PacketOkWithBody(reply)
389

390
	return
391
}
392

393
func (mp *metaPartition) TxUpdateDentry(req *proto.TxUpdateDentryRequest, p *Packet, remoteAddr string) (err error) {
394
	start := time.Now()
395
	if mp.IsEnableAuditLog() {
396
		defer func() {
397
			auditlog.LogDentryOp(remoteAddr, mp.GetVolName(), p.GetOpMsg(), req.Name, req.GetFullPath(), err, time.Since(start).Milliseconds(), req.Inode, req.ParentID)
398
		}()
399
	}
400
	if req.ParentID == req.Inode {
401
		err = fmt.Errorf("parentId is equal inodeId")
402
		p.PacketErrorWithBody(proto.OpExistErr, []byte(err.Error()))
403
		return
404
	}
405

406
	txInfo := req.TxInfo.GetCopy()
407

408
	defer func() {
409
		if p.ResultCode == proto.OpOk {
410
			var reply []byte
411
			m := &proto.TxUpdateDentryResponse{
412
				Inode: req.OldIno,
413
			}
414
			reply, _ = json.Marshal(m)
415
			p.PacketOkWithBody(reply)
416
		}
417
	}()
418

419
	newDentry := &Dentry{
420
		ParentId: req.ParentID,
421
		Name:     req.Name,
422
		Inode:    req.Inode,
423
	}
424
	oldDentry, status := mp.getDentry(newDentry)
425
	if status != proto.OpOk {
426
		if mp.txDentryInRb(req.ParentID, req.Name, req.TxInfo.TxID) {
427
			p.ResultCode = proto.OpOk
428
			log.LogWarnf("TxDeleteDentry: dentry is already been deleted before, req %v", req)
429
			return
430
		}
431
		err = fmt.Errorf("oldDentry[%v] not exists", oldDentry)
432
		p.PacketErrorWithBody(status, []byte(err.Error()))
433
		return
434
	}
435

436
	if oldDentry.Inode != req.OldIno {
437
		err = fmt.Errorf("oldDentry is alredy updated, req %v, old [%v]", req, oldDentry)
438
		p.PacketErrorWithBody(proto.OpNotExistErr, []byte(err.Error()))
439
		return
440
	}
441

442
	txDentry := &TxUpdateDentry{
443
		OldDentry: oldDentry,
444
		NewDentry: newDentry,
445
		TxInfo:    txInfo,
446
	}
447
	val, err := txDentry.Marshal()
448
	if err != nil {
449
		p.PacketErrorWithBody(proto.OpErr, []byte(err.Error()))
450
		return
451
	}
452
	resp, err := mp.submit(opFSMTxUpdateDentry, val)
453
	if err != nil {
454
		p.PacketErrorWithBody(proto.OpAgain, []byte(err.Error()))
455
		return
456
	}
457

458
	msg := resp.(*DentryResponse)
459
	p.ResultCode = msg.Status
460
	return
461
}
462

463
// UpdateDentry updates a dentry.
464
func (mp *metaPartition) UpdateDentry(req *UpdateDentryReq, p *Packet, remoteAddr string) (err error) {
465
	start := time.Now()
466
	if mp.IsEnableAuditLog() {
467
		defer func() {
468
			auditlog.LogDentryOp(remoteAddr, mp.GetVolName(), p.GetOpMsg(), req.Name, req.GetFullPath(), err, time.Since(start).Milliseconds(), req.Inode, req.ParentID)
469
		}()
470
	}
471
	if req.ParentID == req.Inode {
472
		err = fmt.Errorf("parentId is equal inodeId")
473
		p.PacketErrorWithBody(proto.OpExistErr, []byte(err.Error()))
474
		return
475
	}
476

477
	dentry := &Dentry{
478
		ParentId: req.ParentID,
479
		Name:     req.Name,
480
		Inode:    req.Inode,
481
	}
482
	dentry.setVerSeq(mp.verSeq)
483
	val, err := dentry.Marshal()
484
	if err != nil {
485
		p.PacketErrorWithBody(proto.OpErr, []byte(err.Error()))
486
		return
487
	}
488
	resp, err := mp.submit(opFSMUpdateDentry, val)
489
	if err != nil {
490
		p.PacketErrorWithBody(proto.OpAgain, []byte(err.Error()))
491
		return
492
	}
493
	msg := resp.(*DentryResponse)
494
	p.ResultCode = msg.Status
495
	if msg.Status == proto.OpOk {
496
		var reply []byte
497
		m := &UpdateDentryResp{
498
			Inode: msg.Msg.Inode,
499
		}
500
		reply, err = json.Marshal(m)
501
		p.PacketOkWithBody(reply)
502
	}
503
	return
504
}
505

506
func (mp *metaPartition) ReadDirOnly(req *ReadDirOnlyReq, p *Packet) (err error) {
507
	resp := mp.readDirOnly(req)
508
	reply, err := json.Marshal(resp)
509
	if err != nil {
510
		p.PacketErrorWithBody(proto.OpErr, []byte(err.Error()))
511
		return
512
	}
513
	p.PacketOkWithBody(reply)
514
	return
515
}
516

517
// ReadDir reads the directory based on the given request.
518
func (mp *metaPartition) ReadDir(req *ReadDirReq, p *Packet) (err error) {
519
	resp := mp.readDir(req)
520
	reply, err := json.Marshal(resp)
521
	if err != nil {
522
		p.PacketErrorWithBody(proto.OpErr, []byte(err.Error()))
523
		return
524
	}
525
	p.PacketOkWithBody(reply)
526
	return
527
}
528

529
func (mp *metaPartition) ReadDirLimit(req *ReadDirLimitReq, p *Packet) (err error) {
530
	log.LogInfof("action[ReadDirLimit] read seq [%v], request[%v]", req.VerSeq, req)
531
	resp := mp.readDirLimit(req)
532
	reply, err := json.Marshal(resp)
533
	if err != nil {
534
		p.PacketErrorWithBody(proto.OpErr, []byte(err.Error()))
535
		return
536
	}
537
	p.PacketOkWithBody(reply)
538
	return
539
}
540

541
// Lookup looks up the given dentry from the request.
542
func (mp *metaPartition) Lookup(req *LookupReq, p *Packet) (err error) {
543
	dentry := &Dentry{
544
		ParentId: req.ParentID,
545
		Name:     req.Name,
546
	}
547
	dentry.setVerSeq(req.VerSeq)
548
	var denList []proto.DetryInfo
549
	if req.VerAll {
550
		denList = mp.getDentryList(dentry)
551
	}
552
	dentry, status := mp.getDentry(dentry)
553

554
	var reply []byte
555
	if status == proto.OpOk || req.VerAll {
556
		var resp *LookupResp
557
		if status == proto.OpOk {
558
			resp = &LookupResp{
559
				Inode:  dentry.Inode,
560
				Mode:   dentry.Type,
561
				VerSeq: dentry.getSeqFiled(),
562
				LayAll: denList,
563
			}
564
		} else {
565
			resp = &LookupResp{
566
				Inode:  0,
567
				Mode:   0,
568
				VerSeq: 0,
569
				LayAll: denList,
570
			}
571
		}
572
		reply, err = json.Marshal(resp)
573
		if err != nil {
574
			status = proto.OpErr
575
			reply = []byte(err.Error())
576
		}
577
	}
578

579
	p.PacketErrorWithBody(status, reply)
580
	return
581
}
582

583
// GetDentryTree returns the dentry tree stored in the meta partition.
584
func (mp *metaPartition) GetDentryTree() *BTree {
585
	return mp.dentryTree.GetTree()
586
}
587

588
// GetDentryTreeLen returns the dentry tree length.
589
func (mp *metaPartition) GetDentryTreeLen() int {
590
	if mp.dentryTree == nil {
591
		return 0
592
	}
593
	return mp.dentryTree.Len()
594
}
595

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.