cubefs

Форк
0
/
transaction_test.go 
525 строк · 14.9 Кб
1
// Copyright 2018 The CubeFS Authors.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
14

15
package metanode
16

17
import (
18
	"fmt"
19
	"reflect"
20
	"testing"
21
	"time"
22

23
	"github.com/stretchr/testify/assert"
24

25
	"github.com/cubefs/cubefs/proto"
26
	"github.com/cubefs/cubefs/util/log"
27
)
28

29
// var manager = &metadataManager{}
30
var mp1 *metaPartition
31

32
var (
33
	mp2 *metaPartition
34
	mp3 *metaPartition
35
)
36

37
// FileModeType is the mode/type value the tests pass to NewInode and store in
// Dentry.Type. It is written in octal to match the other mode literal in this
// file; 0o644 is the same value as decimal 420.
const FileModeType uint32 = 0o644

// Fixture values shared by the transaction tests.
const (
	// MemberAddrs is the member address list handed to proto.NewTxInodeInfo
	// and proto.NewTxDentryInfo.
	MemberAddrs = "127.0.0.1:17210,127.0.0.2:17210,127.0.0.3:17210"
	// inodeNum is the inode number used by most inode fixtures.
	inodeNum = 1001
	// pInodeNum is the parent inode number used by the dentry fixtures.
	pInodeNum = 1002
	// inodeNum2 is the inode number used by mockDeleteTxInode.
	inodeNum2 = 1003
	// inodeNum3 is currently referenced only by commented-out code.
	inodeNum3 = 1004
	// dentryName is the name used by the dentry fixtures.
	dentryName = "parent"
)
47

48
func init() {
49
	log.InitLog("/tmp/cfs/logs/", "test", log.DebugLevel, nil, log.DefaultLogLeftSpaceLimit)
50
}
51

52
func newMetaPartition(PartitionId uint64, manager *metadataManager) (mp *metaPartition) {
53
	metaConf := &MetaPartitionConfig{
54
		PartitionId:   PartitionId,
55
		VolName:       "testVol",
56
		PartitionType: proto.VolumeTypeHot,
57
	}
58

59
	mp = &metaPartition{
60
		config:        metaConf,
61
		dentryTree:    NewBtree(),
62
		inodeTree:     NewBtree(),
63
		extendTree:    NewBtree(),
64
		multipartTree: NewBtree(),
65
		stopC:         make(chan bool),
66
		storeChan:     make(chan *storeMsg, 100),
67
		freeList:      newFreeList(),
68
		extDelCh:      make(chan []proto.ExtentKey, defaultDelExtentsCnt),
69
		extReset:      make(chan struct{}),
70
		vol:           NewVol(),
71
		manager:       manager,
72
	}
73
	mp.config.Cursor = 1000
74
	mp.config.End = 100000
75

76
	mp.txProcessor = NewTransactionProcessor(mp)
77
	mp.uidManager = NewUidMgr(mp.config.VolName, mp.config.PartitionId)
78
	return mp
79
}
80

81
func initMps(t *testing.T) {
82
	test = true
83
	mp1 = newMetaPartition(10001, &metadataManager{})
84
	mp2 = newMetaPartition(10002, &metadataManager{})
85
	mp3 = newMetaPartition(10003, &metadataManager{})
86
}
87

88
// Equal reports whether i and inode are deeply equal according to
// reflect.DeepEqual.
func (i *Inode) Equal(inode *Inode) bool {
	same := reflect.DeepEqual(i, inode)
	return same
}
91

92
// Equal reports whether two rollback inodes have the same rollback type, equal
// inodes (per Inode.Equal) and deeply equal txInodeInfo values. The checks run
// in that order and stop at the first mismatch.
func (i *TxRollbackInode) Equal(txRbInode *TxRollbackInode) bool {
	return i.rbType == txRbInode.rbType &&
		i.inode.Equal(txRbInode.inode) &&
		reflect.DeepEqual(i.txInodeInfo, txRbInode.txInodeInfo)
}
104

