cubefs

Форк
0
/
transaction.go 
1566 строк · 42.3 Кб
1
// Copyright 2018 The CubeFS Authors.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
14

15
package metanode
16

17
import (
18
	"bytes"
19
	"encoding/binary"
20
	"encoding/json"
21
	"errors"
22
	"fmt"
23
	"net"
24
	"strconv"
25
	"strings"
26
	"sync"
27
	"time"
28

29
	"golang.org/x/time/rate"
30

31
	"github.com/cubefs/cubefs/proto"
32
	"github.com/cubefs/cubefs/util"
33
	"github.com/cubefs/cubefs/util/btree"
34
	"github.com/cubefs/cubefs/util/log"
35
)
36

37
// Rollback types: the values stored in the rbType field of TxRollbackInode
// and TxRollbackDentry.
const (
	TxNoOp   uint8 = 0
	TxUpdate uint8 = 1
	TxDelete uint8 = 2
	TxAdd    uint8 = 3
)
44

45
func (i *TxRollbackInode) ToString() string {
46
	content := fmt.Sprintf("{inode:[ino:%v, type:%v, nlink:%v], quotaIds:%v, rbType:%v"+
47
		"txInodeInfo:[Ino:%v, MpID:%v, CreateTime:%v, Timeout:%v, TxID:%v, MpMembers:%v]}",
48
		i.inode.Inode, i.inode.Type, i.inode.NLink, i.quotaIds, i.rbType, i.txInodeInfo.Ino, i.txInodeInfo.MpID,
49
		i.txInodeInfo.CreateTime, i.txInodeInfo.Timeout, i.txInodeInfo.TxID, i.txInodeInfo.MpMembers)
50
	return content
51
}
52

53
type TxRollbackInode struct {
54
	inode       *Inode
55
	txInodeInfo *proto.TxInodeInfo
56
	rbType      uint8 // Rollback Type
57
	quotaIds    []uint32
58
}
59

60
// Less tests whether the current TxRollbackInode item is less than the given one.
61
func (i *TxRollbackInode) Less(than btree.Item) bool {
62
	ti, ok := than.(*TxRollbackInode)
63
	if !ok {
64
		return false
65
	}
66

67
	if i.txInodeInfo != nil && ti.txInodeInfo != nil {
68
		return i.txInodeInfo.Ino < ti.txInodeInfo.Ino
69
	}
70

71
	return i.inode.Inode < ti.inode.Inode
72
}
73

74
// Copy returns a copy of the TxRollbackInode.
75
func (i *TxRollbackInode) Copy() btree.Item {
76
	item := i.inode.Copy()
77
	txInodeInfo := *i.txInodeInfo
78

79
	quotaIds := make([]uint32, len(i.quotaIds))
80
	copy(quotaIds, i.quotaIds)
81

82
	return &TxRollbackInode{
83
		inode:       item.(*Inode),
84
		quotaIds:    quotaIds,
85
		txInodeInfo: &txInodeInfo,
86
		rbType:      i.rbType,
87
	}
88
}
89

90
// Marshal serializes the rollback inode as
//
//	uint32 len | inode bytes | uint32 len | txInodeInfo bytes | uint8 rbType | uint32 quotaId ...
//
// All integers are big-endian. The quota ids run to the end of the record.
// On error the returned slice is nil.
func (i *TxRollbackInode) Marshal() (result []byte, err error) {
	out := bytes.NewBuffer(make([]byte, 0, 256))

	var payload []byte
	if payload, err = i.inode.Marshal(); err != nil {
		return
	}
	if err = binary.Write(out, binary.BigEndian, uint32(len(payload))); err != nil {
		return
	}
	if _, err = out.Write(payload); err != nil {
		return
	}

	if payload, err = i.txInodeInfo.Marshal(); err != nil {
		return
	}
	if err = binary.Write(out, binary.BigEndian, uint32(len(payload))); err != nil {
		return
	}
	if _, err = out.Write(payload); err != nil {
		return
	}

	if err = binary.Write(out, binary.BigEndian, i.rbType); err != nil {
		return
	}

	// Quota ids are appended directly; there is no count prefix.
	for _, id := range i.quotaIds {
		if err = binary.Write(out, binary.BigEndian, id); err != nil {
			return
		}
	}
	return out.Bytes(), nil
}
126

127
// Unmarshal decodes a record produced by Marshal into i. It replaces the
// inode, the transaction inode info, the rollback type and the quota ids.
//
// Each length prefix is checked against the bytes remaining in raw.
// Truncated or corrupted input therefore returns an error instead of being
// silently short-read or triggering a huge allocation.
func (i *TxRollbackInode) Unmarshal(raw []byte) (err error) {
	buff := bytes.NewBuffer(raw)

	// readSection reads a big-endian uint32 length followed by that many bytes.
	// bytes.Buffer.Read does not report partial reads, so the length is
	// validated up front. The bytes are copied so the result does not alias raw.
	readSection := func(what string) ([]byte, error) {
		var dataLen uint32
		if err := binary.Read(buff, binary.BigEndian, &dataLen); err != nil {
			return nil, err
		}
		if uint64(dataLen) > uint64(buff.Len()) {
			return nil, fmt.Errorf("TxRollbackInode Unmarshal: %s section length %d exceeds remaining %d bytes",
				what, dataLen, buff.Len())
		}
		data := make([]byte, dataLen)
		copy(data, buff.Next(int(dataLen)))
		return data, nil
	}

	data, err := readSection("inode")
	if err != nil {
		return
	}
	ino := NewInode(0, 0)
	if err = ino.Unmarshal(data); err != nil {
		return
	}
	i.inode = ino

	if data, err = readSection("txInodeInfo"); err != nil {
		return
	}
	txInodeInfo := proto.NewTxInodeInfo("", 0, 0)
	if err = txInodeInfo.Unmarshal(data); err != nil {
		return
	}
	i.txInodeInfo = txInodeInfo

	if err = binary.Read(buff, binary.BigEndian, &i.rbType); err != nil {
		return
	}

	// The rest of the record is a sequence of big-endian uint32 quota ids.
	// Start from an empty list so decoding into a reused value does not
	// append to stale ids.
	i.quotaIds = nil
	for buff.Len() > 0 {
		var quotaId uint32
		if err = binary.Read(buff, binary.BigEndian, &quotaId); err != nil {
			return
		}
		i.quotaIds = append(i.quotaIds, quotaId)
	}
	return
}
174

175
func NewTxRollbackInode(inode *Inode, quotaIds []uint32, txInodeInfo *proto.TxInodeInfo, rbType uint8) *TxRollbackInode {
176
	return &TxRollbackInode{
177
		inode:       inode,
178
		quotaIds:    quotaIds,
179
		txInodeInfo: txInodeInfo,
180
		rbType:      rbType,
181
	}
182
}
183

184
type TxRollbackDentry struct {
185
	dentry       *Dentry
186
	txDentryInfo *proto.TxDentryInfo
187
	rbType       uint8 // Rollback Type `
188
}
189

190
func (d *TxRollbackDentry) ToString() string {
191
	content := fmt.Sprintf("{dentry:[ParentId:%v, Name:%v, Inode:%v, Type:%v], rbType:%v, "+
192
		"txDentryInfo:[ParentId:%v, Name:%v, MpMembers:%v, TxID:%v, MpID:%v, CreateTime:%v, Timeout:%v]}",
193
		d.dentry.ParentId, d.dentry.Name, d.dentry.Inode, d.dentry.Type, d.rbType, d.txDentryInfo.ParentId, d.txDentryInfo.Name,
194
		d.txDentryInfo.MpMembers, d.txDentryInfo.TxID, d.txDentryInfo.MpID, d.txDentryInfo.CreateTime, d.txDentryInfo.Timeout)
195
	return content
196
}
197

198
// Less tests whether the current TxRollbackDentry item is less than the given one.
199
func (d *TxRollbackDentry) Less(than btree.Item) bool {
200
	td, ok := than.(*TxRollbackDentry)
201
	return ok && d.txDentryInfo.GetKey() < td.txDentryInfo.GetKey()
202
}
203

204
// Copy returns a copy of the TxRollbackDentry.
205
func (d *TxRollbackDentry) Copy() btree.Item {
206
	item := d.dentry.Copy()
207
	txDentryInfo := *d.txDentryInfo
208

209
	return &TxRollbackDentry{
210
		dentry:       item.(*Dentry),
211
		txDentryInfo: &txDentryInfo,
212
		rbType:       d.rbType,
213
	}
214
}
215

