1
// Copyright 2018 The CubeFS Authors.
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
7
// http://www.apache.org/licenses/LICENSE-2.0
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
29
"golang.org/x/time/rate"
31
"github.com/cubefs/cubefs/proto"
32
"github.com/cubefs/cubefs/util"
33
"github.com/cubefs/cubefs/util/btree"
34
"github.com/cubefs/cubefs/util/log"
45
func (i *TxRollbackInode) ToString() string {
46
content := fmt.Sprintf("{inode:[ino:%v, type:%v, nlink:%v], quotaIds:%v, rbType:%v"+
47
"txInodeInfo:[Ino:%v, MpID:%v, CreateTime:%v, Timeout:%v, TxID:%v, MpMembers:%v]}",
48
i.inode.Inode, i.inode.Type, i.inode.NLink, i.quotaIds, i.rbType, i.txInodeInfo.Ino, i.txInodeInfo.MpID,
49
i.txInodeInfo.CreateTime, i.txInodeInfo.Timeout, i.txInodeInfo.TxID, i.txInodeInfo.MpMembers)
53
// TxRollbackInode is the record kept for an inode touched by a transaction.
// Its methods use it as a btree.Item (Less, Copy) and serialize it
// (Marshal, Unmarshal). Besides the fields below, the methods also read
// i.inode and i.quotaIds.
type TxRollbackInode struct {
55
// txInodeInfo carries the transaction-side description of the inode
// (Ino, MpID, CreateTime, Timeout, TxID, MpMembers are read by ToString).
txInodeInfo *proto.TxInodeInfo
56
rbType uint8 // Rollback Type
60
// Less tests whether the current TxRollbackInode item is less than the given one.
// When both items carry txInodeInfo the comparison uses txInodeInfo.Ino;
// otherwise it falls back to the inode number of the embedded inode.
61
func (i *TxRollbackInode) Less(than btree.Item) bool {
62
// NOTE(review): the handling of a failed type assertion (ok == false) is
// not visible here — confirm it returns before ti is dereferenced.
ti, ok := than.(*TxRollbackInode)
67
if i.txInodeInfo != nil && ti.txInodeInfo != nil {
68
return i.txInodeInfo.Ino < ti.txInodeInfo.Ino
71
return i.inode.Inode < ti.inode.Inode
74
// Copy returns a copy of the TxRollbackInode.
// The inode is duplicated through its own Copy method, txInodeInfo is
// copied by value, and quotaIds is copied into a freshly allocated slice.
75
func (i *TxRollbackInode) Copy() btree.Item {
76
item := i.inode.Copy()
77
// NOTE(review): dereferences i.txInodeInfo without a nil check, although
// Less tolerates a nil txInodeInfo — confirm Copy is never called on such
// an item.
txInodeInfo := *i.txInodeInfo
79
quotaIds := make([]uint32, len(i.quotaIds))
80
copy(quotaIds, i.quotaIds)
82
return &TxRollbackInode{
85
txInodeInfo: &txInodeInfo,
90
// Marshal serializes the rollback inode into a big-endian byte stream.
// The visible writes produce, in order: a uint32 length followed by the
// marshaled inode, a uint32 length followed by the marshaled txInodeInfo,
// the rbType byte, and finally every quota id back to back with no count
// prefix.
func (i *TxRollbackInode) Marshal() (result []byte, err error) {
91
buff := bytes.NewBuffer(make([]byte, 0, 256))
92
bs, err := i.inode.Marshal()
96
if err = binary.Write(buff, binary.BigEndian, uint32(len(bs))); err != nil {
99
if _, err = buff.Write(bs); err != nil {
102
bs, err = i.txInodeInfo.Marshal()
106
if err = binary.Write(buff, binary.BigEndian, uint32(len(bs))); err != nil {
109
if _, err = buff.Write(bs); err != nil {
112
if err = binary.Write(buff, binary.BigEndian, &i.rbType); err != nil {
116
// Quota ids are encoded into a scratch buffer first and appended at the
// very end of the record.
quotaBytes := bytes.NewBuffer(make([]byte, 0, 8))
117
for _, quotaId := range i.quotaIds {
118
if err = binary.Write(quotaBytes, binary.BigEndian, quotaId); err != nil {
123
_, err = buff.Write(quotaBytes.Bytes())
124
return buff.Bytes(), err
127
func (i *TxRollbackInode) Unmarshal(raw []byte) (err error) {
128
buff := bytes.NewBuffer(raw)
130
if err = binary.Read(buff, binary.BigEndian, &dataLen); err != nil {
133
data := make([]byte, int(dataLen))
134
if _, err = buff.Read(data); err != nil {
138
ino := NewInode(0, 0)
139
if err = ino.Unmarshal(data); err != nil {
144
if err = binary.Read(buff, binary.BigEndian, &dataLen); err != nil {
147
data = make([]byte, int(dataLen))
148
if _, err = buff.Read(data); err != nil {
152
txInodeInfo := proto.NewTxInodeInfo("", 0, 0)
153
if err = txInodeInfo.Unmarshal(data); err != nil {
156
i.txInodeInfo = txInodeInfo
158
if err = binary.Read(buff, binary.BigEndian, &i.rbType); err != nil {
167
if err = binary.Read(buff, binary.BigEndian, "aId); err != nil {
170
i.quotaIds = append(i.quotaIds, quotaId)
175
// NewTxRollbackInode builds a TxRollbackInode from the given inode, quota
// ids, transaction inode info and rollback type. The txInodeInfo pointer
// is stored as passed, not copied.
func NewTxRollbackInode(inode *Inode, quotaIds []uint32, txInodeInfo *proto.TxInodeInfo, rbType uint8) *TxRollbackInode {
176
return &TxRollbackInode{
179
txInodeInfo: txInodeInfo,
184
// TxRollbackDentry is the record kept for a dentry touched by a
// transaction. Its methods use it as a btree.Item (Less, Copy) and
// serialize it (Marshal, Unmarshal). Besides the fields below, the methods
// also read d.dentry.
type TxRollbackDentry struct {
186
// txDentryInfo carries the transaction-side description of the dentry;
// its GetKey() result is what Less orders by.
txDentryInfo *proto.TxDentryInfo
187
rbType uint8 // Rollback Type
190
// ToString formats the rollback dentry (parent id, name, inode, type), its
// rollback type and the attached transaction dentry info into a single
// string intended for logging.
func (d *TxRollbackDentry) ToString() string {
191
content := fmt.Sprintf("{dentry:[ParentId:%v, Name:%v, Inode:%v, Type:%v], rbType:%v, "+
192
"txDentryInfo:[ParentId:%v, Name:%v, MpMembers:%v, TxID:%v, MpID:%v, CreateTime:%v, Timeout:%v]}",
193
d.dentry.ParentId, d.dentry.Name, d.dentry.Inode, d.dentry.Type, d.rbType, d.txDentryInfo.ParentId, d.txDentryInfo.Name,
194
d.txDentryInfo.MpMembers, d.txDentryInfo.TxID, d.txDentryInfo.MpID, d.txDentryInfo.CreateTime, d.txDentryInfo.Timeout)
198
// Less tests whether the current TxRollbackDentry item is less than the given one.
// Items are ordered by txDentryInfo.GetKey(); the result is false when
// `than` is not a *TxRollbackDentry.
199
func (d *TxRollbackDentry) Less(than btree.Item) bool {
200
td, ok := than.(*TxRollbackDentry)
201
return ok && d.txDentryInfo.GetKey() < td.txDentryInfo.GetKey()
204
// Copy returns a copy of the TxRollbackDentry.
// The dentry is duplicated through its own Copy method and txDentryInfo
// is copied by value into a new struct.
205
func (d *TxRollbackDentry) Copy() btree.Item {
206
item := d.dentry.Copy()
207
// NOTE(review): dereferences d.txDentryInfo without a nil check.
txDentryInfo := *d.txDentryInfo
209
return &TxRollbackDentry{
210
dentry: item.(*Dentry),
211
txDentryInfo: &txDentryInfo,
216
// Marshal serializes the rollback dentry into a big-endian byte stream.
// The visible writes produce, in order: a uint32 length followed by the
// marshaled dentry, a uint32 length followed by the marshaled
// txDentryInfo, and the rbType byte.
func (d *TxRollbackDentry) Marshal() (result []byte, err error) {
217
buff := bytes.NewBuffer(make([]byte, 0, 512))
218
bs, err := d.dentry.Marshal()
222
if err = binary.Write(buff, binary.BigEndian, uint32(len(bs))); err != nil {
225
// NOTE(review): `:=` declares a new err scoped to this if statement,
// shadowing the named result; harmless only if the branch returns that
// err explicitly.
if _, err := buff.Write(bs); err != nil {
229
log.LogDebugf("TxRollbackDentry Marshal dentry %v", d.dentry)
231
log.LogDebugf("TxRollbackDentry Marshal txDentryInfo %v", d.ToString())
232
bs, err = d.txDentryInfo.Marshal()
236
if err = binary.Write(buff, binary.BigEndian, uint32(len(bs))); err != nil {
239
if _, err := buff.Write(bs); err != nil {
242
if err = binary.Write(buff, binary.BigEndian, &d.rbType); err != nil {
245
return buff.Bytes(), nil
248
// Unmarshal decodes a byte stream produced by Marshal back into d.
// It reads, in order: a uint32 length and that many bytes for the dentry,
// a uint32 length and that many bytes for the txDentryInfo, and the rbType
// byte. It returns the first decoding error encountered.
func (d *TxRollbackDentry) Unmarshal(raw []byte) (err error) {
249
buff := bytes.NewBuffer(raw)
251
if err = binary.Read(buff, binary.BigEndian, &dataLen); err != nil {
254
log.LogDebugf("TxRollbackDentry Unmarshal len %v", dataLen)
255
data := make([]byte, int(dataLen))
256
// NOTE(review): bytes.Buffer.Read may return fewer than len(data) bytes
// without an error when the input is truncated; consider io.ReadFull.
if _, err = buff.Read(data); err != nil {
261
if err = dentry.Unmarshal(data); err != nil {
265
log.LogDebugf("TxRollbackDentry Unmarshal dentry %v", dentry)
269
if err = binary.Read(buff, binary.BigEndian, &dataLen); err != nil {
272
data = make([]byte, int(dataLen))
273
if _, err = buff.Read(data); err != nil {
277
txDentryInfo := proto.NewTxDentryInfo("", 0, "", 0)
278
if err = txDentryInfo.Unmarshal(data); err != nil {
281
d.txDentryInfo = txDentryInfo
283
if err = binary.Read(buff, binary.BigEndian, &d.rbType); err != nil {
289
// NewTxRollbackDentry builds a TxRollbackDentry from the given dentry,
// transaction dentry info and rollback type. The txDentryInfo pointer is
// stored as passed, not copied.
func NewTxRollbackDentry(dentry *Dentry, txDentryInfo *proto.TxDentryInfo, rbType uint8) *TxRollbackDentry {
290
return &TxRollbackDentry{
292
txDentryInfo: txDentryInfo,
298
// TransactionManager is the transaction-manager (TM) side of a
// TransactionProcessor. Its methods additionally use tm.txTree and
// tm.blacklist.
type TransactionManager struct {
299
// need persistence and sync to all the raft members of the mp
300
txIdAlloc *TxIDAllocator
302
// txProcessor is the owning processor; it gives access to the meta
// partition (txProcessor.mp).
txProcessor *TransactionProcessor
304
// opLimiter is adjusted by setLimit and Reset.
opLimiter *rate.Limiter
309
// TransactionResource is the resource-manager (RM) side of a
// TransactionProcessor. It holds the rollback records for inodes and
// dentries that are part of a transaction.
type TransactionResource struct {
310
txRbInodeTree *BTree // key: inode id
311
txRbDentryTree *BTree // key: parentId_name
312
// txProcessor is the owning processor; it gives access to the meta
// partition (txProcessor.mp).
txProcessor *TransactionProcessor
316
// TransactionProcessor ties together the transaction manager and the
// transaction resource. Its methods and callers also use p.mp and p.mask.
type TransactionProcessor struct {
317
txManager *TransactionManager // TM
318
txResource *TransactionResource // RM
323
// Reset resets the processor. NOTE(review): the body is not visible in
// this view — presumably it resets txManager and txResource; confirm.
func (p *TransactionProcessor) Reset() {
328
// Pause reports whether the processor's mask equals proto.TxPause.
func (p *TransactionProcessor) Pause() bool {
329
return p.mask == proto.TxPause
332
// NewTransactionManager creates a TransactionManager bound to txProcessor
// with a fresh transaction id allocator, an empty blacklist set, and an
// operation limiter that starts unlimited (rate.Inf) with a burst of 128.
func NewTransactionManager(txProcessor *TransactionProcessor) *TransactionManager {
333
txMgr := &TransactionManager{
334
txIdAlloc: newTxIDAllocator(),
336
txProcessor: txProcessor,
337
blacklist: util.NewSet(),
338
opLimiter: rate.NewLimiter(rate.Inf, 128),
343
// NewTransactionResource creates a TransactionResource bound to
// txProcessor with empty rollback-inode and rollback-dentry trees.
func NewTransactionResource(txProcessor *TransactionProcessor) *TransactionResource {
344
txRsc := &TransactionResource{
345
txRbInodeTree: NewBtree(),
346
txRbDentryTree: NewBtree(),
347
txProcessor: txProcessor,
352
// NewTransactionProcessor creates a TransactionProcessor for the given
// meta partition together with its TransactionManager and
// TransactionResource. When mp.config is set, it also starts the
// background goroutine that processes expired transactions.
func NewTransactionProcessor(mp *metaPartition) *TransactionProcessor {
353
txProcessor := &TransactionProcessor{
356
txProcessor.txManager = NewTransactionManager(txProcessor)
357
txProcessor.txResource = NewTransactionResource(txProcessor)
359
// processExpiredTransactions reads mp.config.PartitionId, so the worker
// is started only when a config is present.
if mp.config != nil {
360
go txProcessor.txManager.processExpiredTransactions()
365
// setLimit updates the operation rate limit. The visible lines either set
// the limiter to val operations per second and return val formatted as a
// string, or set the limiter to rate.Inf (unlimited).
// NOTE(review): the condition selecting between the two paths is not
// visible in this view — presumably val > 0; confirm.
func (tm *TransactionManager) setLimit(val int) string {
367
tm.opLimiter.SetLimit(rate.Limit(val))
368
return fmt.Sprintf("%v", val)
370
tm.opLimiter.SetLimit(rate.Inf)
374
// Reset resets the transaction manager. The visible line sets the
// operation limiter's limit to 0; other reset steps are not visible here.
func (tm *TransactionManager) Reset() {
379
tm.opLimiter.SetLimit(0)
385
// processExpiredTransactions is the background worker started by
// NewTransactionProcessor. It uses two timers: clearTimer (60 seconds),
// whose branch logs that the blacklist was cleared, and txCheckTimer
// (3 seconds), whose branch is skipped while the processor is paused.
// The worker exits when the meta partition's stopC channel fires, and it
// backs off while this node is not the leader.
func (tm *TransactionManager) processExpiredTransactions() {
386
mpId := tm.txProcessor.mp.config.PartitionId
387
log.LogInfof("processExpiredTransactions for mp[%v] started", mpId)
388
clearInterval := time.Second * 60
389
clearTimer := time.NewTimer(clearInterval)
390
txCheckVal := time.Second * 3
391
txCheckTimer := time.NewTimer(txCheckVal)
394
log.LogWarnf("processExpiredTransactions for mp[%v] exit", mpId)
402
case <-tm.txProcessor.mp.stopC:
403
log.LogDebugf("[processExpiredTransactions] deleteWorker stop partition: %v", mpId)
408
if _, ok := tm.txProcessor.mp.IsLeader(); !ok && !test {
409
// The message now states the real back-off: the sleep below is 10 seconds.
log.LogDebugf("processExpiredTransactions: not leader sleep 10s, mp %d", mpId)
410
time.Sleep(time.Second * 10)
415
case <-tm.txProcessor.mp.stopC:
416
log.LogWarnf("processExpiredTransactions for mp[%v] stopped", mpId)
420
clearTimer.Reset(clearInterval)
421
log.LogDebugf("processExpiredTransactions: blacklist cleared, mp %d", mpId)
422
case <-txCheckTimer.C:
423
// While paused, only re-arm the check timer.
if tm.txProcessor.Pause() {
424
txCheckTimer.Reset(txCheckVal)
428
txCheckTimer.Reset(txCheckVal)
433
// processTx walks the transaction tree in ascending order and drives each
// transaction forward depending on its state. The visible branches
// continue a commit, continue or start a rollback (in separate
// goroutines), clear orphan transactions, delete finished transactions
// from the RM, or leave ongoing transactions alone. The traversal stops
// early when this node is no longer the leader.
// NOTE(review): many lines of this function are not visible in this view;
// the exact conditions guarding each branch must be confirmed against the
// full source.
func (tm *TransactionManager) processTx() {
434
mpId := tm.txProcessor.mp.config.PartitionId
436
log.LogDebugf("processTx: mp[%v] mask %v", mpId, proto.GetMaskString(tm.txProcessor.mask))
438
log.LogDebugf("processTx: mp %d total cost %s", mpId, time.Since(start).String())
441
// limitCh is a buffered channel of capacity 32 — presumably a semaphore
// bounding the number of concurrent per-transaction goroutines; wg waits
// for them.
limitCh := make(chan struct{}, 32)
442
var wg sync.WaitGroup
446
limitCh <- struct{}{}
454
// f is the per-item callback handed to the tree traversal.
f := func(i BtreeItem) bool {
457
if _, ok := tm.txProcessor.mp.IsLeader(); !ok {
458
log.LogWarnf("processExpiredTransactions for mp[%v] already not leader and break tx tree traverse",
459
tm.txProcessor.mp.config.PartitionId)
464
tx := i.(*proto.TransactionInfo)
465
// rollbackFunc rolls the transaction back through tm.rollbackTx and logs
// the outcome.
rollbackFunc := func(skipSetStat bool) {
467
status, err := tm.rollbackTx(tx.TxID, skipSetStat)
469
if err != nil || status != proto.OpOk {
470
log.LogWarnf("processExpiredTransactions: transaction (%v) expired, rolling back failed, status(%v), err(%v)",
475
if log.EnableDebug() {
476
log.LogDebugf("processExpiredTransactions: transaction (%v) expired, rolling back done", tx)
480
// commitFunc commits the transaction through tm.commitTx with the
// set-state step skipped, and logs the outcome.
commitFunc := func() {
482
status, err := tm.commitTx(tx.TxID, true)
483
if err != nil || status != proto.OpOk {
484
log.LogWarnf("processExpiredTransactions: transaction (%v) expired, commit failed, status(%v), err(%v)",
489
if log.EnableDebug() {
490
log.LogDebugf("processExpiredTransactions: transaction (%v) expired, commit done", tx)
496
status, err := tm.delTxFromRM(tx.TxID)
497
if err != nil || status != proto.OpOk {
498
log.LogWarnf("processExpiredTransactions: delTxFromRM (%v) expired, commit failed, status(%v), err(%v)",
502
if log.EnableDebug() {
503
log.LogDebugf("processExpiredTransactions: transaction (%v) delTxFromRM, commit done", tx)
507
clearOrphan := func() {
510
if log.EnableDebug() {
511
log.LogDebugf("processExpiredTransactions: transaction (%v) clearOrphanTx", tx)
515
// Transactions whose TM is a different partition are handled as RM-side
// records here.
if tx.TmID != int64(mpId) {
517
if log.EnableDebug() {
518
log.LogDebugf("processExpiredTransactions: transaction (%v) can be deleted", tx)
525
if tx.NeedClearOrphan() {
526
if log.EnableDebug() {
527
log.LogDebugf("processExpiredTransactions: orphan transaction (%v) can be clear", tx)
534
if log.EnableDebug() {
535
log.LogDebugf("processExpiredTransactions: RM transaction (%v) is ongoing", tx)
540
if tx.State == proto.TxStateCommit {
541
if log.EnableDebug() {
542
log.LogDebugf("processExpiredTransactions: transaction (%v) continue to commit...", tx)
549
if tx.State == proto.TxStateRollback {
550
if log.EnableDebug() {
551
log.LogDebugf("processExpiredTransactions: transaction (%v) continue to roll back...", tx)
554
// The state is already TxStateRollback, so the set-state step is skipped.
go rollbackFunc(true)
558
if tx.State == proto.TxStatePreCommit {
563
if log.EnableDebug() {
564
log.LogDebugf("processExpiredTransactions: transaction (%v) expired, rolling back...", tx)
567
go rollbackFunc(false)
573
if log.EnableDebug() {
574
log.LogDebugf("processExpiredTransactions: transaction (%v) is ongoing", tx)
579
if log.EnableDebug() {
580
log.LogDebugf("processExpiredTransactions: transaction (%v) can be deleted", tx)
587
log.LogCriticalf("processExpiredTransactions: transaction (%v) is in state failed", tx)
591
tm.txTree.GetTree().Ascend(f)
595
// nextTxID allocates a new transaction id and formats it as
// "<partitionId>_<id>", where id comes from the transaction id allocator.
func (tm *TransactionManager) nextTxID() string {
596
id := tm.txIdAlloc.allocateTransactionID()
597
txId := fmt.Sprintf("%d_%d", tm.txProcessor.mp.config.PartitionId, id)
598
log.LogDebugf("nextTxID: txId:%v", txId)
602
// txInRMDone checks whether the transaction with the given id is already
// finished on this RM: the transaction is either absent from the tree or
// reports Finish(). In that case a warning is logged.
func (tm *TransactionManager) txInRMDone(txId string) bool {
603
ifo := tm.getTransaction(txId)
604
if ifo == nil || ifo.Finish() {
605
log.LogWarnf("txInRMDone: tx in rm already done, txId %s, ifo %v", txId, ifo)
611
// getTransaction looks up the transaction with the given id in the
// transaction tree and returns the stored *proto.TransactionInfo.
// Callers in this file treat a nil result as "not found".
func (tm *TransactionManager) getTransaction(txID string) (txInfo *proto.TransactionInfo) {
612
txItem := proto.NewTxInfoBItem(txID)
613
item := tm.txTree.Get(txItem)
617
txInfo = item.(*proto.TransactionInfo)
621
// copyGetTx looks up the transaction with the given id through the tree's
// CopyGet method (rather than Get) and returns it as a
// *proto.TransactionInfo.
func (tm *TransactionManager) copyGetTx(txId string) (txInfo *proto.TransactionInfo) {
622
txItem := proto.NewTxInfoBItem(txId)
623
item := tm.txTree.CopyGet(txItem)
628
txInfo = item.(*proto.TransactionInfo)
632
// updateTxIdCursor advances the transaction id allocator so that it is not
// behind the numeric part of txId. txId is split on "_" and the second
// element is parsed as a base-10 uint64; the allocator is updated only
// when that value is greater than its current id. An error is returned
// when txId is malformed.
func (tm *TransactionManager) updateTxIdCursor(txId string) (err error) {
633
arr := strings.Split(txId, "_")
635
return fmt.Errorf("updateTxId: tx[%v] is invalid", txId)
637
id, err := strconv.ParseUint(arr[1], 10, 64)
639
return fmt.Errorf("updateTxId: tx[%v] is invalid", txId)
641
if id > tm.txIdAlloc.getTransactionID() {
642
tm.txIdAlloc.setTransactionID(id)
647
// addTxInfo stores txInfo in the transaction tree via ReplaceOrInsert,
// passing true as the second argument.
func (tm *TransactionManager) addTxInfo(txInfo *proto.TransactionInfo) {
648
tm.txTree.ReplaceOrInsert(txInfo, true)
651
// registerTransaction registers a client transaction with the TM.
// When this partition is the transaction's TM, the id allocator cursor is
// first advanced past the transaction's id. The transaction's create
// time, timeout and id are then propagated to every inode and dentry info
// it contains. A transaction that already exists in the tree only
// triggers a warning.
652
func (tm *TransactionManager) registerTransaction(txInfo *proto.TransactionInfo) (err error) {
653
if uint64(txInfo.TmID) == tm.txProcessor.mp.config.PartitionId {
654
if err := tm.updateTxIdCursor(txInfo.TxID); err != nil {
655
log.LogErrorf("updateTxIdCursor failed, txInfo %s, err %s", txInfo.String(), err.Error())
659
for _, inode := range txInfo.TxInodeInfos {
660
inode.SetCreateTime(txInfo.CreateTime)
661
inode.SetTimeout(txInfo.Timeout)
662
inode.SetTxId(txInfo.TxID)
665
for _, dentry := range txInfo.TxDentryInfos {
666
dentry.SetCreateTime(txInfo.CreateTime)
667
dentry.SetTimeout(txInfo.Timeout)
668
dentry.SetTxId(txInfo.TxID)
672
if info := tm.getTransaction(txInfo.TxID); info != nil {
673
log.LogWarnf("tx is already exist, txId %s, info %v", txInfo.TxID, info.String())
679
if log.EnableDebug() {
680
log.LogDebugf("registerTransaction: txInfo(%v)", txInfo)
686
// deleteTxInfo removes the transaction with the given id from the
// transaction tree and returns a status code.
func (tm *TransactionManager) deleteTxInfo(txId string) (status uint8) {
690
txItem := proto.NewTxInfoBItem(txId)
691
item := tm.txTree.Delete(txItem)
692
if log.EnableDebug() {
693
log.LogDebugf("deleteTxInfo: tx[%v] is deleted, item %v", txId, item)
698
// rollbackTxInfo marks the transaction with the given id as rolled back:
// its state becomes proto.TxStateRollbackDone and DoneTime is set to the
// current Unix time. When the transaction is not found, status is set to
// proto.OpTxInfoNotExistErr and a warning is logged.
func (tm *TransactionManager) rollbackTxInfo(txId string) (status uint8) {
703
tx := tm.getTransaction(txId)
705
status = proto.OpTxInfoNotExistErr
706
log.LogWarnf("rollbackTxInfo: rollback tx[%v] failed, not found", txId)
710
tx.State = proto.TxStateRollbackDone
711
tx.DoneTime = time.Now().Unix()
712
log.LogDebugf("rollbackTxInfo: tx[%v] is rolled back", tx)
716
// commitTxInfo marks the transaction with the given id as committed: its
// state becomes proto.TxStateCommitDone and DoneTime is set to the current
// Unix time. When the transaction is not found, status is set to
// proto.OpTxInfoNotExistErr and err describes the failure.
func (tm *TransactionManager) commitTxInfo(txId string) (status uint8, err error) {
720
tx := tm.getTransaction(txId)
722
status = proto.OpTxInfoNotExistErr
723
err = fmt.Errorf("commitTxInfo: commit tx[%v] failed, not found", txId)
727
tx.State = proto.TxStateCommitDone
728
tx.DoneTime = time.Now().Unix()
729
log.LogDebugf("commitTxInfo: tx[%v] is committed", tx)
733
// buildTxPacket creates a new request packet and marshals data into it.
// mp and op are the target meta partition id and opcode; the lines that
// assign them to the packet are not visible in this view.
// When marshaling fails, the marshal error is replaced by a new error
// whose message includes the data, and that message is logged.
func buildTxPacket(data interface{}, mp uint64, op uint8) (pkt *proto.Packet, err error) {
734
pkt = proto.NewPacketReqID()
737
err = pkt.MarshalData(data)
739
// NOTE(review): the original marshal error is dropped here rather than
// wrapped, so its cause is not reported.
errInfo := fmt.Sprintf("buildTxPacket: marshal txInfo [%v] failed", data)
740
err = errors.New(errInfo)
741
log.LogErrorf("%v", errInfo)
748
// setTransactionState submits an opFSMTxSetState operation to the meta
// partition to change the state of transaction txId.
// It returns proto.OpAgain with the submit error when the submission
// fails; otherwise status is the uint8 returned by the state machine, and
// a non-OK status is additionally reported through err.
func (tm *TransactionManager) setTransactionState(txId string, state int32) (status uint8, err error) {
753
stateReq := &proto.TxSetStateRequest{
757
// NOTE(review): the json.Marshal error is discarded; if marshaling
// failed, an empty payload would be submitted.
val, _ = json.Marshal(stateReq)
759
resp, err = tm.txProcessor.mp.submit(opFSMTxSetState, val)
761
log.LogWarnf("setTransactionState: set transaction[%v] state to [%v] failed, err[%v]", txId, state, err)
762
return proto.OpAgain, err
764
status = resp.(uint8)
766
if status != proto.OpOk {
767
errInfo := fmt.Sprintf("setTransactionState: set transaction[%v] state to [%v] failed", txId, state)
768
err = errors.New(errInfo)
769
log.LogWarnf("%v", errInfo)
774
// delTxFromRM submits an opFSMTxDelete operation to the meta partition
// for transaction txId. It returns proto.OpAgain with the submit error
// when the submission fails; otherwise status is the uint8 returned by
// the state machine.
func (tm *TransactionManager) delTxFromRM(txId string) (status uint8, err error) {
775
req := proto.TxApplyRequest{
778
val, err := json.Marshal(req)
783
resp, err := tm.txProcessor.mp.submit(opFSMTxDelete, val)
785
log.LogWarnf("delTxFromRM: delTxFromRM transaction[%v] failed, err[%v]", txId, err)
786
return proto.OpAgain, err
789
status = resp.(uint8)
790
if log.EnableDebug() {
791
// NOTE(review): this message says "deleted successfully" whatever the
// value of status is.
log.LogDebugf("delTxFromRM: tx[%v] is deleted successfully, status (%s)", txId, proto.GetStatusStr(status))
797
// clearOrphanTx cleans up a transaction that may no longer exist on its
// TM. It asks the TM partition (tx.TmID) whether the transaction still
// exists; only when the answer is proto.OpTxInfoNotExistErr does it roll
// the transaction back on this RM through mp.TxRollbackRM.
func (tm *TransactionManager) clearOrphanTx(tx *proto.TransactionInfo) {
798
log.LogWarnf("clearOrphanTx: start to clearOrphanTx, tx %v", tx)
799
// check txInfo whether exist in tm
800
req := &proto.TxGetInfoRequest{
801
Pid: uint64(tx.TmID),
805
pkt, err := buildTxPacket(req, req.Pid, proto.OpMetaTxGet)
810
// Find the member addresses of the TM partition among the partitions
// referenced by the transaction.
mps := tx.GroupByMp()
811
tmpMp, ok := mps[req.Pid]
813
log.LogErrorf("clearOrphanTx: can't get tm Mp info from tx, tx %v", tx)
817
status := tm.txSendToMpWithAddrs(tmpMp.Members, pkt)
818
// Any status other than "tx info not exist" is treated as the transaction
// still being present on the TM.
if status != proto.OpTxInfoNotExistErr {
819
log.LogWarnf("clearOrphanTx: tx is still exist, tx %v, status %s", tx, proto.GetStatusStr(status))
823
log.LogWarnf("clearOrphanTx: find tx in tm already not exist, start clear it from rm, tx %v", tx)
825
aReq := &proto.TxApplyRMRequest{
826
PartitionID: req.Pid,
830
err = tm.txProcessor.mp.TxRollbackRM(aReq, newPkt)
831
log.LogWarnf("clearOrphanTx: finally rollback tx in rm, tx %v, status %s, err %v",
832
tx, newPkt.GetResultMsg(), err)
836
// commitTx drives the commit of transaction txId on the TM in three
// steps: set the transaction state to TxStateCommit (unless skipSetStat
// is true or the state is already TxStateCommit), notify every related
// RM, then submit opFSMTxCommit to the meta partition.
// It returns proto.OpAgain with the submit error when the final
// submission fails; otherwise status is the uint8 returned by the state
// machine. A missing or already committed transaction only logs a warning.
func (tm *TransactionManager) commitTx(txId string, skipSetStat bool) (status uint8, err error) {
837
tx := tm.getTransaction(txId)
839
status = proto.OpTxInfoNotExistErr
840
log.LogWarnf("commitTx: tx[%v] not found, already success", txId)
844
if tx.State == proto.TxStateCommitDone {
846
log.LogWarnf("commitTx: tx[%v] is already commit", txId)
850
// 1.set transaction to TxStateCommit
851
if !skipSetStat && tx.State != proto.TxStateCommit {
852
status, err = tm.setTransactionState(txId, proto.TxStateCommit)
853
if status != proto.OpOk {
854
log.LogWarnf("commitTx: set transaction[%v] state to TxStateCommit failed", tx)
859
// 2. notify all related RMs that a transaction is completed
860
status = tm.sendToRM(tx, proto.OpTxCommitRM)
861
if status != proto.OpOk {
865
// 3. TM commit the transaction
866
req := proto.TxApplyRequest{
869
val, err := json.Marshal(req)
874
resp, err := tm.txProcessor.mp.submit(opFSMTxCommit, val)
876
log.LogWarnf("commitTx: commit transaction[%v] failed, err[%v]", txId, err)
877
return proto.OpAgain, err
880
status = resp.(uint8)
881
log.LogDebugf("commitTx: tx[%v] is commited successfully", txId)
886
// sendToRM notifies every meta partition involved in txInfo of a commit
// or rollback (op). For the local partition the request is applied
// directly through mp.TxCommitRM or mp.TxRollbackRM; for the others a
// packet is sent to the partition's members. Each partition's status is
// pushed onto statusCh and the statuses are then folded into the returned
// value via updateStatus.
func (tm *TransactionManager) sendToRM(txInfo *proto.TransactionInfo, op uint8) (status uint8) {
888
mpIfos := txInfo.GroupByMp()
889
// statusCh has room for one status per partition.
statusCh := make(chan uint8, len(mpIfos))
890
wg := sync.WaitGroup{}
891
mp := tm.txProcessor.mp
893
for mpId, ifo := range mpIfos {
894
req := &proto.TxApplyRMRequest{
895
VolName: mp.config.VolName,
897
TransactionInfo: txInfo,
902
// NOTE(review): the buildTxPacket error is discarded.
pkt, _ := buildTxPacket(req, mpId, op)
903
// The local partition is applied in-process.
if mp.config.PartitionId == mpId {
908
if op == proto.OpTxCommitRM {
909
err = mp.TxCommitRM(req, pt)
911
err = mp.TxRollbackRM(req, pt)
913
statusCh <- pt.ResultCode
914
if pt.ResultCode != proto.OpOk {
915
// NOTE(review): this message names TxCommitRM even when the rollback
// branch above was taken.
log.LogWarnf("sendToRM: invoke TxCommitRM failed, ifo %v, pkt %s, err %v", txInfo, pt.GetResultMsg(), err)
921
members := ifo.Members
924
status := tm.txSendToMpWithAddrs(members, pkt)
925
if status != proto.OpOk {
926
log.LogWarnf("sendToRM: send to rm failed, addr %s, pkt %s, status %s",
927
members, string(pkt.Data), proto.GetStatusStr(status))
936
// updateStatus inspects one per-partition status; the conflict /
// not-exist family of codes is only logged with a warning here.
updateStatus := func(st uint8) uint8 {
937
if st == proto.OpTxConflictErr || st == proto.OpTxInfoNotExistErr {
938
log.LogWarnf("sendToRM: might have already been committed, tx[%v], status (%s)", txInfo, proto.GetStatusStr(st))
940
} else if st == proto.OpTxRbInodeNotExistErr || st == proto.OpTxRbDentryNotExistErr {
941
log.LogWarnf("sendToRM: already done before or not add, tx[%v], status (%s)", txInfo, proto.GetStatusStr(st))
948
for st := range statusCh {
949
t := updateStatus(st)
958
func (tm *TransactionManager) rollbackTx(txId string, skipSetStat bool) (status uint8, err error) {
961
tx := tm.getTransaction(txId)
963
log.LogWarnf("commitTx: tx[%v] not found, already success", txId)
967
if tx.State == proto.TxStateRollbackDone {
969
log.LogWarnf("commitTx: tx[%v] is already rollback", txId)
973
// 1.set transaction to TxStateRollback
974
if !skipSetStat && tx.State != proto.TxStateRollback {
975
status, err = tm.setTransactionState(txId, proto.TxStateRollback)
976
if status != proto.OpOk {
977
log.LogWarnf("commitTransaction: set transaction[%v] state to TxStateCommit failed", tx)
982
// 2. notify all related RMs that a transaction is completed
983
status = tm.sendToRM(tx, proto.OpTxRollbackRM)
984
if status != proto.OpOk {
988
req := proto.TxApplyRequest{
991
val, err := json.Marshal(req)
996
resp, err := tm.txProcessor.mp.submit(opFSMTxRollback, val)
998
log.LogWarnf("commitTx: rollback transaction[%v] failed, err[%v]", txId, err)
999
return proto.OpAgain, err
1002
status = resp.(uint8)
1003
log.LogDebugf("commitTx: tx[%v] is rollback successfully, msg %s", txId, proto.GetStatusStr(status))
1008
// sendPacketToMP sends packet p to the meta partition at addr over a
// pooled connection and reads the reply back into p. It fails when the
// connection cannot be obtained, the write or read fails, or the reply's
// request id / opcode differ from the ones that were sent. On error the
// packet is also marked with proto.OpErr carrying the error text.
func (tm *TransactionManager) sendPacketToMP(addr string, p *proto.Packet) (err error) {
1015
connPool := tm.txProcessor.mp.manager.connPool
1017
// The second argument is true when an error occurred — presumably that
// tells the pool to close the connection instead of reusing it; confirm.
connPool.PutConnect(mConn, err != nil)
1019
p.PacketErrorWithBody(proto.OpErr, []byte(err.Error()))
1020
log.LogErrorf("[sendPacketToMP]: req: %d - %v, %v, packet(%v)", p.GetReqID(),
1021
p.GetOpMsg(), err, p)
1026
mConn, err = connPool.GetConnect(addr)
1031
if err = p.WriteToConn(mConn); err != nil {
1035
// read the reply from the same connection
1036
if err = p.ReadFromConn(mConn, proto.ReadDeadlineTime); err != nil {
1040
// reqID and reqOp hold the request's identity; a differing reply is
// rejected.
if reqID != p.ReqID || reqOp != p.Opcode {
1041
err = fmt.Errorf("sendPacketToMP: send and received packet mismatch: req(%v_%v) resp(%v_%v)",
1042
reqID, reqOp, p.ReqID, p.Opcode)
1046
if log.EnableDebug() {
1047
log.LogDebugf("[sendPacketToMP] req: %d - %v, resp: %v, packet(%v)", p.GetReqID(), p.GetOpMsg(),
1048
p.GetResultMsg(), p)
1054
// txSendToMpWithAddrs sends a copy of packet p to the members listed in
// addrStr (comma separated), one address at a time. Blacklisted addresses
// are skipped in the first pass and remembered; an address whose send
// fails is added to the blacklist. Addresses that answer with
// proto.OpErr or proto.OpAgain are logged as failures. After the first
// pass the skipped (blacklisted) addresses are tried as well. When no
// attempt yields a result, proto.OpAgain is returned.
func (tm *TransactionManager) txSendToMpWithAddrs(addrStr string, p *proto.Packet) (status uint8) {
1055
addrs := strings.Split(addrStr, ",")
1058
skippedAddrs := make([]string, 0)
1059
for _, addr := range addrs {
1060
if tm.blacklist.Has(addr) {
1061
log.LogWarnf("txSendToMpWithAddrs: addr[%v] is already blacklisted, retry another addr, p %s", addr, string(p.Data))
1062
skippedAddrs = append(skippedAddrs, addr)
1066
// Each attempt works on its own copy so the caller's packet is untouched.
newPkt := p.GetCopy()
1067
err = tm.sendPacketToMP(addr, newPkt)
1069
tm.blacklist.Add(addr)
1070
log.LogWarnf("txSendToMpWithAddrs: send to %v failed, err(%s), add to blacklist and retry another addr, p %s",
1071
addr, err.Error(), string(p.Data))
1075
// NOTE(review): `:=` declares a loop-local status that shadows the named
// result; the final log after the loops reads the outer one.
status := newPkt.ResultCode
1076
if status == proto.OpErr || status == proto.OpAgain {
1077
log.LogWarnf("txSendToMpWithAddrs: sendPacketToMp failed, addr %s, msg %s, data %s, status(%s)",
1078
addr, newPkt.GetResultMsg(), string(p.Data), proto.GetStatusStr(status))
1082
if status == proto.OpOk {
1083
if log.EnableDebug() {
1084
log.LogDebugf("txSendToMpWithAddrs: send to %v done with status[%v], tx[%s]",
1085
addr, status, string(p.Data))
1091
log.LogWarnf("txSendToMpWithAddrs: sendPacketToMp failed, addr %s, msg %s, data %s, status %s",
1092
addr, newPkt.GetResultMsg(), string(p.Data), proto.GetStatusStr(status))
1096
// try the addresses skipped in the first pass
1097
for _, addr := range skippedAddrs {
1098
newPkt := p.GetCopy()
1099
err = tm.sendPacketToMP(addr, newPkt)
1101
// NOTE(review): the message mentions adding to the blacklist, but no
// blacklist.Add call is visible in this retry loop.
log.LogWarnf("txSendToMpWithAddrs: send to %v failed, err(%s), add to blacklist and retry another addr, p %s",
1102
addr, err.Error(), string(p.Data))
1106
status := newPkt.ResultCode
1107
if status == proto.OpErr || status == proto.OpAgain {
1108
log.LogWarnf("txSendToMpWithAddrs: sendPacketToMp failed, addr %s, msg %s, data %s, status(%s)",
1109
addr, newPkt.GetResultMsg(), string(p.Data), proto.GetStatusStr(status))
1113
if status == proto.OpOk {
1114
if log.EnableDebug() {
1115
log.LogDebugf("txSendToMpWithAddrs: send to %v done with status[%v], tx[%s]",
1116
addr, status, string(p.Data))
1122
log.LogWarnf("txSendToMpWithAddrs: sendPacketToMp failed, addr %s, msg %s, data %s, status %s",
1123
addr, newPkt.GetResultMsg(), string(p.Data), proto.GetStatusStr(status))
1127
log.LogWarnf("txSendToMpWithAddrs: after retry still failed, return opAgain, pkt %s, addrs %v, err %v, status %s",
1128
string(p.Data), addrs, err, proto.GetStatusStr(status))
1129
return proto.OpAgain
1132
// txSetState applies a state change request to the transaction named by
// req.TxID. The request is rejected when the transaction does not exist
// (proto.OpTxInfoNotExistErr), when req.State lies outside
// [TxStateCommit, TxStateFailed], or when the requested transition is not
// allowed (proto.OpTxSetStateErr): moving to TxStateCommit or
// TxStateRollback is accepted only from that same state or from
// TxStatePreCommit. A request for TxStateCommit on a transaction that is
// already TxStateCommitDone only logs a warning.
func (tm *TransactionManager) txSetState(req *proto.TxSetStateRequest) (status uint8, err error) {
1137
txItem := proto.NewTxInfoBItem(req.TxID)
1138
item := tm.txTree.CopyGet(txItem)
1140
status = proto.OpTxInfoNotExistErr
1141
errInfo := fmt.Sprintf("txSetState: set state failed, req[%v] tx not existed", req)
1142
err = errors.New(errInfo)
1143
log.LogErrorf("%v", errInfo)
1146
txInfo := item.(*proto.TransactionInfo)
1148
if req.State == proto.TxStateCommit && txInfo.State == proto.TxStateCommitDone {
1149
log.LogWarnf("txSetState: tx is already success before set commit state, tx %v", txInfo)
1154
// Range check on the requested state.
if req.State < proto.TxStateCommit || req.State > proto.TxStateFailed {
1155
status = proto.OpTxSetStateErr
1156
errInfo := fmt.Sprintf("txSetState: set state failed, wrong state, req[%v]", req)
1157
err = errors.New(errInfo)
1158
log.LogErrorf("%v", errInfo)
1162
// Commit is only reachable from Commit (idempotent) or PreCommit.
if req.State == proto.TxStateCommit && txInfo.State != proto.TxStateCommit && txInfo.State != proto.TxStatePreCommit {
1163
status = proto.OpTxSetStateErr
1164
errInfo := fmt.Sprintf("txSetState: set state failed, wrong state, tx state[%v], req state[%v], tx[%v]",
1165
txInfo.State, req.State, req.TxID)
1166
err = errors.New(errInfo)
1167
log.LogErrorf("%v", errInfo)
1171
// Rollback is only reachable from Rollback (idempotent) or PreCommit.
if req.State == proto.TxStateRollback && txInfo.State != proto.TxStateRollback && txInfo.State != proto.TxStatePreCommit {
1172
status = proto.OpTxSetStateErr
1173
errInfo := fmt.Sprintf("txSetState: set state failed, wrong state, tx state[%v], req state[%v], tx[%v]",
1174
txInfo.State, req.State, req.TxID)
1175
err = errors.New(errInfo)
1176
log.LogErrorf("%v", errInfo)
1180
log.LogDebugf("txSetState: set tx state from [%v] to [%v], tx[%v]", txInfo.State, req.State, req.TxID)
1181
txInfo.State = req.State
1185
// Reset clears both rollback trees and drops the reference to the owning
// transaction processor.
func (tr *TransactionResource) Reset() {
1188
tr.txRbInodeTree.Reset()
1189
tr.txRbDentryTree.Reset()
1190
tr.txProcessor = nil
1193
// isInodeInTransction checks whether the given inode has a rollback record
// in this RM, i.e. is currently part of a transaction. When a record with
// transaction info is found, txID is set to that transaction's id.
1194
func (tr *TransactionResource) isInodeInTransction(ino *Inode) (inTx bool, txID string) {
1195
// return true only if specified inode is in an ongoing transaction(not expired yet)
1199
if rbInode := tr.getTxRbInode(ino.Inode); rbInode != nil {
1201
if rbInode.txInodeInfo != nil {
1202
txID = rbInode.txInodeInfo.TxID
1209
// isDentryInTransction checks whether the given dentry (looked up by
// parent id and name) has a rollback record in this RM. When a record
// with transaction info is found, txID is set to that transaction's id.
func (tr *TransactionResource) isDentryInTransction(dentry *Dentry) (inTx bool, txID string) {
1213
if rbDentry := tr.getTxRbDentry(dentry.ParentId, dentry.Name); rbDentry != nil {
1215
if rbDentry.txDentryInfo != nil {
1216
txID = rbDentry.txDentryInfo.TxID
1223
// getTxRbInode looks up the rollback record for inode number ino in the
// rollback inode tree. The lookup key carries only an inode, so
// TxRollbackInode.Less compares by inode number. Callers in this file
// treat a nil result as "not found".
func (tr *TransactionResource) getTxRbInode(ino uint64) (rbInode *TxRollbackInode) {
1224
keyNode := &TxRollbackInode{
1225
inode: NewInode(ino, 0),
1227
item := tr.txRbInodeTree.Get(keyNode)
1231
rbInode = item.(*TxRollbackInode)
1235
// copyGetTxRbInode looks up the rollback record for inode number ino
// through the tree's CopyGet method (rather than Get) and returns it as a
// *TxRollbackInode.
func (tr *TransactionResource) copyGetTxRbInode(ino uint64) (rbInode *TxRollbackInode) {
1236
keyNode := &TxRollbackInode{
1237
inode: NewInode(ino, 0),
1239
item := tr.txRbInodeTree.CopyGet(keyNode)
1243
rbInode = item.(*TxRollbackInode)
1247
func (tr *TransactionResource) deleteTxRollbackInode(ino uint64, txId string) (status uint8) {
1251
keyNode := &TxRollbackInode{
1252
txInodeInfo: proto.NewTxInodeInfo("", ino, 0),
1255
item := tr.txRbInodeTree.Get(keyNode)
1257
log.LogWarnf("deleteTxRollbackInode: rollback inode may be already been deleted, inode %d, txId %s",
1259
return proto.OpTxRbInodeNotExistErr
1262
if item.(*TxRollbackInode).txInodeInfo.TxID != txId {
1263
log.LogWarnf("deleteTxRollbackInode: rollback dentry is already been update by other, txId %s, item %v",
1265
return proto.OpTxRbDentryNotExistErr
1268
tr.txRbInodeTree.Delete(item)
1272
// addTxRollbackInode adds a TxRollbackInode to the RM's rollback inode
// tree. When a record for the same inode already exists, it returns
// proto.OpExistErr if that record belongs to the same transaction and
// proto.OpTxConflictErr if it belongs to a different one; in both cases
// nothing is inserted.
1273
func (tr *TransactionResource) addTxRollbackInode(rbInode *TxRollbackInode) (status uint8) {
1277
oldRbInode := tr.getTxRbInode(rbInode.inode.Inode)
1278
if oldRbInode != nil {
1279
if oldRbInode.txInodeInfo.TxID == rbInode.txInodeInfo.TxID {
1280
log.LogWarnf("addTxRollbackInode: rollback inode [ino(%v) txID(%v)] is already exists",
1281
rbInode.inode.Inode, rbInode.txInodeInfo.TxID)
1282
return proto.OpExistErr
1284
log.LogErrorf("addTxRollbackInode: rollback inode [ino(%v) txID(%v)] "+
1285
"is conflicted with inode [ino(%v) txID(%v)]",
1286
rbInode.inode.Inode, rbInode.txInodeInfo.TxID, oldRbInode.inode.Inode, oldRbInode.txInodeInfo.TxID)
1287
return proto.OpTxConflictErr
1291
tr.txRbInodeTree.ReplaceOrInsert(rbInode, true)
1292
log.LogDebugf("addTxRollbackInode: rollback inode [ino(%v) txID(%v)] is added", rbInode.inode.Inode, rbInode.txInodeInfo.TxID)
1296
// getTxRbDentry looks up the rollback record for the dentry identified by
// parent id pId and name in the rollback dentry tree. The lookup key is a
// TxRollbackDentry carrying only txDentryInfo, which is what
// TxRollbackDentry.Less orders by. Callers in this file treat a nil
// result as "not found".
func (tr *TransactionResource) getTxRbDentry(pId uint64, name string) *TxRollbackDentry {
1297
keyNode := &TxRollbackDentry{
1298
txDentryInfo: proto.NewTxDentryInfo("", pId, name, 0),
1300
item := tr.txRbDentryTree.Get(keyNode)
1305
return item.(*TxRollbackDentry)
1308
// deleteTxRollbackDentry removes the rollback record of the dentry
// (pid, name) from the rollback dentry tree, provided the record belongs
// to transaction txId. It returns proto.OpTxRbDentryNotExistErr both when
// no record exists and when the record is owned by a different
// transaction.
func (tr *TransactionResource) deleteTxRollbackDentry(pid uint64, name, txId string) (status uint8) {
1312
keyNode := &TxRollbackDentry{
1313
txDentryInfo: proto.NewTxDentryInfo("", pid, name, 0),
1316
item := tr.txRbDentryTree.Get(keyNode)
1318
log.LogWarnf("deleteTxRollbackDentry: rollback dentry may be already been deleted, pid %d, name %s, txId %s",
1320
return proto.OpTxRbDentryNotExistErr
1323
// The record exists but was written by another transaction: leave it.
if item.(*TxRollbackDentry).txDentryInfo.TxID != txId {
1324
log.LogWarnf("deleteTxRollbackDentry: rollback dentry is already been update by other, txId %s, item %v",
1326
return proto.OpTxRbDentryNotExistErr
1329
tr.txRbDentryTree.Delete(item)
1333
// addTxRollbackDentry adds a TxRollbackDentry to the RM's rollback dentry
// tree. When a record for the same dentry already exists, it returns
// proto.OpExistErr if that record belongs to the same transaction and
// proto.OpTxConflictErr if it belongs to a different one; in both cases
// nothing is inserted.
1334
func (tr *TransactionResource) addTxRollbackDentry(rbDentry *TxRollbackDentry) (status uint8) {
1338
// NOTE(review): the lookup takes the parent id from txDentryInfo and the
// name from dentry — presumably the two always agree; confirm.
oldRbDentry := tr.getTxRbDentry(rbDentry.txDentryInfo.ParentId, rbDentry.dentry.Name)
1339
if oldRbDentry != nil {
1340
if oldRbDentry.txDentryInfo.TxID == rbDentry.txDentryInfo.TxID {
1341
log.LogWarnf("addTxRollbackDentry: rollback dentry [pino(%v) name(%v) txID(%v)] is already exists",
1342
rbDentry.dentry.ParentId, rbDentry.dentry.Name, rbDentry.txDentryInfo.TxID)
1343
return proto.OpExistErr
1345
log.LogWarnf("addTxRollbackDentry: rollback dentry [pino(%v) name(%v) txID(%v) rbType(%v)] "+
1346
"is conflicted with dentry [pino(%v) name(%v) txID(%v) rbType(%v)]",
1347
rbDentry.dentry.ParentId, rbDentry.dentry.Name, rbDentry.txDentryInfo.TxID, rbDentry.rbType,
1348
oldRbDentry.dentry.ParentId, oldRbDentry.dentry.Name, oldRbDentry.txDentryInfo.TxID, oldRbDentry.rbType)
1349
return proto.OpTxConflictErr
1352
tr.txRbDentryTree.ReplaceOrInsert(rbDentry, true)
1353
log.LogDebugf("addTxRollbackDentry: rollback dentry [pino(%v) name(%v) txID(%v) rbType(%v)] is added",
1354
rbDentry.dentry.ParentId, rbDentry.dentry.Name, rbDentry.txDentryInfo.TxID, rbDentry.rbType)
1358
// rollbackInodeInternal applies the rollback recorded in rbInode to the meta
// partition, dispatching on rbInode.rbType. After the switch, rbInode is
// deleted from txRbInodeTree.
//
// For an unrecognised rbType, status is set to
// proto.OpTxRollbackUnknownRbType and err to a descriptive error.
// NOTE(review): confirm which rbType value selects each branch of the switch.
func (tr *TransactionResource) rollbackInodeInternal(rbInode *TxRollbackInode) (status uint8, err error) {
1360
mp := tr.txProcessor.mp
1361
switch rbInode.rbType {
1364
// Fetch the partition's current copy of this inode from the inode tree.
item := mp.inodeTree.CopyGet(rbInode.inode)
1369
// The inode is absent, a temp file, or marked for deletion: reinstate the
// inode saved in rbInode.
// NOTE(review): ino is presumably item asserted to *Inode — confirm.
if item == nil || ino.IsTempFile() || ino.ShouldDelete() {
1370
mp.freeList.Remove(rbInode.inode.Inode)
1371
if mp.uidManager != nil {
1372
mp.uidManager.addUidSpace(rbInode.inode.Uid, rbInode.inode.Inode, rbInode.inode.Extents.eks)
1374
// Quota bookkeeping is redone only when the inode was absent from the tree.
if mp.mqMgr != nil && len(rbInode.quotaIds) > 0 && item == nil {
1375
mp.setInodeQuota(rbInode.quotaIds, rbInode.inode.Inode)
1376
for _, quotaId := range rbInode.quotaIds {
1377
mp.mqMgr.updateUsedInfo(int64(rbInode.inode.Size), 1, quotaId)
1380
// Put the saved inode back into the inode tree.
mp.inodeTree.ReplaceOrInsert(rbInode.inode, true)
1382
// NOTE(review): presumably the alternative to the reinstate path above —
// the existing inode only has its link count incremented; confirm.
ino.IncNLink(mp.verSeq)
1386
// When getInode reports OpOk for the inode, reverse its uid-space and quota
// accounting. tr.txProcessor.mp is the same partition as the local mp.
if rsp := tr.txProcessor.mp.getInode(rbInode.inode, false); rsp.Status == proto.OpOk {
1387
if tr.txProcessor.mp.uidManager != nil {
1388
tr.txProcessor.mp.uidManager.doMinusUidSpace(rbInode.inode.Uid, rbInode.inode.Inode, rbInode.inode.Size)
1391
if tr.txProcessor.mp.mqMgr != nil && len(rbInode.quotaIds) > 0 {
1392
for _, quotaId := range rbInode.quotaIds {
1393
tr.txProcessor.mp.mqMgr.updateUsedInfo(-1*int64(rbInode.inode.Size), -1, quotaId)
1396
// Unlink the inode and then evict it.
tr.txProcessor.mp.fsmUnlinkInode(rbInode.inode, 0)
1397
tr.txProcessor.mp.fsmEvictInode(rbInode.inode)
1401
// Unrecognised rollback type.
status = proto.OpTxRollbackUnknownRbType
1402
err = fmt.Errorf("rollbackInode: unknown rbType %d", rbInode.rbType)
1405
// The rollback record is no longer needed once it has been applied.
tr.txRbInodeTree.Delete(rbInode)
1409
// rollbackInode is the RM operation that rolls back the inode named by
// req.Inode for transaction req.TxID; callers retry if an error occurs.
//
// It sets status to proto.OpTxConflictErr when the stored rollback inode
// belongs to a different TxID, and otherwise returns whatever
// rollbackInodeInternal reports. proto.OpTxRbInodeNotExistErr is used on the
// "rb inode not found" path.
// NOTE(review): the not-found path is presumably taken when getTxRbInode
// returns nil — confirm.
1410
func (tr *TransactionResource) rollbackInode(req *proto.TxInodeApplyRequest) (status uint8, err error) {
1414
rbInode := tr.getTxRbInode(req.Inode)
1416
status = proto.OpTxRbInodeNotExistErr
1417
errInfo := fmt.Sprintf("rollbackInode: roll back inode[%v] failed, txID[%v], rb inode not found", req.Inode, req.TxID)
1418
err = errors.New(errInfo)
1419
log.LogErrorf("%v", errInfo)
1423
// The rollback record must belong to the requesting transaction.
if rbInode.txInodeInfo.TxID != req.TxID {
1424
status = proto.OpTxConflictErr
1425
errInfo := fmt.Sprintf("rollbackInode: txID %v is not matching txInodeInfo txID %v", req.TxID, rbInode.txInodeInfo.TxID)
1426
err = errors.New(errInfo)
1427
log.LogErrorf("%v", errInfo)
1431
// Apply the rollback itself.
status, err = tr.rollbackInodeInternal(rbInode)
1433
log.LogErrorf("rollbackInode: inode[%v] roll back failed in tx[%v], rbType[%v]", req.Inode, req.TxID, rbInode.rbType)
1435
log.LogDebugf("rollbackInode: inode[%v] is rolled back in tx[%v], rbType[%v]", req.Inode, req.TxID, rbInode.rbType)
1441
// rollbackDentryInternal applies the rollback recorded in rbDentry,
// dispatching on rbDentry.rbType to one of fsmCreateDentry, fsmDeleteDentry or
// fsmUpdateDentry and taking status from that call. After the switch, rbDentry
// is deleted from txRbDentryTree.
//
// For an unrecognised rbType, status is set to
// proto.OpTxRollbackUnknownRbType and err to a descriptive error.
// NOTE(review): confirm which rbType value selects each branch of the switch.
func (tr *TransactionResource) rollbackDentryInternal(rbDentry *TxRollbackDentry) (status uint8, err error) {
1443
// Log an error whenever status is not proto.OpOk.
// NOTE(review): this check sits before the switch that assigns status, so it
// presumably runs inside a deferred function — confirm.
if status != proto.OpOk {
1444
log.LogErrorf("rollbackDentryInternal: rollback dentry failed, ifo %v", rbDentry.txDentryInfo)
1448
switch rbDentry.rbType {
1450
// The second argument needs to be true so that the link count does not change.
1451
status = tr.txProcessor.mp.fsmCreateDentry(rbDentry.dentry, true)
1453
resp := tr.txProcessor.mp.fsmDeleteDentry(rbDentry.dentry, true)
1454
status = resp.Status
1456
resp := tr.txProcessor.mp.fsmUpdateDentry(rbDentry.dentry)
1457
status = resp.Status
1459
// Unrecognised rollback type.
status = proto.OpTxRollbackUnknownRbType
1460
err = fmt.Errorf("rollbackDentry: unknown rbType %d", rbDentry.rbType)
1464
// The rollback record is no longer needed once it has been applied.
tr.txRbDentryTree.Delete(rbDentry)
1468
// rollbackDentry is the RM operation that rolls back the dentry identified by
// (req.Pid, req.Name) for transaction req.TxID; callers retry if an error
// occurs.
//
// It sets status to proto.OpTxRbDentryNotExistErr when no rollback dentry is
// stored for that key and to proto.OpTxConflictErr when the stored one belongs
// to a different TxID. Otherwise it returns whatever rollbackDentryInternal
// reports.
1469
func (tr *TransactionResource) rollbackDentry(req *proto.TxDentryApplyRequest) (status uint8, err error) {
1473
rbDentry := tr.getTxRbDentry(req.Pid, req.Name)
1474
if rbDentry == nil {
1475
status = proto.OpTxRbDentryNotExistErr
1476
// NOTE(review): the message says "rb inode not found" although the missing
// record is a rollback dentry.
errInfo := fmt.Sprintf("rollbackDentry: roll back dentry[%v_%v] failed, rb inode not found, txID[%v]",
1477
req.Pid, req.Name, req.TxID)
1478
err = errors.New(errInfo)
1479
log.LogWarnf("%v", errInfo)
1483
// The rollback record must belong to the requesting transaction.
if rbDentry.txDentryInfo.TxID != req.TxID {
1484
status = proto.OpTxConflictErr
1485
// NOTE(review): the message names "txInodeInfo" but the value printed is
// txDentryInfo.TxID.
errInfo := fmt.Sprintf("rollbackDentry: txID %v is not matching txInodeInfo txID %v", req.TxID, rbDentry.txDentryInfo.TxID)
1486
err = errors.New(errInfo)
1487
log.LogWarnf("%v", errInfo)
1491
// Apply the rollback itself.
status, err = tr.rollbackDentryInternal(rbDentry)
1493
log.LogErrorf("rollbackDentry: denKey[%v] roll back failed in tx[%v], rbType[%v]",
1494
rbDentry.txDentryInfo.GetKey(), req.TxID, rbDentry.rbType)
1496
log.LogDebugf("rollbackDentry: denKey[%v] is rolled back in tx[%v], rbType[%v]",
1497
rbDentry.txDentryInfo.GetKey(), req.TxID, rbDentry.rbType)
1503
// commitInode is the RM operation that commits an inode: it simply removes the
// inode's rollback record from txRbInodeTree.
//
// It sets status to proto.OpTxConflictErr when the stored rollback inode
// belongs to a TxID other than txID. proto.OpTxRbInodeNotExistErr is used on
// the "rb inode not found" path.
// NOTE(review): the not-found path is presumably taken when getTxRbInode
// returns nil — confirm.
1504
func (tr *TransactionResource) commitInode(txID string, inode uint64) (status uint8, err error) {
1508
rbInode := tr.getTxRbInode(inode)
1510
status = proto.OpTxRbInodeNotExistErr
1511
errInfo := fmt.Sprintf("commitInode: commit inode[%v] failed, rb inode not found", inode)
1512
err = errors.New(errInfo)
1513
log.LogWarnf("%v", errInfo)
1517
// The rollback record must belong to the committing transaction.
if rbInode.txInodeInfo.TxID != txID {
1518
status = proto.OpTxConflictErr
1519
errInfo := fmt.Sprintf("commitInode: txID %v is not matching txInodeInfo txID %v", txID, rbInode.txInodeInfo.TxID)
1520
err = errors.New(errInfo)
1521
log.LogErrorf("%v", errInfo)
1525
// Committing only drops the rollback record.
tr.txRbInodeTree.Delete(rbInode)
1526
log.LogDebugf("commitInode: inode[%v] is committed", inode)
1530
// commitDentry is the RM operation that commits a dentry: it removes the
// rollback record for (pId, name) from txRbDentryTree and, when that record's
// rbType is TxAdd, also unlinks the parent inode pId.
//
// It sets status to proto.OpTxRbDentryNotExistErr when no rollback dentry is
// stored for that key and to proto.OpTxConflictErr when the stored one belongs
// to a TxID other than txID.
1531
func (tr *TransactionResource) commitDentry(txID string, pId uint64, name string) (status uint8, err error) {
1536
rbDentry := tr.getTxRbDentry(pId, name)
1537
if rbDentry == nil {
1538
status = proto.OpTxRbDentryNotExistErr
1539
errInfo := fmt.Sprintf("commitDentry: commit dentry[%v_%v] failed, rb dentry not found", pId, name)
1540
err = errors.New(errInfo)
1541
log.LogWarnf("%v", errInfo)
1545
// The rollback record must belong to the committing transaction.
if rbDentry.txDentryInfo.TxID != txID {
1546
status = proto.OpTxConflictErr
1547
errInfo := fmt.Sprintf("commitDentry: txID %v is not matching txDentryInfo txID %v", txID, rbDentry.txDentryInfo.TxID)
1548
err = errors.New(errInfo)
1549
log.LogWarnf("%v", errInfo)
1553
tr.txRbDentryTree.Delete(rbDentry)
1554
// Unlink the parent inode when the rollback record is of type TxAdd.
// A non-OK result from the unlink is logged as a warning.
1555
if rbDentry.rbType == TxAdd {
1556
parInode := NewInode(pId, 0)
1557
st := tr.txProcessor.mp.fsmUnlinkInode(parInode, 0)
1558
if st.Status != proto.OpOk {
1559
log.LogWarnf("commitDentry: try unlink parent inode failed, txId %s, inode[%v]", txID, parInode)
1564
log.LogDebugf("commitDentry: dentry[%v] is committed", rbDentry.txDentryInfo.GetKey())