105
func TestRollbackInodeLess(t *testing.T) {
106
	inode := NewInode(101, 0)
107
	txInodeInfo := proto.NewTxInodeInfo(MemberAddrs, inodeNum, 10001)
108
	rbInode := NewTxRollbackInode(inode, []uint32{}, txInodeInfo, TxAdd)
109

110
	rbInode2 := &TxRollbackInode{
111
		inode: NewInode(100, 0),
112
	}
113
	assert.False(t, rbInode.Less(rbInode2))
114

115
	rbInode2.txInodeInfo = proto.NewTxInodeInfo("", inodeNum+1, 0)
116
	assert.True(t, rbInode.Less(rbInode2))
117
}
118

119
func TestRollbackInodeSerialization(t *testing.T) {
120
	inode := &Inode{
121
		Inode:      1024,
122
		Gid:        11,
123
		Uid:        10,
124
		Size:       101,
125
		Type:       0o755,
126
		Generation: 13,
127
		CreateTime: 102,
128
		AccessTime: 104,
129
		ModifyTime: 107,
130
		LinkTarget: []byte("link target"),
131
		NLink:      7,
132
		Flag:       1,
133
		Reserved:   3,
134
		Extents: NewSortedExtentsFromEks([]proto.ExtentKey{
135
			{FileOffset: 11, PartitionId: 12, ExtentId: 13, ExtentOffset: 0, Size: 0, CRC: 0},
136
		}),
137
		ObjExtents: NewSortedObjExtents(),
138
	}
139

140
	ids := []uint32{11, 13}
141

142
	txInodeInfo := proto.NewTxInodeInfo(MemberAddrs, inodeNum, 10001)
143
	rbInode := NewTxRollbackInode(inode, ids, txInodeInfo, TxAdd)
144
	var data []byte
145
	data, _ = rbInode.Marshal()
146

147
	txRbInode := NewTxRollbackInode(nil, []uint32{}, nil, 0)
148
	txRbInode.Unmarshal(data)
149

150
	assert.True(t, rbInode.Equal(txRbInode))
151

152
	inode.Inode = 1023
153
	assert.False(t, rbInode.Equal(txRbInode))
154

155
	cpRbInode := rbInode.Copy()
156
	assert.True(t, rbInode.Equal(cpRbInode.(*TxRollbackInode)))
157
}
158

159
func TestTxRollbackDentry_Less(t *testing.T) {
160
	rb1 := &TxRollbackDentry{
161
		txDentryInfo: &proto.TxDentryInfo{ParentId: 1001, Name: "tt"},
162
	}
163

164
	rb2 := &TxRollbackDentry{
165
		txDentryInfo: &proto.TxDentryInfo{ParentId: 1002, Name: "tt"},
166
	}
167

168
	assert.True(t, rb1.Less(rb2))
169

170
	rb3 := &TxRollbackDentry{
171
		txDentryInfo: &proto.TxDentryInfo{ParentId: 1001, Name: "ta"},
172
	}
173
	assert.False(t, rb1.Less(rb3))
174
}
175

176
func TestRollbackDentrySerialization(t *testing.T) {
177
	txDentryInfo := proto.NewTxDentryInfo(MemberAddrs, pInodeNum, dentryName, 10001)
178
	dentry := &Dentry{
179
		ParentId: pInodeNum,
180
		Name:     dentryName,
181
		Inode:    inodeNum,
182
		Type:     FileModeType,
183
	}
184
	rbDentry := NewTxRollbackDentry(dentry, txDentryInfo, TxAdd)
185
	var data []byte
186
	data, _ = rbDentry.Marshal()
187

188
	txRbDentry := NewTxRollbackDentry(nil, nil, 0)
189
	txRbDentry.Unmarshal(data)
190

191
	assert.True(t, reflect.DeepEqual(rbDentry.dentry, txRbDentry.dentry))
192
	assert.True(t, reflect.DeepEqual(rbDentry.txDentryInfo, txRbDentry.txDentryInfo))
193
	assert.True(t, reflect.DeepEqual(rbDentry, txRbDentry))
194

195
	txDentryInfo.MpMembers = "tttt"
196
	assert.False(t, reflect.DeepEqual(rbDentry, txRbDentry))
197

198
	cpDentryInfo := rbDentry.Copy()
199
	assert.True(t, reflect.DeepEqual(rbDentry, cpDentryInfo.(*TxRollbackDentry)))
200
}
201

