1
// Copyright 2018 The CubeFS Authors.
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
7
// http://www.apache.org/licenses/LICENSE-2.0
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.k
18
"github.com/cubefs/cubefs/proto"
19
"github.com/cubefs/cubefs/util/log"
22
func (mp *metaPartition) fsmTxRollback(txID string) (status uint8) {
23
status = mp.txProcessor.txManager.rollbackTxInfo(txID)
27
func (mp *metaPartition) fsmTxDelete(txID string) (status uint8) {
28
status = mp.txProcessor.txManager.deleteTxInfo(txID)
32
func (mp *metaPartition) fsmTxInodeRollback(req *proto.TxInodeApplyRequest) (status uint8) {
33
status, _ = mp.txProcessor.txResource.rollbackInode(req)
37
func (mp *metaPartition) fsmTxDentryRollback(req *proto.TxDentryApplyRequest) (status uint8) {
38
status, _ = mp.txProcessor.txResource.rollbackDentry(req)
42
func (mp *metaPartition) fsmTxSetState(req *proto.TxSetStateRequest) (status uint8) {
43
status, _ = mp.txProcessor.txManager.txSetState(req)
47
func (mp *metaPartition) fsmTxInit(txInfo *proto.TransactionInfo) (status uint8) {
49
err := mp.txProcessor.txManager.registerTransaction(txInfo)
51
log.LogErrorf("fsmTxInit: register transaction failed, txInfo %s, err %s", txInfo.String(), err.Error())
52
return proto.OpTxInternalErr
57
func (mp *metaPartition) fsmTxCommit(txID string) (status uint8) {
58
status, _ = mp.txProcessor.txManager.commitTxInfo(txID)
62
func (mp *metaPartition) fsmTxInodeCommit(txID string, inode uint64) (status uint8) {
64
status, _ = mp.txProcessor.txResource.commitInode(txID, inode)
68
func (mp *metaPartition) fsmTxDentryCommit(txID string, pId uint64, name string) (status uint8) {
70
status, _ = mp.txProcessor.txResource.commitDentry(txID, pId, name)
74
func (mp *metaPartition) fsmTxCommitRM(txInfo *proto.TransactionInfo) (status uint8) {
76
ifo := mp.txProcessor.txManager.copyGetTx(txInfo.TxID)
77
if ifo == nil || ifo.Finish() {
78
log.LogWarnf("fsmTxCommitRM: tx already commit or rollback before, tx %v, ifo %v", txInfo, ifo)
82
mpId := mp.config.PartitionId
83
for _, ifo := range txInfo.TxInodeInfos {
88
mp.fsmTxInodeCommit(ifo.TxID, ifo.Ino)
91
for _, ifo := range txInfo.TxDentryInfos {
96
mp.fsmTxDentryCommit(ifo.TxID, ifo.ParentId, ifo.Name)
103
func (mp *metaPartition) fsmTxRollbackRM(txInfo *proto.TransactionInfo) (status uint8) {
105
ifo := mp.txProcessor.txManager.copyGetTx(txInfo.TxID)
106
if ifo == nil || ifo.Finish() {
107
log.LogWarnf("fsmTxRollbackRM: tx already commit or rollback before, tx %v, ifo %v", txInfo, ifo)
111
mpId := mp.config.PartitionId
112
for _, ifo := range txInfo.TxInodeInfos {
113
if ifo.MpID != mpId {
117
req := &proto.TxInodeApplyRequest{
121
mp.fsmTxInodeRollback(req)
124
// delete from rb tree
125
for _, ifo := range txInfo.TxDentryInfos {
126
if ifo.MpID != mpId {
130
req := &proto.TxDentryApplyRequest{
135
mp.fsmTxDentryRollback(req)
142
func (mp *metaPartition) inodeInTx(inode uint64) uint8 {
143
inTx, txId := mp.txProcessor.txResource.isInodeInTransction(NewInode(inode, 0))
145
log.LogWarnf("inodeInTx: inode is in transaction, inode %d, txId %s", inode, txId)
146
return proto.OpTxConflictErr
151
func (mp *metaPartition) dentryInTx(parIno uint64, name string) uint8 {
152
inTx, txId := mp.txProcessor.txResource.isDentryInTransction(&Dentry{
158
log.LogWarnf("inodeInTx: inode is in transaction, parent inode %d, name %s, txId %s", parIno, name, txId)
159
return proto.OpTxConflictErr
164
func (mp *metaPartition) txInodeInRb(inode uint64, newTxId string) (rbInode *TxRollbackInode) {
165
rbIno := mp.txProcessor.txResource.getTxRbInode(inode)
166
if rbIno != nil && rbIno.txInodeInfo.TxID == newTxId {
173
func (mp *metaPartition) txDentryInRb(parIno uint64, name, newTxId string) bool {
174
inTx, txId := mp.txProcessor.txResource.isDentryInTransction(&Dentry{
178
return inTx && txId == newTxId