cubefs

Форк
0
/
partition_fsmop_transaction.go 
179 строк · 4.8 Кб
1
// Copyright 2018 The CubeFS Authors.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
14

15
package metanode
16

17
import (
18
	"github.com/cubefs/cubefs/proto"
19
	"github.com/cubefs/cubefs/util/log"
20
)
21

22
// fsmTxRollback hands txID to the transaction manager's rollbackTxInfo and
// returns the status code it reports.
func (mp *metaPartition) fsmTxRollback(txID string) (status uint8) {
	return mp.txProcessor.txManager.rollbackTxInfo(txID)
}
26

27
// fsmTxDelete hands txID to the transaction manager's deleteTxInfo and
// returns the status code it reports.
func (mp *metaPartition) fsmTxDelete(txID string) (status uint8) {
	return mp.txProcessor.txManager.deleteTxInfo(txID)
}
31

32
// fsmTxInodeRollback forwards req to the transaction resource's rollbackInode
// and returns its status code. The second result of rollbackInode is
// discarded.
func (mp *metaPartition) fsmTxInodeRollback(req *proto.TxInodeApplyRequest) (status uint8) {
	resource := mp.txProcessor.txResource
	status, _ = resource.rollbackInode(req)
	return status
}
36

37
// fsmTxDentryRollback forwards req to the transaction resource's
// rollbackDentry and returns its status code. The second result of
// rollbackDentry is discarded.
func (mp *metaPartition) fsmTxDentryRollback(req *proto.TxDentryApplyRequest) (status uint8) {
	resource := mp.txProcessor.txResource
	status, _ = resource.rollbackDentry(req)
	return status
}
41

42
// fsmTxSetState forwards req to the transaction manager's txSetState and
// returns its status code. The second result of txSetState is discarded.
func (mp *metaPartition) fsmTxSetState(req *proto.TxSetStateRequest) (status uint8) {
	manager := mp.txProcessor.txManager
	status, _ = manager.txSetState(req)
	return status
}
46

47
// fsmTxInit registers txInfo with the transaction manager.
//
// It returns proto.OpOk when registerTransaction succeeds. On failure the
// error is logged and proto.OpTxInternalErr is returned.
func (mp *metaPartition) fsmTxInit(txInfo *proto.TransactionInfo) (status uint8) {
	if err := mp.txProcessor.txManager.registerTransaction(txInfo); err != nil {
		log.LogErrorf("fsmTxInit: register transaction failed, txInfo %s, err %s", txInfo.String(), err.Error())
		return proto.OpTxInternalErr
	}
	return proto.OpOk
}
56

57
// fsmTxCommit hands txID to the transaction manager's commitTxInfo and
// returns its status code. The second result of commitTxInfo is discarded.
func (mp *metaPartition) fsmTxCommit(txID string) (status uint8) {
	manager := mp.txProcessor.txManager
	status, _ = manager.commitTxInfo(txID)
	return status
}
61

62
// fsmTxInodeCommit asks the transaction resource to commit the given inode
// for transaction txID and returns the resulting status code. The second
// result of commitInode is discarded.
func (mp *metaPartition) fsmTxInodeCommit(txID string, inode uint64) (status uint8) {
	resource := mp.txProcessor.txResource
	status, _ = resource.commitInode(txID, inode)
	return status
}
67

68
// fsmTxDentryCommit asks the transaction resource to commit the dentry
// identified by (pId, name) for transaction txID and returns the resulting
// status code. The second result of commitDentry is discarded.
func (mp *metaPartition) fsmTxDentryCommit(txID string, pId uint64, name string) (status uint8) {
	resource := mp.txProcessor.txResource
	status, _ = resource.commitDentry(txID, pId, name)
	return status
}
73