202
func TestNextTxID(t *testing.T) {
203
	initMps(t)
204
	txMgr := mp1.txProcessor.txManager
205

206
	var id uint64 = 2
207
	expectedId := fmt.Sprintf("%d_%d", mp1.config.PartitionId, id+1)
208
	txMgr.txIdAlloc.setTransactionID(id)
209
	assert.Equal(t, expectedId, txMgr.nextTxID())
210
}
211

212
func TestTxMgrOp(t *testing.T) {
213
	initMps(t)
214
	txInfo := proto.NewTransactionInfo(5, proto.TxTypeCreate)
215
	assert.True(t, txInfo.State == proto.TxStateInit)
216

217
	txDentryInfo := proto.NewTxDentryInfo(MemberAddrs, pInodeNum, dentryName, 10001)
218
	txInfo.TxDentryInfos[txDentryInfo.GetKey()] = txDentryInfo
219
	if !txInfo.IsInitialized() {
220
		mp1.initTxInfo(txInfo)
221
	}
222

223
	assert.True(t, txInfo.State == proto.TxStatePreCommit)
224

225
	txId := txInfo.TxID
226
	txMgr := mp1.txProcessor.txManager
227

228
	// register
229
	id := txMgr.txIdAlloc.getTransactionID()
230
	expectedId := fmt.Sprintf("%d_%d", mp1.config.PartitionId, id)
231
	assert.Equal(t, expectedId, txId)
232
	txMgr.registerTransaction(txInfo)
233

234
	// get
235
	gotTxInfo := txMgr.getTransaction(txId)
236
	assert.Equal(t, txInfo, gotTxInfo)
237

238
	// rollback
239
	txMgr.rollbackTxInfo(txId)
240
	gotTxInfo = txMgr.getTransaction(txId)
241
	assert.True(t, gotTxInfo.IsDone())
242

243
	// commit
244
	status, _ := txMgr.commitTxInfo("dummy_txId")
245
	assert.Equal(t, proto.OpTxInfoNotExistErr, status)
246
}
247

248
func TestTxRscOp(t *testing.T) {
249
	initMps(t)
250
	txMgr := mp1.txProcessor.txManager
251

252
	// rbInode
253
	txInodeInfo1 := proto.NewTxInodeInfo(MemberAddrs, inodeNum, 10001)
254
	txInodeInfo1.TxID = txMgr.nextTxID()
255
	txInodeInfo1.Timeout = 5
256
	txInodeInfo1.CreateTime = time.Now().UnixNano()
257
	inode1 := NewInode(inodeNum, FileModeType)
258
	rbInode1 := NewTxRollbackInode(inode1, []uint32{}, txInodeInfo1, TxAdd)
259

260
	txInodeInfo2 := proto.NewTxInodeInfo(MemberAddrs, inodeNum, 10001)
261
	txInodeInfo2.TxID = txMgr.nextTxID()
262
	txInodeInfo2.Timeout = 5
263
	txInodeInfo2.CreateTime = time.Now().UnixNano()
264
	rbInode2 := NewTxRollbackInode(inode1, []uint32{}, txInodeInfo2, TxAdd)
265

266
	txRsc := mp1.txProcessor.txResource
267
	status := txRsc.addTxRollbackInode(rbInode1)
268
	assert.Equal(t, proto.OpOk, status)
269
	status = txRsc.addTxRollbackInode(rbInode1)
270
	assert.Equal(t, proto.OpExistErr, status)
271

272
	inTx, _ := txRsc.isInodeInTransction(inode1)
273
	assert.True(t, inTx)
274

275
	status = txRsc.addTxRollbackInode(rbInode2)
276
	assert.Equal(t, proto.OpTxConflictErr, status)
277

278
	// rbDentry
279
	txDentryInfo1 := proto.NewTxDentryInfo(MemberAddrs, pInodeNum, dentryName, 10001)
280
	dentry := &Dentry{
281
		ParentId: pInodeNum,
282
		Name:     dentryName,
283
		Inode:    inodeNum,
284
		Type:     FileModeType,
285
	}
286
	txDentryInfo1.TxID = txMgr.nextTxID()
287
	txDentryInfo1.Timeout = 5
288
	txDentryInfo1.CreateTime = time.Now().Unix()
289
	rbDentry1 := NewTxRollbackDentry(dentry, txDentryInfo1, TxAdd)
290

291
	txDentryInfo2 := proto.NewTxDentryInfo(MemberAddrs, pInodeNum, dentryName, 10001)
292
	txDentryInfo2.TxID = txMgr.nextTxID()
293
	txDentryInfo2.Timeout = 5
294
	txDentryInfo2.CreateTime = time.Now().Unix()
295
	rbDentry2 := NewTxRollbackDentry(dentry, txDentryInfo2, TxAdd)
296

297
	status = txRsc.addTxRollbackDentry(rbDentry1)
298
	assert.Equal(t, proto.OpOk, status)
299
	status = txRsc.addTxRollbackDentry(rbDentry1)
300
	assert.Equal(t, proto.OpExistErr, status)
301

302
	inTx, _ = txRsc.isDentryInTransction(dentry)
303
	assert.True(t, inTx)
304

305
	status = txRsc.addTxRollbackDentry(rbDentry2)
306
	assert.Equal(t, proto.OpTxConflictErr, status)
307
}
308