216
// Marshal serializes the rollback dentry as
//
//	uint32 len | dentry bytes | uint32 len | txDentryInfo bytes | uint8 rbType
//
// All integers are big-endian. On error the returned slice is nil.
func (d *TxRollbackDentry) Marshal() (result []byte, err error) {
	buff := bytes.NewBuffer(make([]byte, 0, 512))

	bs, err := d.dentry.Marshal()
	if err != nil {
		return nil, err
	}
	if err = binary.Write(buff, binary.BigEndian, uint32(len(bs))); err != nil {
		return nil, err
	}
	if _, err = buff.Write(bs); err != nil {
		return nil, err
	}

	// Log arguments are evaluated eagerly and ToString formats every field,
	// so only build them when debug logging is actually enabled.
	if log.EnableDebug() {
		log.LogDebugf("TxRollbackDentry Marshal dentry %v", d.dentry)
		log.LogDebugf("TxRollbackDentry Marshal txDentryInfo %v", d.ToString())
	}

	if bs, err = d.txDentryInfo.Marshal(); err != nil {
		return nil, err
	}
	if err = binary.Write(buff, binary.BigEndian, uint32(len(bs))); err != nil {
		return nil, err
	}
	if _, err = buff.Write(bs); err != nil {
		return nil, err
	}

	if err = binary.Write(buff, binary.BigEndian, d.rbType); err != nil {
		return nil, err
	}
	return buff.Bytes(), nil
}
247

248
// Unmarshal decodes a record produced by Marshal into d. It replaces the
// dentry, the transaction dentry info and the rollback type.
//
// Each length prefix is checked against the bytes remaining in raw.
// Truncated or corrupted input therefore returns an error instead of being
// silently short-read or triggering a huge allocation.
func (d *TxRollbackDentry) Unmarshal(raw []byte) (err error) {
	buff := bytes.NewBuffer(raw)

	// readSection reads a big-endian uint32 length followed by that many bytes.
	// bytes.Buffer.Read does not report partial reads, so the length is
	// validated up front. The bytes are copied so the result does not alias raw.
	readSection := func(what string) ([]byte, error) {
		var dataLen uint32
		if err := binary.Read(buff, binary.BigEndian, &dataLen); err != nil {
			return nil, err
		}
		if uint64(dataLen) > uint64(buff.Len()) {
			return nil, fmt.Errorf("TxRollbackDentry Unmarshal: %s section length %d exceeds remaining %d bytes",
				what, dataLen, buff.Len())
		}
		data := make([]byte, dataLen)
		copy(data, buff.Next(int(dataLen)))
		return data, nil
	}

	data, err := readSection("dentry")
	if err != nil {
		return
	}
	log.LogDebugf("TxRollbackDentry Unmarshal len %v", len(data))

	dentry := &Dentry{}
	if err = dentry.Unmarshal(data); err != nil {
		return
	}
	log.LogDebugf("TxRollbackDentry Unmarshal dentry %v", dentry)
	d.dentry = dentry

	if data, err = readSection("txDentryInfo"); err != nil {
		return
	}
	txDentryInfo := proto.NewTxDentryInfo("", 0, "", 0)
	if err = txDentryInfo.Unmarshal(data); err != nil {
		return
	}
	d.txDentryInfo = txDentryInfo

	err = binary.Read(buff, binary.BigEndian, &d.rbType)
	return
}
288

289
func NewTxRollbackDentry(dentry *Dentry, txDentryInfo *proto.TxDentryInfo, rbType uint8) *TxRollbackDentry {
290
	return &TxRollbackDentry{
291
		dentry:       dentry,
292
		txDentryInfo: txDentryInfo,
293
		rbType:       rbType,
294
	}
295
}
296

297
// TM
298
type TransactionManager struct {
299
	// need persistence and sync to all the raft members of the mp
300
	txIdAlloc   *TxIDAllocator
301
	txTree      *BTree
302
	txProcessor *TransactionProcessor
303
	blacklist   *util.Set
304
	opLimiter   *rate.Limiter
305
	sync.RWMutex
306
}
307

308
// RM
309
type TransactionResource struct {
310
	txRbInodeTree  *BTree // key: inode id
311
	txRbDentryTree *BTree // key: parentId_name
312
	txProcessor    *TransactionProcessor
313
	sync.RWMutex
314
}
315

316
type TransactionProcessor struct {
317
	txManager  *TransactionManager  // TM
318
	txResource *TransactionResource // RM
319
	mp         *metaPartition
320
	mask       proto.TxOpMask
321
}
322

323
// Reset clears the TM state first, then the RM state.
func (p *TransactionProcessor) Reset() {
	tm, rm := p.txManager, p.txResource
	tm.Reset()
	rm.Reset()
}
327

328
// Pause reports whether the op mask is exactly proto.TxPause. While it is,
// processExpiredTransactions skips its periodic processTx pass.
func (p *TransactionProcessor) Pause() bool {
	return p.mask == proto.TxPause
}
331

332
func NewTransactionManager(txProcessor *TransactionProcessor) *TransactionManager {
333
	txMgr := &TransactionManager{
334
		txIdAlloc:   newTxIDAllocator(),
335
		txTree:      NewBtree(),
336
		txProcessor: txProcessor,
337
		blacklist:   util.NewSet(),
338
		opLimiter:   rate.NewLimiter(rate.Inf, 128),
339
	}
340
	return txMgr
341
}
342

343
func NewTransactionResource(txProcessor *TransactionProcessor) *TransactionResource {
344
	txRsc := &TransactionResource{
345
		txRbInodeTree:  NewBtree(),
346
		txRbDentryTree: NewBtree(),
347
		txProcessor:    txProcessor,
348
	}
349
	return txRsc
350
}
351

352
func NewTransactionProcessor(mp *metaPartition) *TransactionProcessor {
353
	txProcessor := &TransactionProcessor{
354
		mp: mp,
355
	}
356
	txProcessor.txManager = NewTransactionManager(txProcessor)
357
	txProcessor.txResource = NewTransactionResource(txProcessor)
358

359
	if mp.config != nil {
360
		go txProcessor.txManager.processExpiredTransactions()
361
	}
362
	return txProcessor
363
}
364

365
// setLimit limits TM operations to val events per second and returns val as
// text. A non-positive val removes the limit and returns "unlimited".
func (tm *TransactionManager) setLimit(val int) string {
	if val <= 0 {
		tm.opLimiter.SetLimit(rate.Inf)
		return "unlimited"
	}
	tm.opLimiter.SetLimit(rate.Limit(val))
	return fmt.Sprintf("%v", val)
}
373

374
// Reset clears the blacklist, then, under the TM lock, resets the id
// allocator and the transaction tree and sets the operation limit to 0.
//
// NOTE(review): NewTransactionManager and setLimit(<=0) use rate.Inf, but
// here the limit becomes 0. For golang.org/x/time/rate a zero Limit allows
// no events beyond the burst. Confirm this is intended for a reset partition.
func (tm *TransactionManager) Reset() {
	tm.blacklist.Clear()

	tm.Lock()
	defer tm.Unlock()
	tm.txIdAlloc.Reset()
	tm.txTree.Reset()
	tm.opLimiter.SetLimit(0)
}
382

383
// test, when true, makes processExpiredTransactions run its checks even when
// this node is not the partition leader. It is presumably toggled only by tests.
var test bool
384

385
func (tm *TransactionManager) processExpiredTransactions() {
386
	mpId := tm.txProcessor.mp.config.PartitionId
387
	log.LogInfof("processExpiredTransactions for mp[%v] started", mpId)
388
	clearInterval := time.Second * 60
389
	clearTimer := time.NewTimer(clearInterval)
390
	txCheckVal := time.Second * 3
391
	txCheckTimer := time.NewTimer(txCheckVal)
392

393
	defer func() {
394
		log.LogWarnf("processExpiredTransactions for mp[%v] exit", mpId)
395
		txCheckTimer.Stop()
396
		clearTimer.Stop()
397
		return
398
	}()
399

400
	for {
401
		select {
402
		case <-tm.txProcessor.mp.stopC:
403
			log.LogDebugf("[processExpiredTransactions] deleteWorker stop partition: %v", mpId)
404
			return
405
		default:
406
		}
407

408
		if _, ok := tm.txProcessor.mp.IsLeader(); !ok && !test {
409
			log.LogDebugf("processExpiredTransactions: not leader sleep 1s, mp %d", mpId)
410
			time.Sleep(time.Second * 10)
411
			continue
412
		}
413

414
		select {
415
		case <-tm.txProcessor.mp.stopC:
416
			log.LogWarnf("processExpiredTransactions for mp[%v] stopped", mpId)
417
			return
418
		case <-clearTimer.C:
419
			tm.blacklist.Clear()
420
			clearTimer.Reset(clearInterval)
421
			log.LogDebugf("processExpiredTransactions: blacklist cleared, mp %d", mpId)
422
		case <-txCheckTimer.C:
423
			if tm.txProcessor.Pause() {
424
				txCheckTimer.Reset(txCheckVal)
425
				continue
426
			}
427
			tm.processTx()
428
			txCheckTimer.Reset(txCheckVal)
429
		}
430
	}
431
}
432

