1
// Copyright 2018 The CubeFS Authors.
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
7
// http://www.apache.org/licenses/LICENSE-2.0
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
25
"github.com/cubefs/cubefs/depends/tiglabs/raft"
26
"github.com/cubefs/cubefs/depends/tiglabs/raft/proto"
27
"github.com/cubefs/cubefs/raftstore/raftstore_db"
28
"github.com/cubefs/cubefs/util/keystore"
29
"github.com/cubefs/cubefs/util/log"
36
type raftLeaderChangeHandler func(leader uint64)
38
type raftPeerChangeHandler func(confChange *proto.ConfChange) (err error)
40
type raftCmdApplyHandler func(cmd *RaftCmd) (err error)
42
type raftApplySnapshotHandler func()
44
// KeystoreFsm represents the finite state machine of a keystore
45
type KeystoreFsm struct {
46
store *raftstore_db.RocksDBStore
50
leaderChangeHandler raftLeaderChangeHandler
51
peerChangeHandler raftPeerChangeHandler
52
snapshotHandler raftApplySnapshotHandler
54
keystore map[string]*keystore.KeyInfo
55
accessKeystore map[string]*keystore.AccessKeyInfo
56
ksMutex sync.RWMutex // keystore mutex
57
aksMutex sync.RWMutex // accesskeystore mutex
58
opKeyMutex sync.RWMutex // operations on key mutex
59
id uint64 // current id of server
62
func newKeystoreFsm(store *raftstore_db.RocksDBStore, retainsLog uint64, rs *raft.RaftServer) (fsm *KeystoreFsm) {
63
fsm = new(KeystoreFsm)
66
fsm.retainLogs = retainsLog
70
// Corresponding to the LeaderChange interface in Raft library.
71
func (mf *KeystoreFsm) registerLeaderChangeHandler(handler raftLeaderChangeHandler) {
72
mf.leaderChangeHandler = handler
75
// Corresponding to the PeerChange interface in Raft library.
76
func (mf *KeystoreFsm) registerPeerChangeHandler(handler raftPeerChangeHandler) {
77
mf.peerChangeHandler = handler
80
// Corresponding to the ApplySnapshot interface in Raft library.
81
func (mf *KeystoreFsm) registerApplySnapshotHandler(handler raftApplySnapshotHandler) {
82
mf.snapshotHandler = handler
85
func (mf *KeystoreFsm) restore() {
89
func (mf *KeystoreFsm) restoreApplied() {
90
value, err := mf.Get(applied)
92
panic(fmt.Sprintf("Failed to restore applied err:%v", err.Error()))
94
byteValues := value.([]byte)
95
if len(byteValues) == 0 {
99
applied, err := strconv.ParseUint(string(byteValues), 10, 64)
101
panic(fmt.Sprintf("Failed to restore applied,err:%v ", err.Error()))
106
// Apply implements the interface of raft.StateMachine
107
func (mf *KeystoreFsm) Apply(command []byte, index uint64) (resp interface{}, err error) {
109
keyInfo keystore.KeyInfo
114
if err = cmd.Unmarshal(command); err != nil {
115
log.LogErrorf("action[fsmApply],unmarshal data:%v, err:%v", command, err.Error())
118
log.LogInfof("action[fsmApply],cmd.op[%v],cmd.K[%v],cmd.V[%v]", cmd.Op, cmd.K, string(cmd.V))
120
if err = json.Unmarshal(cmd.V, &keyInfo); err != nil {
124
s := strings.Split(cmd.K, idSeparator)
126
panic(fmt.Errorf("cmd.K format error %s", cmd.K))
128
leader, err = strconv.ParseUint(s[1], 10, 64)
130
panic(fmt.Errorf("leaderID format error %s", s[1]))
133
cmdMap := make(map[string][]byte)
135
cmdMap[applied] = []byte(strconv.FormatUint(uint64(index), 10))
138
case opSyncDeleteKey:
139
if err = mf.delKeyAndPutIndex(cmd.K, cmdMap); err != nil {
142
// if mf.leader != mf.id {
143
// We don't use above statement to avoid "leader double-change of keystore cache"
144
// Because there may a race condition: before "Apply" raftlog, leader change happens
145
// so that cache changes may not happen in newly selected leader-node and "double-change"
146
// of cache may happen in newly demoted leader node. Therefore, we use the following
147
// statement: "id" indicates which server has changed keystore cache (typical leader).
149
mf.DeleteKey(keyInfo.ID)
150
mf.DeleteAKInfo(keyInfo.AccessKey)
151
log.LogInfof("action[Apply], Successfully delete key in node[%d]", mf.id)
153
log.LogInfof("action[Apply], Already delete key in node[%d]", mf.id)
156
if err = mf.batchPut(cmdMap); err != nil {
159
// if mf.leader != mf.id {
160
// Same reasons as the description above
163
accessKeyInfo := &keystore.AccessKeyInfo{
164
AccessKey: keyInfo.AccessKey,
167
mf.PutAKInfo(accessKeyInfo)
168
log.LogInfof("action[Apply], Successfully put key in node[%d]", mf.id)
170
log.LogInfof("action[Apply], Already put key in node[%d]", mf.id)
174
if mf.applied > 0 && (mf.applied%mf.retainLogs) == 0 {
175
log.LogWarnf("action[Apply],truncate raft log,retainLogs[%v],index[%v]", mf.retainLogs, mf.applied)
176
mf.rs.Truncate(GroupID, mf.applied)
181
// ApplyMemberChange implements the interface of raft.StateMachine
182
func (mf *KeystoreFsm) ApplyMemberChange(confChange *proto.ConfChange, index uint64) (interface{}, error) {
184
if mf.peerChangeHandler != nil {
185
err = mf.peerChangeHandler(confChange)
190
// Snapshot implements the interface of raft.StateMachine
191
func (mf *KeystoreFsm) Snapshot() (proto.Snapshot, error) {
192
snapshot := mf.store.RocksDBSnapshot()
193
iterator := mf.store.Iterator(snapshot)
194
iterator.SeekToFirst()
195
return &KeystoreSnapshot{
203
// ApplySnapshot implements the interface of raft.StateMachine
204
func (mf *KeystoreFsm) ApplySnapshot(peers []proto.Peer, iterator proto.SnapIterator) (err error) {
205
log.LogInfof(fmt.Sprintf("action[ApplySnapshot] begin,applied[%v]", mf.applied))
208
if data, err = iterator.Next(); err != nil {
212
if err = json.Unmarshal(data, cmd); err != nil {
215
if _, err = mf.store.Put(cmd.K, cmd.V, true); err != nil {
219
if err != nil && err != io.EOF {
223
log.LogInfof(fmt.Sprintf("action[ApplySnapshot] success,applied[%v]", mf.applied))
226
log.LogError(fmt.Sprintf("action[ApplySnapshot] failed,err:%v", err.Error()))
230
// HandleFatalEvent implements the interface of raft.StateMachine
231
func (mf *KeystoreFsm) HandleFatalEvent(err *raft.FatalError) {
235
// HandleLeaderChange implements the interface of raft.StateMachine
236
func (mf *KeystoreFsm) HandleLeaderChange(leader uint64) {
237
if mf.leaderChangeHandler != nil {
238
go mf.leaderChangeHandler(leader)
242
// Put implements the interface of raft.StateMachine
243
func (mf *KeystoreFsm) Put(key, val interface{}) (interface{}, error) {
244
return mf.store.Put(key, val, true)
247
func (mf *KeystoreFsm) batchPut(cmdMap map[string][]byte) (err error) {
248
return mf.store.BatchPut(cmdMap, true)
251
// Get implements the interface of raft.StateMachine
252
func (mf *KeystoreFsm) Get(key interface{}) (interface{}, error) {
253
return mf.store.Get(key)
256
// Del implements the interface of raft.StateMachine
257
func (mf *KeystoreFsm) Del(key interface{}) (interface{}, error) {
258
return mf.store.Del(key, true)
261
func (mf *KeystoreFsm) delKeyAndPutIndex(key string, cmdMap map[string][]byte) (err error) {
262
return mf.store.DeleteKeyAndPutIndex(key, cmdMap, true)