309
// mockAddTxInode simulates an inode being added inside a transaction: it
// records a TxDelete rollback entry for inodeNum in mp's transaction resource
// and then inserts the inode into mp's inode tree. The rollback entry is
// returned; the status of addTxRollbackInode is not checked.
func mockAddTxInode(mp *metaPartition) *TxRollbackInode {
	info := proto.NewTxInodeInfo(MemberAddrs, inodeNum, 10001)
	info.TxID = mp.txProcessor.txManager.nextTxID()
	info.Timeout = 5
	info.CreateTime = time.Now().UnixNano()

	ino := NewInode(inodeNum, FileModeType)
	rb := NewTxRollbackInode(ino, []uint32{}, info, TxDelete)
	mp.txProcessor.txResource.addTxRollbackInode(rb)

	mp.inodeTree.ReplaceOrInsert(ino, true)
	return rb
}
323

324
// mockDeleteTxInode simulates an inode being deleted inside a transaction: it
// inserts inodeNum2 into mp's inode tree, records a TxAdd rollback entry for
// it, and then deletes the inode from the tree. The rollback entry is
// returned; the status of addTxRollbackInode is not checked.
func mockDeleteTxInode(mp *metaPartition) *TxRollbackInode {
	ino := NewInode(inodeNum2, FileModeType)
	mp.inodeTree.ReplaceOrInsert(ino, true)

	info := proto.NewTxInodeInfo(MemberAddrs, inodeNum2, 10001)
	info.TxID = mp.txProcessor.txManager.nextTxID()
	info.Timeout = 5
	info.CreateTime = time.Now().UnixNano()

	rb := NewTxRollbackInode(ino, []uint32{}, info, TxAdd)
	mp.txProcessor.txResource.addTxRollbackInode(rb)

	mp.inodeTree.Delete(ino)
	return rb
}
340

341
//func mockUpdateTxInode(mp *metaPartition) *TxRollbackInode {
342
//	inode3 := NewInode(inodeNum3, FileModeType)
343
//	oldInode, ok := mp.inodeTree.ReplaceOrInsert(inode3, true)
344
//
345
//	txMgr := mp.txProcessor.txManager
346
//	txInodeInfo3 := proto.NewTxInodeInfo(MemberAddrs, inodeNum3, 10001)
347
//	txInodeInfo3.TxID = txMgr.nextTxID()
348
//	rbInode := NewTxRollbackInode(inode3, txInodeInfo3, TxUpdate)
349
//}
350