433
func (tm *TransactionManager) processTx() {
434
	mpId := tm.txProcessor.mp.config.PartitionId
435
	start := time.Now()
436
	log.LogDebugf("processTx: mp[%v] mask %v", mpId, proto.GetMaskString(tm.txProcessor.mask))
437
	defer func() {
438
		log.LogDebugf("processTx: mp %d total cost %s", mpId, time.Since(start).String())
439
	}()
440

441
	limitCh := make(chan struct{}, 32)
442
	var wg sync.WaitGroup
443

444
	get := func() {
445
		wg.Add(1)
446
		limitCh <- struct{}{}
447
	}
448
	put := func() {
449
		<-limitCh
450
		wg.Done()
451
	}
452

453
	idx := 0
454
	f := func(i BtreeItem) bool {
455
		idx++
456
		if idx%100 == 0 {
457
			if _, ok := tm.txProcessor.mp.IsLeader(); !ok {
458
				log.LogWarnf("processExpiredTransactions for mp[%v] already not leader and break tx tree traverse",
459
					tm.txProcessor.mp.config.PartitionId)
460
				return false
461
			}
462
		}
463

464
		tx := i.(*proto.TransactionInfo)
465
		rollbackFunc := func(skipSetStat bool) {
466
			defer put()
467
			status, err := tm.rollbackTx(tx.TxID, skipSetStat)
468

469
			if err != nil || status != proto.OpOk {
470
				log.LogWarnf("processExpiredTransactions: transaction (%v) expired, rolling back failed, status(%v), err(%v)",
471
					tx, status, err)
472
				return
473
			}
474

475
			if log.EnableDebug() {
476
				log.LogDebugf("processExpiredTransactions: transaction (%v) expired, rolling back done", tx)
477
			}
478
		}
479

480
		commitFunc := func() {
481
			defer put()
482
			status, err := tm.commitTx(tx.TxID, true)
483
			if err != nil || status != proto.OpOk {
484
				log.LogWarnf("processExpiredTransactions: transaction (%v) expired, commit failed, status(%v), err(%v)",
485
					tx, status, err)
486
				return
487
			}
488

489
			if log.EnableDebug() {
490
				log.LogDebugf("processExpiredTransactions: transaction (%v) expired, commit done", tx)
491
			}
492
		}
493

494
		delFunc := func() {
495
			defer put()
496
			status, err := tm.delTxFromRM(tx.TxID)
497
			if err != nil || status != proto.OpOk {
498
				log.LogWarnf("processExpiredTransactions: delTxFromRM (%v) expired, commit failed, status(%v), err(%v)",
499
					tx, status, err)
500
				return
501
			}
502
			if log.EnableDebug() {
503
				log.LogDebugf("processExpiredTransactions: transaction (%v) delTxFromRM, commit done", tx)
504
			}
505
		}
506

507
		clearOrphan := func() {
508
			defer put()
509
			tm.clearOrphanTx(tx)
510
			if log.EnableDebug() {
511
				log.LogDebugf("processExpiredTransactions: transaction (%v) clearOrphanTx", tx)
512
			}
513
		}
514

515
		if tx.TmID != int64(mpId) {
516
			if tx.CanDelete() {
517
				if log.EnableDebug() {
518
					log.LogDebugf("processExpiredTransactions: transaction (%v) can be deleted", tx)
519
				}
520
				get()
521
				go delFunc()
522
				return true
523
			}
524

525
			if tx.NeedClearOrphan() {
526
				if log.EnableDebug() {
527
					log.LogDebugf("processExpiredTransactions: orphan transaction (%v) can be clear", tx)
528
				}
529
				get()
530
				go clearOrphan()
531
				return true
532
			}
533

534
			if log.EnableDebug() {
535
				log.LogDebugf("processExpiredTransactions: RM transaction (%v) is ongoing", tx)
536
			}
537
			return true
538
		}
539

540
		if tx.State == proto.TxStateCommit {
541
			if log.EnableDebug() {
542
				log.LogDebugf("processExpiredTransactions: transaction (%v) continue to commit...", tx)
543
			}
544
			get()
545
			go commitFunc()
546
			return true
547
		}
548

549
		if tx.State == proto.TxStateRollback {
550
			if log.EnableDebug() {
551
				log.LogDebugf("processExpiredTransactions: transaction (%v) continue to roll back...", tx)
552
			}
553
			get()
554
			go rollbackFunc(true)
555
			return true
556
		}
557

558
		if tx.State == proto.TxStatePreCommit {
559
			if !tx.IsExpired() {
560
				return true
561
			}
562

563
			if log.EnableDebug() {
564
				log.LogDebugf("processExpiredTransactions: transaction (%v) expired, rolling back...", tx)
565
			}
566
			get()
567
			go rollbackFunc(false)
568
			return true
569
		}
570

571
		if tx.IsDone() {
572
			if !tx.CanDelete() {
573
				if log.EnableDebug() {
574
					log.LogDebugf("processExpiredTransactions: transaction (%v) is ongoing", tx)
575
				}
576
				return true
577
			}
578

579
			if log.EnableDebug() {
580
				log.LogDebugf("processExpiredTransactions: transaction (%v) can be deleted", tx)
581
			}
582
			get()
583
			go delFunc()
584
			return true
585
		}
586

587
		log.LogCriticalf("processExpiredTransactions: transaction (%v) is in state failed", tx)
588
		return true
589
	}
590

591
	tm.txTree.GetTree().Ascend(f)
592
	wg.Wait()
593
}
594

595
// nextTxID allocates the next transaction sequence number and returns the id
// formatted as "<partitionId>_<sequence>".
func (tm *TransactionManager) nextTxID() string {
	seq := tm.txIdAlloc.allocateTransactionID()
	pid := tm.txProcessor.mp.config.PartitionId
	txId := fmt.Sprintf("%d_%d", pid, seq)
	log.LogDebugf("nextTxID: txId:%v", txId)
	return txId
}
601

602
// txInRMDone reports true when txId is unknown to this TM or its transaction
// reports Finish(). It logs a warning in that case.
func (tm *TransactionManager) txInRMDone(txId string) bool {
	ifo := tm.getTransaction(txId)
	done := ifo == nil || ifo.Finish()
	if done {
		log.LogWarnf("txInRMDone: tx in rm already done, txId %s, ifo %v", txId, ifo)
	}
	return done
}
610

611
// getTransaction returns the transaction stored under txID, as returned by
// txTree.Get, or nil when there is none. copyGetTx is the CopyGet variant.
func (tm *TransactionManager) getTransaction(txID string) (txInfo *proto.TransactionInfo) {
	if item := tm.txTree.Get(proto.NewTxInfoBItem(txID)); item != nil {
		txInfo = item.(*proto.TransactionInfo)
	}
	return txInfo
}
620

621
// copyGetTx returns the transaction stored under txId as returned by
// txTree.CopyGet, or nil when there is none.
func (tm *TransactionManager) copyGetTx(txId string) (txInfo *proto.TransactionInfo) {
	if item := tm.txTree.CopyGet(proto.NewTxInfoBItem(txId)); item != nil {
		txInfo = item.(*proto.TransactionInfo)
	}
	return txInfo
}
631

632
// updateTxIdCursor parses a "<partitionId>_<sequence>" transaction id. If
// sequence is greater than the allocator's current id, the allocator is
// advanced to it; the cursor is never moved backwards.
//
// It returns an error when txId does not have exactly two "_"-separated
// parts, or when the sequence is not a base-10 uint64. A parse failure wraps
// the strconv error.
func (tm *TransactionManager) updateTxIdCursor(txId string) (err error) {
	arr := strings.Split(txId, "_")
	if len(arr) != 2 {
		return fmt.Errorf("updateTxId: tx[%v] is invalid", txId)
	}
	id, err := strconv.ParseUint(arr[1], 10, 64)
	if err != nil {
		return fmt.Errorf("updateTxId: tx[%v] is invalid: %w", txId, err)
	}
	if id > tm.txIdAlloc.getTransactionID() {
		tm.txIdAlloc.setTransactionID(id)
	}
	return nil
}
646