74
// fsmTxCommitRM commits, on this partition, the inode and dentry entries
// carried by txInfo.
//
// The transaction is first looked up by txInfo.TxID through copyGetTx. If it
// is not found, or its Finish() already reports true, a warning is logged and
// proto.OpOk is returned without touching any entry. Otherwise every entry in
// txInfo.TxInodeInfos and txInfo.TxDentryInfos whose MpID equals this
// partition's ID is committed, the looked-up transaction is marked finished
// via SetFinish, and proto.OpOk is returned.
//
// The status codes of the individual inode/dentry commits are not inspected.
func (mp *metaPartition) fsmTxCommitRM(txInfo *proto.TransactionInfo) (status uint8) {
	tx := mp.txProcessor.txManager.copyGetTx(txInfo.TxID)
	if tx == nil || tx.Finish() {
		log.LogWarnf("fsmTxCommitRM: tx already commit or rollback before, tx %v, ifo %v", txInfo, tx)
		return proto.OpOk
	}

	localID := mp.config.PartitionId

	// Only entries that belong to this partition are applied here.
	for _, inoInfo := range txInfo.TxInodeInfos {
		if inoInfo.MpID == localID {
			mp.fsmTxInodeCommit(inoInfo.TxID, inoInfo.Ino)
		}
	}

	for _, denInfo := range txInfo.TxDentryInfos {
		if denInfo.MpID == localID {
			mp.fsmTxDentryCommit(denInfo.TxID, denInfo.ParentId, denInfo.Name)
		}
	}

	tx.SetFinish()
	return proto.OpOk
}
102

103
func (mp *metaPartition) fsmTxRollbackRM(txInfo *proto.TransactionInfo) (status uint8) {
104
	status = proto.OpOk
105
	ifo := mp.txProcessor.txManager.copyGetTx(txInfo.TxID)
106
	if ifo == nil || ifo.Finish() {
107
		log.LogWarnf("fsmTxRollbackRM: tx already commit or rollback before, tx %v, ifo %v", txInfo, ifo)
108
		return
109
	}
110

111
	mpId := mp.config.PartitionId
112
	for _, ifo := range txInfo.TxInodeInfos {
113
		if ifo.MpID != mpId {
114
			continue
115
		}
116

117
		req := &proto.TxInodeApplyRequest{
118
			TxID:  ifo.TxID,
119
			Inode: ifo.Ino,
120
		}
121
		mp.fsmTxInodeRollback(req)
122
	}
123

124
	// delete from rb tree
125
	for _, ifo := range txInfo.TxDentryInfos {
126
		if ifo.MpID != mpId {
127
			continue
128
		}
129

130
		req := &proto.TxDentryApplyRequest{
131
			TxID: ifo.TxID,
132
			Pid:  ifo.ParentId,
133
			Name: ifo.Name,
134
		}
135
		mp.fsmTxDentryRollback(req)
136
	}
137

138
	ifo.SetFinish()
139
	return proto.OpOk
140
}
141

142
// inodeInTx reports whether the given inode is currently held by a
// transaction, according to the transaction resource's isInodeInTransction.
//
// It returns proto.OpTxConflictErr (after logging a warning with the owning
// transaction ID) when the inode is in a transaction, and proto.OpOk
// otherwise.
func (mp *metaPartition) inodeInTx(inode uint64) uint8 {
	probe := NewInode(inode, 0)
	if inTx, txId := mp.txProcessor.txResource.isInodeInTransction(probe); inTx {
		log.LogWarnf("inodeInTx: inode is in transaction, inode %d, txId %s", inode, txId)
		return proto.OpTxConflictErr
	}
	return proto.OpOk
}
150

151
func (mp *metaPartition) dentryInTx(parIno uint64, name string) uint8 {
152
	inTx, txId := mp.txProcessor.txResource.isDentryInTransction(&Dentry{
153
		ParentId: parIno,
154
		Name:     name,
155
	})
156

157
	if inTx {
158
		log.LogWarnf("inodeInTx: inode is in transaction, parent inode %d, name %s, txId %s", parIno, name, txId)
159
		return proto.OpTxConflictErr
160
	}
161
	return proto.OpOk
162
}
163

164
// txInodeInRb returns the rollback inode that getTxRbInode yields for the
// given inode, but only if it exists and its txInodeInfo.TxID equals newTxId.
// In every other case it returns nil.
func (mp *metaPartition) txInodeInRb(inode uint64, newTxId string) (rbInode *TxRollbackInode) {
	rbInode = mp.txProcessor.txResource.getTxRbInode(inode)
	if rbInode == nil || rbInode.txInodeInfo.TxID != newTxId {
		return nil
	}
	return rbInode
}
172

173
func (mp *metaPartition) txDentryInRb(parIno uint64, name, newTxId string) bool {
174
	inTx, txId := mp.txProcessor.txResource.isDentryInTransction(&Dentry{
175
		ParentId: parIno,
176
		Name:     name,
177
	})
178
	return inTx && txId == newTxId
179
}
180

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.