351
func mockAddTxDentry(mp *metaPartition) *TxRollbackDentry {
352
	txMgr := mp.txProcessor.txManager
353
	txDentryInfo1 := proto.NewTxDentryInfo(MemberAddrs, pInodeNum, dentryName, 10001)
354
	txDentryInfo1.TxID = txMgr.nextTxID()
355
	txDentryInfo1.Timeout = 5
356
	txDentryInfo1.CreateTime = time.Now().Unix()
357
	dentry1 := &Dentry{
358
		ParentId: pInodeNum,
359
		Name:     dentryName,
360
		Inode:    1001,
361
		Type:     0,
362
	}
363
	rbDentry := NewTxRollbackDentry(dentry1, txDentryInfo1, TxDelete)
364
	txRsc := mp.txProcessor.txResource
365
	txRsc.addTxRollbackDentry(rbDentry)
366

367
	mp.dentryTree.ReplaceOrInsert(dentry1, true)
368
	return rbDentry
369
}
370

371
func mockDeleteTxDentry(mp *metaPartition) *TxRollbackDentry {
372
	dentry2 := &Dentry{
373
		ParentId: pInodeNum,
374
		Name:     dentryName,
375
		Inode:    1001,
376
		Type:     0,
377
	}
378
	mp.dentryTree.ReplaceOrInsert(dentry2, true)
379

380
	txMgr := mp.txProcessor.txManager
381
	txDentryInfo2 := proto.NewTxDentryInfo(MemberAddrs, pInodeNum, dentryName, 10001)
382
	txDentryInfo2.TxID = txMgr.nextTxID()
383
	txDentryInfo2.Timeout = 5
384
	txDentryInfo2.CreateTime = time.Now().Unix()
385
	rbDentry := NewTxRollbackDentry(dentry2, txDentryInfo2, TxAdd)
386
	txRsc := mp.txProcessor.txResource
387
	txRsc.addTxRollbackDentry(rbDentry)
388

389
	mp.dentryTree.Delete(dentry2)
390
	return rbDentry
391
}
392

393
func TestTxRscRollback(t *testing.T) {
394
	initMps(t)
395
	// roll back add inode
396
	rbInode1 := mockAddTxInode(mp1)
397
	txRsc := mp1.txProcessor.txResource
398
	req1 := &proto.TxInodeApplyRequest{
399
		TxID:  rbInode1.txInodeInfo.TxID,
400
		Inode: rbInode1.inode.Inode,
401
	}
402
	status, err := txRsc.rollbackInode(req1)
403
	assert.True(t, status == proto.OpOk && err == nil)
404

405
	// roll back delete inode
406
	rbInode2 := mockDeleteTxInode(mp1)
407
	req2 := &proto.TxInodeApplyRequest{
408
		TxID:  rbInode2.txInodeInfo.TxID,
409
		Inode: rbInode2.inode.Inode,
410
	}
411
	status, err = txRsc.rollbackInode(req2)
412
	assert.True(t, status == proto.OpOk && err == nil)
413

414
	// roll back add dentry
415
	rbDentry1 := mockAddTxDentry(mp1)
416
	req3 := &proto.TxDentryApplyRequest{
417
		TxID: rbDentry1.txDentryInfo.TxID,
418
		Pid:  rbDentry1.txDentryInfo.ParentId,
419
		Name: rbDentry1.txDentryInfo.Name,
420
	}
421
	status, err = txRsc.rollbackDentry(req3)
422
	assert.True(t, status == proto.OpOk && err == nil)
423

424
	// roll back delete dentry
425
	rbDentry2 := mockDeleteTxDentry(mp1)
426
	req4 := &proto.TxDentryApplyRequest{
427
		TxID: rbDentry2.txDentryInfo.TxID,
428
		Pid:  rbDentry2.txDentryInfo.ParentId,
429
		Name: rbDentry2.txDentryInfo.Name,
430
	}
431
	status, err = txRsc.rollbackDentry(req4)
432
	assert.True(t, status == proto.OpOk && err == nil)
433
}
434