647
// addTxInfo stores txInfo in the transaction tree using ReplaceOrInsert with
// its flag argument set to true. Any return values are discarded.
func (tm *TransactionManager) addTxInfo(txInfo *proto.TransactionInfo) {
	tm.txTree.ReplaceOrInsert(txInfo, true)
}
650

651
// TM register a transaction, process client transaction
652
func (tm *TransactionManager) registerTransaction(txInfo *proto.TransactionInfo) (err error) {
653
	if uint64(txInfo.TmID) == tm.txProcessor.mp.config.PartitionId {
654
		if err := tm.updateTxIdCursor(txInfo.TxID); err != nil {
655
			log.LogErrorf("updateTxIdCursor failed, txInfo %s, err %s", txInfo.String(), err.Error())
656
			return err
657
		}
658

659
		for _, inode := range txInfo.TxInodeInfos {
660
			inode.SetCreateTime(txInfo.CreateTime)
661
			inode.SetTimeout(txInfo.Timeout)
662
			inode.SetTxId(txInfo.TxID)
663
		}
664

665
		for _, dentry := range txInfo.TxDentryInfos {
666
			dentry.SetCreateTime(txInfo.CreateTime)
667
			dentry.SetTimeout(txInfo.Timeout)
668
			dentry.SetTxId(txInfo.TxID)
669
		}
670
	}
671

672
	if info := tm.getTransaction(txInfo.TxID); info != nil {
673
		log.LogWarnf("tx is already exist, txId %s, info %v", txInfo.TxID, info.String())
674
		return nil
675
	}
676

677
	tm.addTxInfo(txInfo)
678

679
	if log.EnableDebug() {
680
		log.LogDebugf("registerTransaction: txInfo(%v)", txInfo)
681
	}
682

683
	return
684
}
685

686
// deleteTxInfo removes txId from the transaction tree while holding the TM
// lock. It always returns proto.OpOk, whether or not the id was present.
func (tm *TransactionManager) deleteTxInfo(txId string) (status uint8) {
	tm.Lock()
	defer tm.Unlock()

	removed := tm.txTree.Delete(proto.NewTxInfoBItem(txId))
	if log.EnableDebug() {
		log.LogDebugf("deleteTxInfo: tx[%v] is deleted, item %v", txId, removed)
	}
	return proto.OpOk
}
697

698
// rollbackTxInfo marks txId as TxStateRollbackDone and stamps its DoneTime
// with the current Unix time, while holding the TM lock. It returns
// proto.OpTxInfoNotExistErr when the transaction is unknown, otherwise
// proto.OpOk.
func (tm *TransactionManager) rollbackTxInfo(txId string) (status uint8) {
	tm.Lock()
	defer tm.Unlock()

	tx := tm.getTransaction(txId)
	if tx == nil {
		log.LogWarnf("rollbackTxInfo: rollback tx[%v] failed, not found", txId)
		return proto.OpTxInfoNotExistErr
	}

	tx.State, tx.DoneTime = proto.TxStateRollbackDone, time.Now().Unix()
	log.LogDebugf("rollbackTxInfo: tx[%v] is rolled back", tx)
	return proto.OpOk
}
715

716
// commitTxInfo marks txId as TxStateCommitDone and stamps its DoneTime with
// the current Unix time, while holding the TM lock. It returns
// proto.OpTxInfoNotExistErr and an error when the transaction is unknown.
func (tm *TransactionManager) commitTxInfo(txId string) (status uint8, err error) {
	tm.Lock()
	defer tm.Unlock()

	tx := tm.getTransaction(txId)
	if tx == nil {
		return proto.OpTxInfoNotExistErr, fmt.Errorf("commitTxInfo: commit tx[%v] failed, not found", txId)
	}

	tx.State, tx.DoneTime = proto.TxStateCommitDone, time.Now().Unix()
	log.LogDebugf("commitTxInfo: tx[%v] is committed", tx)
	return proto.OpOk, nil
}
732

733
// buildTxPacket creates a request packet with a new request id, addressed to
// partition mp with opcode op. Its payload is data as marshalled by
// Packet.MarshalData.
//
// If marshalling fails, it logs the failure and returns a nil packet with an
// error that wraps the underlying cause.
func buildTxPacket(data interface{}, mp uint64, op uint8) (pkt *proto.Packet, err error) {
	pkt = proto.NewPacketReqID()
	pkt.Opcode = op
	pkt.PartitionID = mp
	if err = pkt.MarshalData(data); err != nil {
		// Keep the cause: the previous message dropped why marshalling failed.
		err = fmt.Errorf("buildTxPacket: marshal txInfo [%v] failed: %w", data, err)
		log.LogErrorf("%v", err)
		return nil, err
	}
	return pkt, nil
}
747

748
func (tm *TransactionManager) setTransactionState(txId string, state int32) (status uint8, err error) {
749
	var val []byte
750
	var resp interface{}
751
	status = proto.OpOk
752

753
	stateReq := &proto.TxSetStateRequest{
754
		TxID:  txId,
755
		State: state,
756
	}
757
	val, _ = json.Marshal(stateReq)
758

759
	resp, err = tm.txProcessor.mp.submit(opFSMTxSetState, val)
760
	if err != nil {
761
		log.LogWarnf("setTransactionState: set transaction[%v] state to [%v] failed, err[%v]", txId, state, err)
762
		return proto.OpAgain, err
763
	}
764
	status = resp.(uint8)
765

766
	if status != proto.OpOk {
767
		errInfo := fmt.Sprintf("setTransactionState: set transaction[%v] state to [%v] failed", txId, state)
768
		err = errors.New(errInfo)
769
		log.LogWarnf("%v", errInfo)
770
	}
771
	return
772
}
773

774
func (tm *TransactionManager) delTxFromRM(txId string) (status uint8, err error) {
775
	req := proto.TxApplyRequest{
776
		TxID: txId,
777
	}
778
	val, err := json.Marshal(req)
779
	if err != nil {
780
		return
781
	}
782

783
	resp, err := tm.txProcessor.mp.submit(opFSMTxDelete, val)
784
	if err != nil {
785
		log.LogWarnf("delTxFromRM: delTxFromRM transaction[%v] failed, err[%v]", txId, err)
786
		return proto.OpAgain, err
787
	}
788

789
	status = resp.(uint8)
790
	if log.EnableDebug() {
791
		log.LogDebugf("delTxFromRM: tx[%v] is deleted successfully, status (%s)", txId, proto.GetStatusStr(status))
792
	}
793

794
	return
795
}
796

797
func (tm *TransactionManager) clearOrphanTx(tx *proto.TransactionInfo) {
798
	log.LogWarnf("clearOrphanTx: start to clearOrphanTx, tx %v", tx)
799
	// check txInfo whether exist in tm
800
	req := &proto.TxGetInfoRequest{
801
		Pid:  uint64(tx.TmID),
802
		TxID: tx.TxID,
803
	}
804

805
	pkt, err := buildTxPacket(req, req.Pid, proto.OpMetaTxGet)
806
	if err != nil {
807
		return
808
	}
809

810
	mps := tx.GroupByMp()
811
	tmpMp, ok := mps[req.Pid]
812
	if !ok {
813
		log.LogErrorf("clearOrphanTx: can't get tm Mp info from tx, tx %v", tx)
814
		return
815
	}
816

817
	status := tm.txSendToMpWithAddrs(tmpMp.Members, pkt)
818
	if status != proto.OpTxInfoNotExistErr {
819
		log.LogWarnf("clearOrphanTx: tx is still exist, tx %v, status %s", tx, proto.GetStatusStr(status))
820
		return
821
	}
822

823
	log.LogWarnf("clearOrphanTx: find tx in tm already not exist, start clear it from rm, tx %v", tx)
824

825
	aReq := &proto.TxApplyRMRequest{
826
		PartitionID:     req.Pid,
827
		TransactionInfo: tx,
828
	}
829
	newPkt := &Packet{}
830
	err = tm.txProcessor.mp.TxRollbackRM(aReq, newPkt)
831
	log.LogWarnf("clearOrphanTx: finally rollback tx in rm, tx %v, status %s, err %v",
832
		tx, newPkt.GetResultMsg(), err)
833
	return
834
}
835

