1
// Copyright 2018 The CubeFS Authors.
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
7
// http://www.apache.org/licenses/LICENSE-2.0
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
23
"github.com/stretchr/testify/assert"
25
"github.com/cubefs/cubefs/proto"
26
"github.com/cubefs/cubefs/util/log"
29
// var manager = &metadataManager{}
37
// FileModeType is passed as the second argument to NewInode by the helpers
// and tests in this file (420 decimal == 0644 octal).
const FileModeType uint32 = 420
40
// MemberAddrs is the first argument given to proto.NewTxInodeInfo and
// proto.NewTxDentryInfo throughout these tests.
MemberAddrs = "127.0.0.1:17210,127.0.0.2:17210,127.0.0.3:17210"
49
// Set up logging for the tests with the arguments shown.
// NOTE(review): any value returned by log.InitLog is discarded here.
log.InitLog("/tmp/cfs/logs/", "test", log.DebugLevel, nil, log.DefaultLogLeftSpaceLimit)
52
// newMetaPartition builds a metaPartition fixture for the tests in this file.
// The visible initialisers configure it with the given PartitionId and
// proto.VolumeTypeHot, give it fresh B-trees for dentries, inodes, extended
// attributes and multipart data, create its channels and free list, set the
// config cursor/end to 1000/100000, and attach a transaction processor and a
// uid manager.
func newMetaPartition(PartitionId uint64, manager *metadataManager) (mp *metaPartition) {
53
metaConf := &MetaPartitionConfig{
54
PartitionId: PartitionId,
56
PartitionType: proto.VolumeTypeHot,
61
dentryTree: NewBtree(),
62
inodeTree: NewBtree(),
63
extendTree: NewBtree(),
64
multipartTree: NewBtree(),
65
stopC: make(chan bool),
66
// storeChan is buffered with room for 100 messages.
storeChan: make(chan *storeMsg, 100),
67
freeList: newFreeList(),
68
extDelCh: make(chan []proto.ExtentKey, defaultDelExtentsCnt),
69
extReset: make(chan struct{}),
73
mp.config.Cursor = 1000
74
mp.config.End = 100000
76
// The processor and uid manager are created last because both take values
// from the already-populated mp (mp itself, and mp.config fields).
mp.txProcessor = NewTransactionProcessor(mp)
77
mp.uidManager = NewUidMgr(mp.config.VolName, mp.config.PartitionId)
81
// initMps assigns three freshly built meta partitions (IDs 10001, 10002 and
// 10003) to mp1, mp2 and mp3. Each one is created with its own empty
// metadataManager.
func initMps(t *testing.T) {
83
mp1 = newMetaPartition(10001, &metadataManager{})
84
mp2 = newMetaPartition(10002, &metadataManager{})
85
mp3 = newMetaPartition(10003, &metadataManager{})
88
// Equal reports whether i and inode are deeply equal, as decided by
// reflect.DeepEqual on the two pointers.
func (i *Inode) Equal(inode *Inode) bool {
89
return reflect.DeepEqual(i, inode)
92
// Equal compares two rollback inodes in three steps: the rollback type, the
// wrapped inode (via Inode.Equal), and the txInodeInfo (via
// reflect.DeepEqual).
// NOTE(review): presumably each mismatch returns false and the fall-through
// returns true — confirm against the full function body.
func (i *TxRollbackInode) Equal(txRbInode *TxRollbackInode) bool {
93
if i.rbType != txRbInode.rbType {
96
if !i.inode.Equal(txRbInode.inode) {
99
if !reflect.DeepEqual(i.txInodeInfo, txRbInode.txInodeInfo) {
105
// TestRollbackInodeLess checks TxRollbackInode.Less. rbInode.Less(rbInode2)
// must be false for rbInode2 as first constructed, and true once rbInode2's
// txInodeInfo is replaced by one created with inodeNum+1.
func TestRollbackInodeLess(t *testing.T) {
106
inode := NewInode(101, 0)
107
txInodeInfo := proto.NewTxInodeInfo(MemberAddrs, inodeNum, 10001)
108
rbInode := NewTxRollbackInode(inode, []uint32{}, txInodeInfo, TxAdd)
110
rbInode2 := &TxRollbackInode{
111
inode: NewInode(100, 0),
113
assert.False(t, rbInode.Less(rbInode2))
115
// Give rbInode2 a tx inode info built from a larger inode number.
rbInode2.txInodeInfo = proto.NewTxInodeInfo("", inodeNum+1, 0)
116
assert.True(t, rbInode.Less(rbInode2))
119
// TestRollbackInodeSerialization round-trips a TxRollbackInode through
// Marshal and Unmarshal and asserts the decoded value is Equal to the source.
// It then asserts the two are no longer Equal, and finally that Copy yields
// a value Equal to the source.
func TestRollbackInodeSerialization(t *testing.T) {
130
LinkTarget: []byte("link target"),
134
Extents: NewSortedExtentsFromEks([]proto.ExtentKey{
135
{FileOffset: 11, PartitionId: 12, ExtentId: 13, ExtentOffset: 0, Size: 0, CRC: 0},
137
ObjExtents: NewSortedObjExtents(),
140
ids := []uint32{11, 13}
142
txInodeInfo := proto.NewTxInodeInfo(MemberAddrs, inodeNum, 10001)
143
rbInode := NewTxRollbackInode(inode, ids, txInodeInfo, TxAdd)
145
// NOTE(review): the second value returned by Marshal is discarded.
data, _ = rbInode.Marshal()
147
// Decode into an empty rollback inode.
// NOTE(review): any value returned by Unmarshal is discarded.
txRbInode := NewTxRollbackInode(nil, []uint32{}, nil, 0)
148
txRbInode.Unmarshal(data)
150
assert.True(t, rbInode.Equal(txRbInode))
153
assert.False(t, rbInode.Equal(txRbInode))
155
// Copy returns a value that is type-asserted to *TxRollbackInode below.
cpRbInode := rbInode.Copy()
156
assert.True(t, rbInode.Equal(cpRbInode.(*TxRollbackInode)))
159
// TestTxRollbackDentry_Less checks TxRollbackDentry.Less on the
// (ParentId, Name) pairs below: (1001, "tt") is Less than (1002, "tt"), but
// not Less than (1001, "ta").
func TestTxRollbackDentry_Less(t *testing.T) {
160
rb1 := &TxRollbackDentry{
161
txDentryInfo: &proto.TxDentryInfo{ParentId: 1001, Name: "tt"},
164
rb2 := &TxRollbackDentry{
165
txDentryInfo: &proto.TxDentryInfo{ParentId: 1002, Name: "tt"},
168
assert.True(t, rb1.Less(rb2))
170
// Same parent as rb1, different name.
rb3 := &TxRollbackDentry{
171
txDentryInfo: &proto.TxDentryInfo{ParentId: 1001, Name: "ta"},
173
assert.False(t, rb1.Less(rb3))
176
// TestRollbackDentrySerialization round-trips a TxRollbackDentry through
// Marshal and Unmarshal and asserts, with reflect.DeepEqual, that the dentry,
// the txDentryInfo and the whole record match. It then changes
// txDentryInfo.MpMembers and expects the records to differ, and finally
// checks that Copy yields a deeply equal record.
func TestRollbackDentrySerialization(t *testing.T) {
177
txDentryInfo := proto.NewTxDentryInfo(MemberAddrs, pInodeNum, dentryName, 10001)
184
rbDentry := NewTxRollbackDentry(dentry, txDentryInfo, TxAdd)
186
// NOTE(review): the second value returned by Marshal is discarded.
data, _ = rbDentry.Marshal()
188
// Decode into an empty rollback dentry.
// NOTE(review): any value returned by Unmarshal is discarded.
txRbDentry := NewTxRollbackDentry(nil, nil, 0)
189
txRbDentry.Unmarshal(data)
191
assert.True(t, reflect.DeepEqual(rbDentry.dentry, txRbDentry.dentry))
192
assert.True(t, reflect.DeepEqual(rbDentry.txDentryInfo, txRbDentry.txDentryInfo))
193
assert.True(t, reflect.DeepEqual(rbDentry, txRbDentry))
195
// txDentryInfo was handed to NewTxRollbackDentry above; the next assertion
// expects this mutation to be visible through rbDentry.
txDentryInfo.MpMembers = "tttt"
196
assert.False(t, reflect.DeepEqual(rbDentry, txRbDentry))
198
cpDentryInfo := rbDentry.Copy()
199
assert.True(t, reflect.DeepEqual(rbDentry, cpDentryInfo.(*TxRollbackDentry)))
202
// TestNextTxID sets mp1's transaction ID allocator to id and expects
// nextTxID to return "<partition id>_<id+1>".
func TestNextTxID(t *testing.T) {
204
txMgr := mp1.txProcessor.txManager
207
expectedId := fmt.Sprintf("%d_%d", mp1.config.PartitionId, id+1)
208
txMgr.txIdAlloc.setTransactionID(id)
209
assert.Equal(t, expectedId, txMgr.nextTxID())
212
// TestTxMgrOp walks a transaction through the manager on mp1. A new
// TransactionInfo starts in TxStateInit, is given one dentry info, and is
// expected to be in TxStatePreCommit after the initialisation step. The test
// then checks the ID format, registers the transaction, fetches it back,
// rolls it back and expects IsDone, and finally expects commitTxInfo on an
// unknown ID to return proto.OpTxInfoNotExistErr.
func TestTxMgrOp(t *testing.T) {
214
txInfo := proto.NewTransactionInfo(5, proto.TxTypeCreate)
215
assert.True(t, txInfo.State == proto.TxStateInit)
217
txDentryInfo := proto.NewTxDentryInfo(MemberAddrs, pInodeNum, dentryName, 10001)
218
txInfo.TxDentryInfos[txDentryInfo.GetKey()] = txDentryInfo
219
// NOTE(review): any value returned by initTxInfo is discarded here.
if !txInfo.IsInitialized() {
220
mp1.initTxInfo(txInfo)
223
assert.True(t, txInfo.State == proto.TxStatePreCommit)
226
txMgr := mp1.txProcessor.txManager
229
// The expected ID is "<partition id>_<current allocator value>".
id := txMgr.txIdAlloc.getTransactionID()
230
expectedId := fmt.Sprintf("%d_%d", mp1.config.PartitionId, id)
231
assert.Equal(t, expectedId, txId)
232
txMgr.registerTransaction(txInfo)
235
gotTxInfo := txMgr.getTransaction(txId)
236
assert.Equal(t, txInfo, gotTxInfo)
239
// After rollback the transaction is still retrievable and reports IsDone.
txMgr.rollbackTxInfo(txId)
240
gotTxInfo = txMgr.getTransaction(txId)
241
assert.True(t, gotTxInfo.IsDone())
244
// Committing an ID that was never registered.
status, _ := txMgr.commitTxInfo("dummy_txId")
245
assert.Equal(t, proto.OpTxInfoNotExistErr, status)
248
// TestTxRscOp checks the status codes returned when rollback records are
// added to mp1's transaction resource. For inodes and for dentries alike, the
// first add returns proto.OpOk, re-adding the same record returns
// proto.OpExistErr, and adding a second record for the same object under a
// different transaction ID returns proto.OpTxConflictErr.
// NOTE(review): inode infos set CreateTime from time.Now().UnixNano() while
// dentry infos use time.Now().Unix() — confirm which unit the field expects.
func TestTxRscOp(t *testing.T) {
250
txMgr := mp1.txProcessor.txManager
253
txInodeInfo1 := proto.NewTxInodeInfo(MemberAddrs, inodeNum, 10001)
254
txInodeInfo1.TxID = txMgr.nextTxID()
255
txInodeInfo1.Timeout = 5
256
txInodeInfo1.CreateTime = time.Now().UnixNano()
257
inode1 := NewInode(inodeNum, FileModeType)
258
rbInode1 := NewTxRollbackInode(inode1, []uint32{}, txInodeInfo1, TxAdd)
260
// Second record for the same inode, with its own transaction ID.
txInodeInfo2 := proto.NewTxInodeInfo(MemberAddrs, inodeNum, 10001)
261
txInodeInfo2.TxID = txMgr.nextTxID()
262
txInodeInfo2.Timeout = 5
263
txInodeInfo2.CreateTime = time.Now().UnixNano()
264
rbInode2 := NewTxRollbackInode(inode1, []uint32{}, txInodeInfo2, TxAdd)
266
txRsc := mp1.txProcessor.txResource
267
status := txRsc.addTxRollbackInode(rbInode1)
268
assert.Equal(t, proto.OpOk, status)
269
status = txRsc.addTxRollbackInode(rbInode1)
270
assert.Equal(t, proto.OpExistErr, status)
272
inTx, _ := txRsc.isInodeInTransction(inode1)
275
status = txRsc.addTxRollbackInode(rbInode2)
276
assert.Equal(t, proto.OpTxConflictErr, status)
279
// Same sequence for dentries.
txDentryInfo1 := proto.NewTxDentryInfo(MemberAddrs, pInodeNum, dentryName, 10001)
286
txDentryInfo1.TxID = txMgr.nextTxID()
287
txDentryInfo1.Timeout = 5
288
txDentryInfo1.CreateTime = time.Now().Unix()
289
rbDentry1 := NewTxRollbackDentry(dentry, txDentryInfo1, TxAdd)
291
txDentryInfo2 := proto.NewTxDentryInfo(MemberAddrs, pInodeNum, dentryName, 10001)
292
txDentryInfo2.TxID = txMgr.nextTxID()
293
txDentryInfo2.Timeout = 5
294
txDentryInfo2.CreateTime = time.Now().Unix()
295
rbDentry2 := NewTxRollbackDentry(dentry, txDentryInfo2, TxAdd)
297
status = txRsc.addTxRollbackDentry(rbDentry1)
298
assert.Equal(t, proto.OpOk, status)
299
status = txRsc.addTxRollbackDentry(rbDentry1)
300
assert.Equal(t, proto.OpExistErr, status)
302
inTx, _ = txRsc.isDentryInTransction(dentry)
305
status = txRsc.addTxRollbackDentry(rbDentry2)
306
assert.Equal(t, proto.OpTxConflictErr, status)
309
// mockAddTxInode sets up the state of an inode added inside a transaction on
// mp. It records a rollback inode of type TxDelete for inodeNum under a fresh
// transaction ID, then inserts the inode into mp.inodeTree.
// NOTE(review): the TxDelete type presumably means "undo the add by
// deleting" — confirm against rollbackInode. The status returned by
// addTxRollbackInode is ignored.
func mockAddTxInode(mp *metaPartition) *TxRollbackInode {
310
txMgr := mp.txProcessor.txManager
311
txInodeInfo1 := proto.NewTxInodeInfo(MemberAddrs, inodeNum, 10001)
312
txInodeInfo1.TxID = txMgr.nextTxID()
313
txInodeInfo1.Timeout = 5
314
txInodeInfo1.CreateTime = time.Now().UnixNano()
315
inode1 := NewInode(inodeNum, FileModeType)
316
rbInode := NewTxRollbackInode(inode1, []uint32{}, txInodeInfo1, TxDelete)
317
txRsc := mp.txProcessor.txResource
318
txRsc.addTxRollbackInode(rbInode)
320
mp.inodeTree.ReplaceOrInsert(inode1, true)
324
// mockDeleteTxInode sets up the state of an inode deleted inside a
// transaction on mp. It inserts inodeNum2 into mp.inodeTree, records a
// rollback inode of type TxAdd under a fresh transaction ID, then deletes the
// inode from the tree.
// NOTE(review): the TxAdd type presumably means "undo the delete by
// re-adding" — confirm against rollbackInode. The status returned by
// addTxRollbackInode is ignored.
func mockDeleteTxInode(mp *metaPartition) *TxRollbackInode {
325
inode2 := NewInode(inodeNum2, FileModeType)
326
mp.inodeTree.ReplaceOrInsert(inode2, true)
328
txMgr := mp.txProcessor.txManager
329
txInodeInfo2 := proto.NewTxInodeInfo(MemberAddrs, inodeNum2, 10001)
330
txInodeInfo2.TxID = txMgr.nextTxID()
331
txInodeInfo2.Timeout = 5
332
txInodeInfo2.CreateTime = time.Now().UnixNano()
333
rbInode := NewTxRollbackInode(inode2, []uint32{}, txInodeInfo2, TxAdd)
334
txRsc := mp.txProcessor.txResource
335
txRsc.addTxRollbackInode(rbInode)
337
mp.inodeTree.Delete(inode2)
341
//func mockUpdateTxInode(mp *metaPartition) *TxRollbackInode {
342
// inode3 := NewInode(inodeNum3, FileModeType)
343
// oldInode, ok := mp.inodeTree.ReplaceOrInsert(inode3, true)
345
// txMgr := mp.txProcessor.txManager
346
// txInodeInfo3 := proto.NewTxInodeInfo(MemberAddrs, inodeNum3, 10001)
347
// txInodeInfo3.TxID = txMgr.nextTxID()
348
// rbInode := NewTxRollbackInode(inode3, txInodeInfo3, TxUpdate)
351
// mockAddTxDentry sets up the state of a dentry added inside a transaction on
// mp. It records a rollback dentry of type TxDelete for
// (pInodeNum, dentryName) under a fresh transaction ID, then inserts dentry1
// into mp.dentryTree.
// NOTE(review): the status returned by addTxRollbackDentry is ignored.
func mockAddTxDentry(mp *metaPartition) *TxRollbackDentry {
352
txMgr := mp.txProcessor.txManager
353
txDentryInfo1 := proto.NewTxDentryInfo(MemberAddrs, pInodeNum, dentryName, 10001)
354
txDentryInfo1.TxID = txMgr.nextTxID()
355
txDentryInfo1.Timeout = 5
356
txDentryInfo1.CreateTime = time.Now().Unix()
363
rbDentry := NewTxRollbackDentry(dentry1, txDentryInfo1, TxDelete)
364
txRsc := mp.txProcessor.txResource
365
txRsc.addTxRollbackDentry(rbDentry)
367
mp.dentryTree.ReplaceOrInsert(dentry1, true)
371
// mockDeleteTxDentry sets up the state of a dentry deleted inside a
// transaction on mp. It inserts dentry2 into mp.dentryTree, records a
// rollback dentry of type TxAdd under a fresh transaction ID, then deletes
// dentry2 from the tree.
// NOTE(review): the status returned by addTxRollbackDentry is ignored.
func mockDeleteTxDentry(mp *metaPartition) *TxRollbackDentry {
378
mp.dentryTree.ReplaceOrInsert(dentry2, true)
380
txMgr := mp.txProcessor.txManager
381
txDentryInfo2 := proto.NewTxDentryInfo(MemberAddrs, pInodeNum, dentryName, 10001)
382
txDentryInfo2.TxID = txMgr.nextTxID()
383
txDentryInfo2.Timeout = 5
384
txDentryInfo2.CreateTime = time.Now().Unix()
385
rbDentry := NewTxRollbackDentry(dentry2, txDentryInfo2, TxAdd)
386
txRsc := mp.txProcessor.txResource
387
txRsc.addTxRollbackDentry(rbDentry)
389
mp.dentryTree.Delete(dentry2)
393
// TestTxRscRollback rolls back each of the four mocked operations on mp1:
// inode add, inode delete, dentry add and dentry delete. Every rollback is
// expected to return proto.OpOk with a nil error. Requests are built from the
// transaction ID and key fields of the record each mock helper returns.
func TestTxRscRollback(t *testing.T) {
395
// roll back add inode
396
rbInode1 := mockAddTxInode(mp1)
397
txRsc := mp1.txProcessor.txResource
398
req1 := &proto.TxInodeApplyRequest{
399
TxID: rbInode1.txInodeInfo.TxID,
400
Inode: rbInode1.inode.Inode,
402
status, err := txRsc.rollbackInode(req1)
403
assert.True(t, status == proto.OpOk && err == nil)
405
// roll back delete inode
406
rbInode2 := mockDeleteTxInode(mp1)
407
req2 := &proto.TxInodeApplyRequest{
408
TxID: rbInode2.txInodeInfo.TxID,
409
Inode: rbInode2.inode.Inode,
411
status, err = txRsc.rollbackInode(req2)
412
assert.True(t, status == proto.OpOk && err == nil)
414
// roll back add dentry
415
rbDentry1 := mockAddTxDentry(mp1)
416
req3 := &proto.TxDentryApplyRequest{
417
TxID: rbDentry1.txDentryInfo.TxID,
418
Pid: rbDentry1.txDentryInfo.ParentId,
419
Name: rbDentry1.txDentryInfo.Name,
421
status, err = txRsc.rollbackDentry(req3)
422
assert.True(t, status == proto.OpOk && err == nil)
424
// roll back delete dentry
425
rbDentry2 := mockDeleteTxDentry(mp1)
426
req4 := &proto.TxDentryApplyRequest{
427
TxID: rbDentry2.txDentryInfo.TxID,
428
Pid: rbDentry2.txDentryInfo.ParentId,
429
Name: rbDentry2.txDentryInfo.Name,
431
status, err = txRsc.rollbackDentry(req4)
432
assert.True(t, status == proto.OpOk && err == nil)
435
// TestTxRscCommit commits each of the four mocked operations on mp1: inode
// add, inode delete, dentry add and dentry delete. Every commit is expected
// to return proto.OpOk with a nil error.
func TestTxRscCommit(t *testing.T) {
438
// commit add inode
rbInode1 := mockAddTxInode(mp1)
439
txRsc := mp1.txProcessor.txResource
440
status, err := txRsc.commitInode(rbInode1.txInodeInfo.TxID, rbInode1.inode.Inode)
441
assert.True(t, status == proto.OpOk && err == nil)
443
// commit delete inode
444
rbInode2 := mockDeleteTxInode(mp1)
445
status, err = txRsc.commitInode(rbInode2.txInodeInfo.TxID, rbInode2.inode.Inode)
446
assert.True(t, status == proto.OpOk && err == nil)
449
// commit add dentry
rbDentry1 := mockAddTxDentry(mp1)
450
status, err = txRsc.commitDentry(rbDentry1.txDentryInfo.TxID, rbDentry1.txDentryInfo.ParentId, rbDentry1.txDentryInfo.Name)
451
assert.True(t, status == proto.OpOk && err == nil)
453
// commit delete dentry
454
rbDentry2 := mockDeleteTxDentry(mp1)
455
status, err = txRsc.commitDentry(rbDentry2.txDentryInfo.TxID, rbDentry2.txDentryInfo.ParentId, rbDentry2.txDentryInfo.Name)
456
assert.True(t, status == proto.OpOk && err == nil)
459
// TestTxTreeRollback registers a transaction on mp1 whose TmID is mp1's own
// partition ID, sets the proto.TxPause bit on the processor mask, waits two
// seconds, and asserts the transaction tree still holds exactly one entry.
// NOTE(review): registerTransaction is called twice with the same txInfo —
// confirm the repeat is intentional (the final Len() == 1 assertion implies
// the second call does not add a second entry).
func TestTxTreeRollback(t *testing.T) {
462
txInfo := proto.NewTransactionInfo(0, proto.TxTypeCreate)
463
txDentryInfo := proto.NewTxDentryInfo(MemberAddrs, pInodeNum+1, dentryName, 10001)
464
txInfo.TxDentryInfos[txDentryInfo.GetKey()] = txDentryInfo
465
// NOTE(review): any value returned by initTxInfo is discarded here.
if !txInfo.IsInitialized() {
466
mp1.initTxInfo(txInfo)
470
txInfo.TmID = int64(mp1.config.PartitionId)
471
txMgr := mp1.txProcessor.txManager
474
// The expected ID is "<partition id>_<current allocator value>".
id := txMgr.txIdAlloc.getTransactionID()
475
expectedId := fmt.Sprintf("%d_%d", mp1.config.PartitionId, id)
476
assert.Equal(t, expectedId, txId)
477
txMgr.registerTransaction(txInfo)
479
txMgr.registerTransaction(txInfo)
480
txMgr.txProcessor.mask |= proto.TxPause
481
time.Sleep(2 * time.Second)
482
assert.True(t, txMgr.txTree.Len() == 1)
485
// TestCheckTxLimit sets the burst of the transaction manager's opLimiter to 1
// and calls mp1.initTxInfo twice with the same transaction info. The first
// call must succeed; the second call's error is captured in err.
// NOTE(review): presumably the second call is expected to be rejected by the
// limiter — confirm against the assertion that follows it.
func TestCheckTxLimit(t *testing.T) {
487
txMgr := mp1.txProcessor.txManager
490
txMgr.opLimiter.SetBurst(1)
491
txInfo := proto.NewTransactionInfo(0, proto.TxTypeCreate)
492
txDentryInfo := proto.NewTxDentryInfo(MemberAddrs, pInodeNum, dentryName, 10001)
493
txInfo.TxDentryInfos[txDentryInfo.GetKey()] = txDentryInfo
494
err := mp1.initTxInfo(txInfo)
495
assert.NoError(t, err)
497
err = mp1.initTxInfo(txInfo)
501
// TestGetTxHandler registers a transaction on mp1, issues a TxGetInfoRequest
// whose Pid is mp1's partition ID, and expects mp1.TxGetInfo to return nil
// with the packet's ResultCode set to proto.OpOk.
func TestGetTxHandler(t *testing.T) {
503
txMgr := mp1.txProcessor.txManager
506
txInfo := proto.NewTransactionInfo(0, proto.TxTypeCreate)
507
txDentryInfo := proto.NewTxDentryInfo(MemberAddrs, pInodeNum, dentryName, 10001)
508
txInfo.TxDentryInfos[txDentryInfo.GetKey()] = txDentryInfo
509
// NOTE(review): any value returned by initTxInfo is discarded here.
if !txInfo.IsInitialized() {
510
mp1.initTxInfo(txInfo)
514
txMgr.registerTransaction(txInfo)
516
req = &proto.TxGetInfoRequest{
518
Pid: mp1.config.PartitionId,
523
assert.True(t, mp1.TxGetInfo(req, p) == nil)
524
assert.True(t, p.ResultCode == proto.OpOk)