435
func TestTxRscCommit(t *testing.T) {
436
	initMps(t)
437
	// commit add inode
438
	rbInode1 := mockAddTxInode(mp1)
439
	txRsc := mp1.txProcessor.txResource
440
	status, err := txRsc.commitInode(rbInode1.txInodeInfo.TxID, rbInode1.inode.Inode)
441
	assert.True(t, status == proto.OpOk && err == nil)
442

443
	// commit delete inode
444
	rbInode2 := mockDeleteTxInode(mp1)
445
	status, err = txRsc.commitInode(rbInode2.txInodeInfo.TxID, rbInode2.inode.Inode)
446
	assert.True(t, status == proto.OpOk && err == nil)
447

448
	// commit add dentry
449
	rbDentry1 := mockAddTxDentry(mp1)
450
	status, err = txRsc.commitDentry(rbDentry1.txDentryInfo.TxID, rbDentry1.txDentryInfo.ParentId, rbDentry1.txDentryInfo.Name)
451
	assert.True(t, status == proto.OpOk && err == nil)
452

453
	// commit delete dentry
454
	rbDentry2 := mockDeleteTxDentry(mp1)
455
	status, err = txRsc.commitDentry(rbDentry2.txDentryInfo.TxID, rbDentry2.txDentryInfo.ParentId, rbDentry2.txDentryInfo.Name)
456
	assert.True(t, status == proto.OpOk && err == nil)
457
}
458

459
func TestTxTreeRollback(t *testing.T) {
460
	initMps(t)
461

462
	txInfo := proto.NewTransactionInfo(0, proto.TxTypeCreate)
463
	txDentryInfo := proto.NewTxDentryInfo(MemberAddrs, pInodeNum+1, dentryName, 10001)
464
	txInfo.TxDentryInfos[txDentryInfo.GetKey()] = txDentryInfo
465
	if !txInfo.IsInitialized() {
466
		mp1.initTxInfo(txInfo)
467
	}
468

469
	txId := txInfo.TxID
470
	txInfo.TmID = int64(mp1.config.PartitionId)
471
	txMgr := mp1.txProcessor.txManager
472

473
	// register
474
	id := txMgr.txIdAlloc.getTransactionID()
475
	expectedId := fmt.Sprintf("%d_%d", mp1.config.PartitionId, id)
476
	assert.Equal(t, expectedId, txId)
477
	txMgr.registerTransaction(txInfo)
478

479
	txMgr.registerTransaction(txInfo)
480
	txMgr.txProcessor.mask |= proto.TxPause
481
	time.Sleep(2 * time.Second)
482
	assert.True(t, txMgr.txTree.Len() == 1)
483
}
484

485
func TestCheckTxLimit(t *testing.T) {
486
	initMps(t)
487
	txMgr := mp1.txProcessor.txManager
488
	// txMgr.Start()
489
	txMgr.setLimit(10)
490
	txMgr.opLimiter.SetBurst(1)
491
	txInfo := proto.NewTransactionInfo(0, proto.TxTypeCreate)
492
	txDentryInfo := proto.NewTxDentryInfo(MemberAddrs, pInodeNum, dentryName, 10001)
493
	txInfo.TxDentryInfos[txDentryInfo.GetKey()] = txDentryInfo
494
	err := mp1.initTxInfo(txInfo)
495
	assert.NoError(t, err)
496

497
	err = mp1.initTxInfo(txInfo)
498
	assert.Error(t, err)
499
}
500

501
func TestGetTxHandler(t *testing.T) {
502
	initMps(t)
503
	txMgr := mp1.txProcessor.txManager
504
	// txMgr.Start()
505

506
	txInfo := proto.NewTransactionInfo(0, proto.TxTypeCreate)
507
	txDentryInfo := proto.NewTxDentryInfo(MemberAddrs, pInodeNum, dentryName, 10001)
508
	txInfo.TxDentryInfos[txDentryInfo.GetKey()] = txDentryInfo
509
	if !txInfo.IsInitialized() {
510
		mp1.initTxInfo(txInfo)
511
	}
512

513
	// register
514
	txMgr.registerTransaction(txInfo)
515
	var (
516
		req = &proto.TxGetInfoRequest{
517
			TxID: txInfo.TxID,
518
			Pid:  mp1.config.PartitionId,
519
		}
520
		p = new(Packet)
521
	)
522

523
	assert.True(t, mp1.TxGetInfo(req, p) == nil)
524
	assert.True(t, p.ResultCode == proto.OpOk)
525
}
526

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.