836
func (tm *TransactionManager) commitTx(txId string, skipSetStat bool) (status uint8, err error) {
837
	tx := tm.getTransaction(txId)
838
	if tx == nil {
839
		status = proto.OpTxInfoNotExistErr
840
		log.LogWarnf("commitTx: tx[%v] not found, already success", txId)
841
		return
842
	}
843

844
	if tx.State == proto.TxStateCommitDone {
845
		status = proto.OpOk
846
		log.LogWarnf("commitTx: tx[%v] is already commit", txId)
847
		return
848
	}
849

850
	// 1.set transaction to TxStateCommit
851
	if !skipSetStat && tx.State != proto.TxStateCommit {
852
		status, err = tm.setTransactionState(txId, proto.TxStateCommit)
853
		if status != proto.OpOk {
854
			log.LogWarnf("commitTx: set transaction[%v] state to TxStateCommit failed", tx)
855
			return
856
		}
857
	}
858

859
	// 2. notify all related RMs that a transaction is completed
860
	status = tm.sendToRM(tx, proto.OpTxCommitRM)
861
	if status != proto.OpOk {
862
		return
863
	}
864

865
	// 3. TM commit the transaction
866
	req := proto.TxApplyRequest{
867
		TxID: txId,
868
	}
869
	val, err := json.Marshal(req)
870
	if err != nil {
871
		return
872
	}
873

874
	resp, err := tm.txProcessor.mp.submit(opFSMTxCommit, val)
875
	if err != nil {
876
		log.LogWarnf("commitTx: commit transaction[%v] failed, err[%v]", txId, err)
877
		return proto.OpAgain, err
878
	}
879

880
	status = resp.(uint8)
881
	log.LogDebugf("commitTx: tx[%v] is commited successfully", txId)
882

883
	return
884
}
885

886
func (tm *TransactionManager) sendToRM(txInfo *proto.TransactionInfo, op uint8) (status uint8) {
887
	status = proto.OpOk
888
	mpIfos := txInfo.GroupByMp()
889
	statusCh := make(chan uint8, len(mpIfos))
890
	wg := sync.WaitGroup{}
891
	mp := tm.txProcessor.mp
892

893
	for mpId, ifo := range mpIfos {
894
		req := &proto.TxApplyRMRequest{
895
			VolName:         mp.config.VolName,
896
			PartitionID:     mpId,
897
			TransactionInfo: txInfo,
898
		}
899

900
		wg.Add(1)
901

902
		pkt, _ := buildTxPacket(req, mpId, op)
903
		if mp.config.PartitionId == mpId {
904
			pt := &Packet{*pkt}
905
			go func() {
906
				defer wg.Done()
907
				var err error
908
				if op == proto.OpTxCommitRM {
909
					err = mp.TxCommitRM(req, pt)
910
				} else {
911
					err = mp.TxRollbackRM(req, pt)
912
				}
913
				statusCh <- pt.ResultCode
914
				if pt.ResultCode != proto.OpOk {
915
					log.LogWarnf("sendToRM: invoke TxCommitRM failed, ifo %v, pkt %s, err %v", txInfo, pt.GetResultMsg(), err)
916
				}
917
			}()
918
			continue
919
		}
920

921
		members := ifo.Members
922
		go func() {
923
			defer wg.Done()
924
			status := tm.txSendToMpWithAddrs(members, pkt)
925
			if status != proto.OpOk {
926
				log.LogWarnf("sendToRM: send to rm failed, addr %s, pkt %s, status %s",
927
					members, string(pkt.Data), proto.GetStatusStr(status))
928
			}
929
			statusCh <- status
930
		}()
931
	}
932

933
	wg.Wait()
934
	close(statusCh)
935

936
	updateStatus := func(st uint8) uint8 {
937
		if st == proto.OpTxConflictErr || st == proto.OpTxInfoNotExistErr {
938
			log.LogWarnf("sendToRM: might have already been committed, tx[%v], status (%s)", txInfo, proto.GetStatusStr(st))
939
			return proto.OpOk
940
		} else if st == proto.OpTxRbInodeNotExistErr || st == proto.OpTxRbDentryNotExistErr {
941
			log.LogWarnf("sendToRM: already done before or not add, tx[%v], status (%s)", txInfo, proto.GetStatusStr(st))
942
			return proto.OpOk
943
		} else {
944
			return st
945
		}
946
	}
947

948
	for st := range statusCh {
949
		t := updateStatus(st)
950
		if t != proto.OpOk {
951
			return t
952
		}
953
	}
954

955
	return status
956
}
957

958
func (tm *TransactionManager) rollbackTx(txId string, skipSetStat bool) (status uint8, err error) {
959
	status = proto.OpOk
960

961
	tx := tm.getTransaction(txId)
962
	if tx == nil {
963
		log.LogWarnf("commitTx: tx[%v] not found, already success", txId)
964
		return
965
	}
966

967
	if tx.State == proto.TxStateRollbackDone {
968
		status = proto.OpOk
969
		log.LogWarnf("commitTx: tx[%v] is already rollback", txId)
970
		return
971
	}
972

973
	// 1.set transaction to TxStateRollback
974
	if !skipSetStat && tx.State != proto.TxStateRollback {
975
		status, err = tm.setTransactionState(txId, proto.TxStateRollback)
976
		if status != proto.OpOk {
977
			log.LogWarnf("commitTransaction: set transaction[%v] state to TxStateCommit failed", tx)
978
			return
979
		}
980
	}
981

982
	// 2. notify all related RMs that a transaction is completed
983
	status = tm.sendToRM(tx, proto.OpTxRollbackRM)
984
	if status != proto.OpOk {
985
		return
986
	}
987

988
	req := proto.TxApplyRequest{
989
		TxID: txId,
990
	}
991
	val, err := json.Marshal(req)
992
	if err != nil {
993
		return
994
	}
995

996
	resp, err := tm.txProcessor.mp.submit(opFSMTxRollback, val)
997
	if err != nil {
998
		log.LogWarnf("commitTx: rollback transaction[%v]  failed, err[%v]", txId, err)
999
		return proto.OpAgain, err
1000
	}
1001

1002
	status = resp.(uint8)
1003
	log.LogDebugf("commitTx: tx[%v] is rollback successfully, msg %s", txId, proto.GetStatusStr(status))
1004

1005
	return
1006
}
1007

1008
func (tm *TransactionManager) sendPacketToMP(addr string, p *proto.Packet) (err error) {
1009
	var (
1010
		mConn *net.TCPConn
1011
		reqID = p.ReqID
1012
		reqOp = p.Opcode
1013
	)
1014

1015
	connPool := tm.txProcessor.mp.manager.connPool
1016
	defer func() {
1017
		connPool.PutConnect(mConn, err != nil)
1018
		if err != nil {
1019
			p.PacketErrorWithBody(proto.OpErr, []byte(err.Error()))
1020
			log.LogErrorf("[sendPacketToMP]: req: %d - %v, %v, packet(%v)", p.GetReqID(),
1021
				p.GetOpMsg(), err, p)
1022
			return
1023
		}
1024
	}()
1025

1026
	mConn, err = connPool.GetConnect(addr)
1027
	if err != nil {
1028
		return
1029
	}
1030

1031
	if err = p.WriteToConn(mConn); err != nil {
1032
		return
1033
	}
1034

1035
	// read connection from the master
1036
	if err = p.ReadFromConn(mConn, proto.ReadDeadlineTime); err != nil {
1037
		return
1038
	}
1039

1040
	if reqID != p.ReqID || reqOp != p.Opcode {
1041
		err = fmt.Errorf("sendPacketToMP: send and received packet mismatch: req(%v_%v) resp(%v_%v)",
1042
			reqID, reqOp, p.ReqID, p.Opcode)
1043
		return
1044
	}
1045

1046
	if log.EnableDebug() {
1047
		log.LogDebugf("[sendPacketToMP] req: %d - %v, resp: %v, packet(%v)", p.GetReqID(), p.GetOpMsg(),
1048
			p.GetResultMsg(), p)
1049
	}
1050

1051
	return
1052
}
1053

1054
func (tm *TransactionManager) txSendToMpWithAddrs(addrStr string, p *proto.Packet) (status uint8) {
1055
	addrs := strings.Split(addrStr, ",")
1056
	var err error
1057

1058
	skippedAddrs := make([]string, 0)
1059
	for _, addr := range addrs {
1060
		if tm.blacklist.Has(addr) {
1061
			log.LogWarnf("txSendToMpWithAddrs: addr[%v] is already blacklisted, retry another addr, p %s", addr, string(p.Data))
1062
			skippedAddrs = append(skippedAddrs, addr)
1063
			continue
1064
		}
1065

1066
		newPkt := p.GetCopy()
1067
		err = tm.sendPacketToMP(addr, newPkt)
1068
		if err != nil {
1069
			tm.blacklist.Add(addr)
1070
			log.LogWarnf("txSendToMpWithAddrs: send to %v failed, err(%s), add to blacklist and retry another addr, p %s",
1071
				addr, err.Error(), string(p.Data))
1072
			continue
1073
		}
1074

1075
		status := newPkt.ResultCode
1076
		if status == proto.OpErr || status == proto.OpAgain {
1077
			log.LogWarnf("txSendToMpWithAddrs: sendPacketToMp failed, addr %s, msg %s, data %s, status(%s)",
1078
				addr, newPkt.GetResultMsg(), string(p.Data), proto.GetStatusStr(status))
1079
			continue
1080
		}
1081

1082
		if status == proto.OpOk {
1083
			if log.EnableDebug() {
1084
				log.LogDebugf("txSendToMpWithAddrs: send to %v done with status[%v], tx[%s]",
1085
					addr, status, string(p.Data))
1086
			}
1087
			err = nil
1088
			return status
1089
		}
1090

1091
		log.LogWarnf("txSendToMpWithAddrs: sendPacketToMp failed, addr %s, msg %s, data %s, status %s",
1092
			addr, newPkt.GetResultMsg(), string(p.Data), proto.GetStatusStr(status))
1093
		return status
1094
	}
1095

1096
	// try use skipped addr
1097
	for _, addr := range skippedAddrs {
1098
		newPkt := p.GetCopy()
1099
		err = tm.sendPacketToMP(addr, newPkt)
1100
		if err != nil {
1101
			log.LogWarnf("txSendToMpWithAddrs: send to %v failed, err(%s), add to blacklist and retry another addr, p %s",
1102
				addr, err.Error(), string(p.Data))
1103
			continue
1104
		}
1105

1106
		status := newPkt.ResultCode
1107
		if status == proto.OpErr || status == proto.OpAgain {
1108
			log.LogWarnf("txSendToMpWithAddrs: sendPacketToMp failed, addr %s, msg %s, data %s, status(%s)",
1109
				addr, newPkt.GetResultMsg(), string(p.Data), proto.GetStatusStr(status))
1110
			continue
1111
		}
1112

1113
		if status == proto.OpOk {
1114
			if log.EnableDebug() {
1115
				log.LogDebugf("txSendToMpWithAddrs: send to %v done with status[%v], tx[%s]",
1116
					addr, status, string(p.Data))
1117
			}
1118
			err = nil
1119
			return status
1120
		}
1121

1122
		log.LogWarnf("txSendToMpWithAddrs: sendPacketToMp failed, addr %s, msg %s, data %s, status %s",
1123
			addr, newPkt.GetResultMsg(), string(p.Data), proto.GetStatusStr(status))
1124
		return status
1125
	}
1126

1127
	log.LogWarnf("txSendToMpWithAddrs: after retry still failed, return opAgain, pkt %s, addrs %v, err %v, status %s",
1128
		string(p.Data), addrs, err, proto.GetStatusStr(status))
1129
	return proto.OpAgain
1130
}
1131

1132
func (tm *TransactionManager) txSetState(req *proto.TxSetStateRequest) (status uint8, err error) {
1133
	tm.Lock()
1134
	defer tm.Unlock()
1135
	status = proto.OpOk
1136

1137
	txItem := proto.NewTxInfoBItem(req.TxID)
1138
	item := tm.txTree.CopyGet(txItem)
1139
	if item == nil {
1140
		status = proto.OpTxInfoNotExistErr
1141
		errInfo := fmt.Sprintf("txSetState: set state failed, req[%v] tx not existed", req)
1142
		err = errors.New(errInfo)
1143
		log.LogErrorf("%v", errInfo)
1144
		return
1145
	}
1146
	txInfo := item.(*proto.TransactionInfo)
1147

1148
	if req.State == proto.TxStateCommit && txInfo.State == proto.TxStateCommitDone {
1149
		log.LogWarnf("txSetState: tx is already success before set commit state, tx %v", txInfo)
1150
		status = proto.OpOk
1151
		return
1152
	}
1153

1154
	if req.State < proto.TxStateCommit || req.State > proto.TxStateFailed {
1155
		status = proto.OpTxSetStateErr
1156
		errInfo := fmt.Sprintf("txSetState: set state failed, wrong state, req[%v]", req)
1157
		err = errors.New(errInfo)
1158
		log.LogErrorf("%v", errInfo)
1159
		return
1160
	}
1161

1162
	if req.State == proto.TxStateCommit && txInfo.State != proto.TxStateCommit && txInfo.State != proto.TxStatePreCommit {
1163
		status = proto.OpTxSetStateErr
1164
		errInfo := fmt.Sprintf("txSetState: set state failed, wrong state, tx state[%v], req state[%v], tx[%v]",
1165
			txInfo.State, req.State, req.TxID)
1166
		err = errors.New(errInfo)
1167
		log.LogErrorf("%v", errInfo)
1168
		return
1169
	}
1170

1171
	if req.State == proto.TxStateRollback && txInfo.State != proto.TxStateRollback && txInfo.State != proto.TxStatePreCommit {
1172
		status = proto.OpTxSetStateErr
1173
		errInfo := fmt.Sprintf("txSetState: set state failed, wrong state, tx state[%v], req state[%v], tx[%v]",
1174
			txInfo.State, req.State, req.TxID)
1175
		err = errors.New(errInfo)
1176
		log.LogErrorf("%v", errInfo)
1177
		return
1178
	}
1179

1180
	log.LogDebugf("txSetState: set tx state from [%v] to [%v], tx[%v]", txInfo.State, req.State, req.TxID)
1181
	txInfo.State = req.State
1182
	return
1183
}
1184

1185
// Reset empties both rollback trees (inodes and dentries) and detaches tr from
// its transaction processor. The whole reset happens under tr's lock.
func (tr *TransactionResource) Reset() {
	tr.Lock()
	defer tr.Unlock()

	inodes, dentries := tr.txRbInodeTree, tr.txRbDentryTree
	inodes.Reset()
	dentries.Reset()

	tr.txProcessor = nil
}
1192

1193
// check if item(inode, dentry) is in transaction for modifying
1194
func (tr *TransactionResource) isInodeInTransction(ino *Inode) (inTx bool, txID string) {
1195
	// return true only if specified inode is in an ongoing transaction(not expired yet)
1196
	tr.Lock()
1197
	defer tr.Unlock()
1198

1199
	if rbInode := tr.getTxRbInode(ino.Inode); rbInode != nil {
1200
		inTx = true
1201
		if rbInode.txInodeInfo != nil {
1202
			txID = rbInode.txInodeInfo.TxID
1203
		}
1204
		return
1205
	}
1206
	return false, ""
1207
}
1208

1209
// isDentryInTransction reports whether the dentry (ParentId, Name) currently
// has a rollback record in tr. When it does, txID is the owning transaction's
// ID, or "" if the record carries no txDentryInfo. Holds tr's lock.
func (tr *TransactionResource) isDentryInTransction(dentry *Dentry) (inTx bool, txID string) {
	tr.Lock()
	defer tr.Unlock()

	rbDentry := tr.getTxRbDentry(dentry.ParentId, dentry.Name)
	if rbDentry == nil {
		return false, ""
	}
	if info := rbDentry.txDentryInfo; info != nil {
		return true, info.TxID
	}
	return true, ""
}
1222

1223
func (tr *TransactionResource) getTxRbInode(ino uint64) (rbInode *TxRollbackInode) {
1224
	keyNode := &TxRollbackInode{
1225
		inode: NewInode(ino, 0),
1226
	}
1227
	item := tr.txRbInodeTree.Get(keyNode)
1228
	if item == nil {
1229
		return nil
1230
	}
1231
	rbInode = item.(*TxRollbackInode)
1232
	return
1233
}
1234

1235
func (tr *TransactionResource) copyGetTxRbInode(ino uint64) (rbInode *TxRollbackInode) {
1236
	keyNode := &TxRollbackInode{
1237
		inode: NewInode(ino, 0),
1238
	}
1239
	item := tr.txRbInodeTree.CopyGet(keyNode)
1240
	if item == nil {
1241
		return nil
1242
	}
1243
	rbInode = item.(*TxRollbackInode)
1244
	return
1245
}
1246

1247
func (tr *TransactionResource) deleteTxRollbackInode(ino uint64, txId string) (status uint8) {
1248
	tr.Lock()
1249
	defer tr.Unlock()
1250

1251
	keyNode := &TxRollbackInode{
1252
		txInodeInfo: proto.NewTxInodeInfo("", ino, 0),
1253
	}
1254

1255
	item := tr.txRbInodeTree.Get(keyNode)
1256
	if item == nil {
1257
		log.LogWarnf("deleteTxRollbackInode: rollback inode may be already been deleted, inode %d, txId %s",
1258
			ino, txId)
1259
		return proto.OpTxRbInodeNotExistErr
1260
	}
1261

1262
	if item.(*TxRollbackInode).txInodeInfo.TxID != txId {
1263
		log.LogWarnf("deleteTxRollbackInode: rollback dentry is already been update by other, txId %s, item %v",
1264
			txId, item)
1265
		return proto.OpTxRbDentryNotExistErr
1266
	}
1267

1268
	tr.txRbInodeTree.Delete(item)
1269
	return proto.OpOk
1270
}
1271

1272
// RM add an `TxRollbackInode` into `txRollbackInodes`
1273
func (tr *TransactionResource) addTxRollbackInode(rbInode *TxRollbackInode) (status uint8) {
1274
	tr.Lock()
1275
	defer tr.Unlock()
1276

1277
	oldRbInode := tr.getTxRbInode(rbInode.inode.Inode)
1278
	if oldRbInode != nil {
1279
		if oldRbInode.txInodeInfo.TxID == rbInode.txInodeInfo.TxID {
1280
			log.LogWarnf("addTxRollbackInode: rollback inode [ino(%v) txID(%v)] is already exists",
1281
				rbInode.inode.Inode, rbInode.txInodeInfo.TxID)
1282
			return proto.OpExistErr
1283
		} else {
1284
			log.LogErrorf("addTxRollbackInode: rollback inode [ino(%v) txID(%v)] "+
1285
				"is conflicted with inode [ino(%v) txID(%v)]",
1286
				rbInode.inode.Inode, rbInode.txInodeInfo.TxID, oldRbInode.inode.Inode, oldRbInode.txInodeInfo.TxID)
1287
			return proto.OpTxConflictErr
1288
		}
1289
	}
1290

1291
	tr.txRbInodeTree.ReplaceOrInsert(rbInode, true)
1292
	log.LogDebugf("addTxRollbackInode: rollback inode [ino(%v) txID(%v)] is added", rbInode.inode.Inode, rbInode.txInodeInfo.TxID)
1293
	return proto.OpOk
1294
}
1295

1296
func (tr *TransactionResource) getTxRbDentry(pId uint64, name string) *TxRollbackDentry {
1297
	keyNode := &TxRollbackDentry{
1298
		txDentryInfo: proto.NewTxDentryInfo("", pId, name, 0),
1299
	}
1300
	item := tr.txRbDentryTree.Get(keyNode)
1301
	if item == nil {
1302
		return nil
1303
	}
1304

1305
	return item.(*TxRollbackDentry)
1306
}
1307

1308
func (tr *TransactionResource) deleteTxRollbackDentry(pid uint64, name, txId string) (status uint8) {
1309
	tr.Lock()
1310
	defer tr.Unlock()
1311

1312
	keyNode := &TxRollbackDentry{
1313
		txDentryInfo: proto.NewTxDentryInfo("", pid, name, 0),
1314
	}
1315

1316
	item := tr.txRbDentryTree.Get(keyNode)
1317
	if item == nil {
1318
		log.LogWarnf("deleteTxRollbackDentry: rollback dentry may be already been deleted, pid %d, name %s, txId %s",
1319
			pid, name, txId)
1320
		return proto.OpTxRbDentryNotExistErr
1321
	}
1322

1323
	if item.(*TxRollbackDentry).txDentryInfo.TxID != txId {
1324
		log.LogWarnf("deleteTxRollbackDentry: rollback dentry is already been update by other, txId %s, item %v",
1325
			txId, name)
1326
		return proto.OpTxRbDentryNotExistErr
1327
	}
1328

1329
	tr.txRbDentryTree.Delete(item)
1330
	return proto.OpOk
1331
}
1332

1333
// RM add a `TxRollbackDentry` into `txRollbackDentries`
1334
func (tr *TransactionResource) addTxRollbackDentry(rbDentry *TxRollbackDentry) (status uint8) {
1335
	tr.Lock()
1336
	defer tr.Unlock()
1337

1338
	oldRbDentry := tr.getTxRbDentry(rbDentry.txDentryInfo.ParentId, rbDentry.dentry.Name)
1339
	if oldRbDentry != nil {
1340
		if oldRbDentry.txDentryInfo.TxID == rbDentry.txDentryInfo.TxID {
1341
			log.LogWarnf("addTxRollbackDentry: rollback dentry [pino(%v) name(%v) txID(%v)] is already exists",
1342
				rbDentry.dentry.ParentId, rbDentry.dentry.Name, rbDentry.txDentryInfo.TxID)
1343
			return proto.OpExistErr
1344
		}
1345
		log.LogWarnf("addTxRollbackDentry: rollback dentry [pino(%v) name(%v) txID(%v) rbType(%v)] "+
1346
			"is conflicted with dentry [pino(%v) name(%v)  txID(%v) rbType(%v)]",
1347
			rbDentry.dentry.ParentId, rbDentry.dentry.Name, rbDentry.txDentryInfo.TxID, rbDentry.rbType,
1348
			oldRbDentry.dentry.ParentId, oldRbDentry.dentry.Name, oldRbDentry.txDentryInfo.TxID, oldRbDentry.rbType)
1349
		return proto.OpTxConflictErr
1350
	}
1351

1352
	tr.txRbDentryTree.ReplaceOrInsert(rbDentry, true)
1353
	log.LogDebugf("addTxRollbackDentry: rollback dentry [pino(%v) name(%v) txID(%v) rbType(%v)] is added",
1354
		rbDentry.dentry.ParentId, rbDentry.dentry.Name, rbDentry.txDentryInfo.TxID, rbDentry.rbType)
1355
	return proto.OpOk
1356
}
1357

1358
func (tr *TransactionResource) rollbackInodeInternal(rbInode *TxRollbackInode) (status uint8, err error) {
1359
	status = proto.OpOk
1360
	mp := tr.txProcessor.mp
1361
	switch rbInode.rbType {
1362
	case TxAdd:
1363
		var ino *Inode
1364
		item := mp.inodeTree.CopyGet(rbInode.inode)
1365
		if item != nil {
1366
			ino = item.(*Inode)
1367
		}
1368

1369
		if item == nil || ino.IsTempFile() || ino.ShouldDelete() {
1370
			mp.freeList.Remove(rbInode.inode.Inode)
1371
			if mp.uidManager != nil {
1372
				mp.uidManager.addUidSpace(rbInode.inode.Uid, rbInode.inode.Inode, rbInode.inode.Extents.eks)
1373
			}
1374
			if mp.mqMgr != nil && len(rbInode.quotaIds) > 0 && item == nil {
1375
				mp.setInodeQuota(rbInode.quotaIds, rbInode.inode.Inode)
1376
				for _, quotaId := range rbInode.quotaIds {
1377
					mp.mqMgr.updateUsedInfo(int64(rbInode.inode.Size), 1, quotaId)
1378
				}
1379
			}
1380
			mp.inodeTree.ReplaceOrInsert(rbInode.inode, true)
1381
		} else {
1382
			ino.IncNLink(mp.verSeq)
1383
		}
1384

1385
	case TxDelete:
1386
		if rsp := tr.txProcessor.mp.getInode(rbInode.inode, false); rsp.Status == proto.OpOk {
1387
			if tr.txProcessor.mp.uidManager != nil {
1388
				tr.txProcessor.mp.uidManager.doMinusUidSpace(rbInode.inode.Uid, rbInode.inode.Inode, rbInode.inode.Size)
1389
			}
1390

1391
			if tr.txProcessor.mp.mqMgr != nil && len(rbInode.quotaIds) > 0 {
1392
				for _, quotaId := range rbInode.quotaIds {
1393
					tr.txProcessor.mp.mqMgr.updateUsedInfo(-1*int64(rbInode.inode.Size), -1, quotaId)
1394
				}
1395
			}
1396
			tr.txProcessor.mp.fsmUnlinkInode(rbInode.inode, 0)
1397
			tr.txProcessor.mp.fsmEvictInode(rbInode.inode)
1398
		}
1399

1400
	default:
1401
		status = proto.OpTxRollbackUnknownRbType
1402
		err = fmt.Errorf("rollbackInode: unknown rbType %d", rbInode.rbType)
1403
		return
1404
	}
1405
	tr.txRbInodeTree.Delete(rbInode)
1406
	return
1407
}
1408

1409
// RM roll back an inode, retry if error occours
1410
func (tr *TransactionResource) rollbackInode(req *proto.TxInodeApplyRequest) (status uint8, err error) {
1411
	tr.Lock()
1412
	defer tr.Unlock()
1413
	status = proto.OpOk
1414
	rbInode := tr.getTxRbInode(req.Inode)
1415
	if rbInode == nil {
1416
		status = proto.OpTxRbInodeNotExistErr
1417
		errInfo := fmt.Sprintf("rollbackInode: roll back inode[%v] failed, txID[%v], rb inode not found", req.Inode, req.TxID)
1418
		err = errors.New(errInfo)
1419
		log.LogErrorf("%v", errInfo)
1420
		return
1421
	}
1422

1423
	if rbInode.txInodeInfo.TxID != req.TxID {
1424
		status = proto.OpTxConflictErr
1425
		errInfo := fmt.Sprintf("rollbackInode: txID %v is not matching txInodeInfo txID %v", req.TxID, rbInode.txInodeInfo.TxID)
1426
		err = errors.New(errInfo)
1427
		log.LogErrorf("%v", errInfo)
1428
		return
1429
	}
1430

1431
	status, err = tr.rollbackInodeInternal(rbInode)
1432
	if err != nil {
1433
		log.LogErrorf("rollbackInode: inode[%v] roll back failed in tx[%v], rbType[%v]", req.Inode, req.TxID, rbInode.rbType)
1434
	} else {
1435
		log.LogDebugf("rollbackInode: inode[%v] is rolled back in tx[%v], rbType[%v]", req.Inode, req.TxID, rbInode.rbType)
1436
	}
1437

1438
	return
1439
}
1440

1441
func (tr *TransactionResource) rollbackDentryInternal(rbDentry *TxRollbackDentry) (status uint8, err error) {
1442
	defer func() {
1443
		if status != proto.OpOk {
1444
			log.LogErrorf("rollbackDentryInternal: rollback dentry failed, ifo %v", rbDentry.txDentryInfo)
1445
		}
1446
	}()
1447
	status = proto.OpOk
1448
	switch rbDentry.rbType {
1449
	case TxAdd:
1450
		// need to be true to assert link not change.
1451
		status = tr.txProcessor.mp.fsmCreateDentry(rbDentry.dentry, true)
1452
	case TxDelete:
1453
		resp := tr.txProcessor.mp.fsmDeleteDentry(rbDentry.dentry, true)
1454
		status = resp.Status
1455
	case TxUpdate:
1456
		resp := tr.txProcessor.mp.fsmUpdateDentry(rbDentry.dentry)
1457
		status = resp.Status
1458
	default:
1459
		status = proto.OpTxRollbackUnknownRbType
1460
		err = fmt.Errorf("rollbackDentry: unknown rbType %d", rbDentry.rbType)
1461
		return
1462
	}
1463

1464
	tr.txRbDentryTree.Delete(rbDentry)
1465
	return
1466
}
1467

1468
// RM roll back a dentry, retry if error occours
1469
func (tr *TransactionResource) rollbackDentry(req *proto.TxDentryApplyRequest) (status uint8, err error) {
1470
	tr.Lock()
1471
	defer tr.Unlock()
1472
	status = proto.OpOk
1473
	rbDentry := tr.getTxRbDentry(req.Pid, req.Name)
1474
	if rbDentry == nil {
1475
		status = proto.OpTxRbDentryNotExistErr
1476
		errInfo := fmt.Sprintf("rollbackDentry: roll back dentry[%v_%v] failed, rb inode not found, txID[%v]",
1477
			req.Pid, req.Name, req.TxID)
1478
		err = errors.New(errInfo)
1479
		log.LogWarnf("%v", errInfo)
1480
		return
1481
	}
1482

1483
	if rbDentry.txDentryInfo.TxID != req.TxID {
1484
		status = proto.OpTxConflictErr
1485
		errInfo := fmt.Sprintf("rollbackDentry: txID %v is not matching txInodeInfo txID %v", req.TxID, rbDentry.txDentryInfo.TxID)
1486
		err = errors.New(errInfo)
1487
		log.LogWarnf("%v", errInfo)
1488
		return
1489
	}
1490

1491
	status, err = tr.rollbackDentryInternal(rbDentry)
1492
	if err != nil {
1493
		log.LogErrorf("rollbackDentry: denKey[%v] roll back failed in tx[%v], rbType[%v]",
1494
			rbDentry.txDentryInfo.GetKey(), req.TxID, rbDentry.rbType)
1495
	} else {
1496
		log.LogDebugf("rollbackDentry: denKey[%v] is rolled back in tx[%v], rbType[%v]",
1497
			rbDentry.txDentryInfo.GetKey(), req.TxID, rbDentry.rbType)
1498
	}
1499

1500
	return
1501
}
1502

1503
// RM simplely remove the inode from TransactionResource
1504
func (tr *TransactionResource) commitInode(txID string, inode uint64) (status uint8, err error) {
1505
	tr.Lock()
1506
	defer tr.Unlock()
1507
	status = proto.OpOk
1508
	rbInode := tr.getTxRbInode(inode)
1509
	if rbInode == nil {
1510
		status = proto.OpTxRbInodeNotExistErr
1511
		errInfo := fmt.Sprintf("commitInode: commit inode[%v] failed, rb inode not found", inode)
1512
		err = errors.New(errInfo)
1513
		log.LogWarnf("%v", errInfo)
1514
		return
1515
	}
1516

1517
	if rbInode.txInodeInfo.TxID != txID {
1518
		status = proto.OpTxConflictErr
1519
		errInfo := fmt.Sprintf("commitInode: txID %v is not matching txInodeInfo txID %v", txID, rbInode.txInodeInfo.TxID)
1520
		err = errors.New(errInfo)
1521
		log.LogErrorf("%v", errInfo)
1522
		return
1523
	}
1524

1525
	tr.txRbInodeTree.Delete(rbInode)
1526
	log.LogDebugf("commitInode: inode[%v] is committed", inode)
1527
	return
1528
}
1529

1530
// RM simplely remove the dentry from TransactionResource
1531
func (tr *TransactionResource) commitDentry(txID string, pId uint64, name string) (status uint8, err error) {
1532
	tr.Lock()
1533
	defer tr.Unlock()
1534
	status = proto.OpOk
1535

1536
	rbDentry := tr.getTxRbDentry(pId, name)
1537
	if rbDentry == nil {
1538
		status = proto.OpTxRbDentryNotExistErr
1539
		errInfo := fmt.Sprintf("commitDentry: commit dentry[%v_%v] failed, rb dentry not found", pId, name)
1540
		err = errors.New(errInfo)
1541
		log.LogWarnf("%v", errInfo)
1542
		return
1543
	}
1544

1545
	if rbDentry.txDentryInfo.TxID != txID {
1546
		status = proto.OpTxConflictErr
1547
		errInfo := fmt.Sprintf("commitDentry: txID %v is not matching txDentryInfo txID %v", txID, rbDentry.txDentryInfo.TxID)
1548
		err = errors.New(errInfo)
1549
		log.LogWarnf("%v", errInfo)
1550
		return
1551
	}
1552

1553
	tr.txRbDentryTree.Delete(rbDentry)
1554
	// unlink parent inode
1555
	if rbDentry.rbType == TxAdd {
1556
		parInode := NewInode(pId, 0)
1557
		st := tr.txProcessor.mp.fsmUnlinkInode(parInode, 0)
1558
		if st.Status != proto.OpOk {
1559
			log.LogWarnf("commitDentry: try unlink parent inode failed, txId %s, inode[%v]", txID, parInode)
1560
			return
1561
		}
1562
	}
1563

1564
	log.LogDebugf("commitDentry: dentry[%v] is committed", rbDentry.txDentryInfo.GetKey())
1565
	return
1566
}
1567

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.