keystore_fsm.go
// Copyright 2018 The CubeFS Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.

package authnode

import (
	"encoding/json"
	"fmt"
	"io"
	"strconv"
	"strings"
	"sync"

	"github.com/cubefs/cubefs/depends/tiglabs/raft"
	"github.com/cubefs/cubefs/depends/tiglabs/raft/proto"
	"github.com/cubefs/cubefs/raftstore/raftstore_db"
	"github.com/cubefs/cubefs/util/keystore"
	"github.com/cubefs/cubefs/util/log"
)

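// applied is the RocksDB key under which the most recently applied raft log index is persisted.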
const (
	applied = "applied"
)

type raftLeaderChangeHandler func(leader uint64)

type raftPeerChangeHandler func(confChange *proto.ConfChange) (err error)

type raftCmdApplyHandler func(cmd *RaftCmd) (err error)

type raftApplySnapshotHandler func()

// KeystoreFsm represents the finite state machine of a keystore
type KeystoreFsm struct {
	store               *raftstore_db.RocksDBStore
	rs                  *raft.RaftServer
	applied             uint64
	retainLogs          uint64
	leaderChangeHandler raftLeaderChangeHandler
	peerChangeHandler   raftPeerChangeHandler
	snapshotHandler     raftApplySnapshotHandler

	keystore       map[string]*keystore.KeyInfo
	accessKeystore map[string]*keystore.AccessKeyInfo
	ksMutex        sync.RWMutex // keystore mutex
	aksMutex       sync.RWMutex // accesskeystore mutex
	opKeyMutex     sync.RWMutex // operations on key mutex
	id             uint64       // current id of server
}

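// newKeystoreFsm creates a KeystoreFsm backed by the given RocksDB store and raft server.
// retainsLog determines how often Apply truncates the raft log (once every retainsLog applied entries).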
func newKeystoreFsm(store *raftstore_db.RocksDBStore, retainsLog uint64, rs *raft.RaftServer) (fsm *KeystoreFsm) {
	fsm = new(KeystoreFsm)
	fsm.store = store
	fsm.rs = rs
	fsm.retainLogs = retainsLog
	return
}

// Corresponding to the LeaderChange interface in Raft library.
func (mf *KeystoreFsm) registerLeaderChangeHandler(handler raftLeaderChangeHandler) {
	mf.leaderChangeHandler = handler
}

// Corresponding to the PeerChange interface in Raft library.
func (mf *KeystoreFsm) registerPeerChangeHandler(handler raftPeerChangeHandler) {
	mf.peerChangeHandler = handler
}

// Corresponding to the ApplySnapshot interface in Raft library.
func (mf *KeystoreFsm) registerApplySnapshotHandler(handler raftApplySnapshotHandler) {
	mf.snapshotHandler = handler
}

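// restore reloads persisted state after a restart; currently this is only the applied raft index.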
func (mf *KeystoreFsm) restore() {
	mf.restoreApplied()
}

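// restoreApplied reads the persisted applied index from RocksDB; an empty value means a fresh
// store, so the index starts at zero.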
func (mf *KeystoreFsm) restoreApplied() {
	value, err := mf.Get(applied)
	if err != nil {
		panic(fmt.Sprintf("Failed to restore applied err:%v", err.Error()))
	}
	byteValues := value.([]byte)
	if len(byteValues) == 0 {
		mf.applied = 0
		return
	}
	applied, err := strconv.ParseUint(string(byteValues), 10, 64)
	if err != nil {
		panic(fmt.Sprintf("Failed to restore applied,err:%v ", err.Error()))
	}
	mf.applied = applied
}

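// Note on the command key format: the proposer encodes its node ID into cmd.K (separated by
// idSeparator). Apply compares that ID with mf.id to decide whether the in-memory keystore
// caches were already updated by the proposing node; see the comment inside the switch below.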
// Apply implements the interface of raft.StateMachine
func (mf *KeystoreFsm) Apply(command []byte, index uint64) (resp interface{}, err error) {
	var (
		keyInfo keystore.KeyInfo
		leader  uint64
	)

	cmd := new(RaftCmd)
	if err = cmd.Unmarshal(command); err != nil {
		log.LogErrorf("action[fsmApply],unmarshal data:%v, err:%v", command, err.Error())
		panic(err)
	}
	log.LogInfof("action[fsmApply],cmd.op[%v],cmd.K[%v],cmd.V[%v]", cmd.Op, cmd.K, string(cmd.V))

	if err = json.Unmarshal(cmd.V, &keyInfo); err != nil {
		panic(err)
	}

	s := strings.Split(cmd.K, idSeparator)
	if len(s) != 2 {
		panic(fmt.Errorf("cmd.K format error %s", cmd.K))
	}
	leader, err = strconv.ParseUint(s[1], 10, 64)
	if err != nil {
		panic(fmt.Errorf("leaderID format error %s", s[1]))
	}

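	// cmdMap carries the key/value pair together with the new applied index so that both are
	// written to RocksDB in the same batch below.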
	cmdMap := make(map[string][]byte)
	cmdMap[s[0]] = cmd.V
	cmdMap[applied] = []byte(strconv.FormatUint(uint64(index), 10))

	switch cmd.Op {
	case opSyncDeleteKey:
		if err = mf.delKeyAndPutIndex(cmd.K, cmdMap); err != nil {
			panic(err)
		}
		// Note: we deliberately do not compare against mf.leader here (i.e. "if mf.leader != mf.id").
		// A leader change can happen between proposing a command and applying it, so checking the
		// current leader could skip the cache update on a newly elected leader and apply it twice on
		// the newly demoted one. Instead, the proposer ID encoded in cmd.K tells us which node has
		// already updated its keystore cache (typically the leader at proposal time).
		if mf.id != leader {
			mf.DeleteKey(keyInfo.ID)
			mf.DeleteAKInfo(keyInfo.AccessKey)
			log.LogInfof("action[Apply], Successfully delete key in node[%d]", mf.id)
		} else {
			log.LogInfof("action[Apply], Already delete key in node[%d]", mf.id)
		}
	default:
		if err = mf.batchPut(cmdMap); err != nil {
			panic(err)
		}
		// Same reasoning as the comment above.
		if mf.id != leader {
			mf.PutKey(&keyInfo)
			accessKeyInfo := &keystore.AccessKeyInfo{
				AccessKey: keyInfo.AccessKey,
				ID:        keyInfo.ID,
			}
			mf.PutAKInfo(accessKeyInfo)
			log.LogInfof("action[Apply], Successfully put key in node[%d]", mf.id)
		} else {
			log.LogInfof("action[Apply], Already put key in node[%d]", mf.id)
		}
	}
	mf.applied = index
	if mf.applied > 0 && (mf.applied%mf.retainLogs) == 0 {
		log.LogWarnf("action[Apply],truncate raft log,retainLogs[%v],index[%v]", mf.retainLogs, mf.applied)
		mf.rs.Truncate(GroupID, mf.applied)
	}
	return
}

// ApplyMemberChange implements the interface of raft.StateMachine
func (mf *KeystoreFsm) ApplyMemberChange(confChange *proto.ConfChange, index uint64) (interface{}, error) {
	var err error
	if mf.peerChangeHandler != nil {
		err = mf.peerChangeHandler(confChange)
	}
	return nil, err
}

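// Snapshot wraps a point-in-time RocksDB snapshot and an iterator positioned at the first entry
// into a KeystoreSnapshot, which the raft library iterates when transferring state to a peer.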
// Snapshot implements the interface of raft.StateMachine
func (mf *KeystoreFsm) Snapshot() (proto.Snapshot, error) {
	snapshot := mf.store.RocksDBSnapshot()
	iterator := mf.store.Iterator(snapshot)
	iterator.SeekToFirst()
	return &KeystoreSnapshot{
		applied:  mf.applied,
		snapshot: snapshot,
		fsm:      mf,
		iterator: iterator,
	}, nil
}

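// ApplySnapshot replays every RaftCmd delivered by the snapshot iterator into the local
// RocksDB store; iteration stops when the iterator returns io.EOF.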
// ApplySnapshot implements the interface of raft.StateMachine
func (mf *KeystoreFsm) ApplySnapshot(peers []proto.Peer, iterator proto.SnapIterator) (err error) {
	log.LogInfof(fmt.Sprintf("action[ApplySnapshot] begin,applied[%v]", mf.applied))
	var data []byte
	for err == nil {
		if data, err = iterator.Next(); err != nil {
			break
		}
		cmd := &RaftCmd{}
		if err = json.Unmarshal(data, cmd); err != nil {
			goto errHandler
		}
		if _, err = mf.store.Put(cmd.K, cmd.V, true); err != nil {
			goto errHandler
		}
	}
	if err != nil && err != io.EOF {
		goto errHandler
	}
	mf.snapshotHandler()
	log.LogInfof(fmt.Sprintf("action[ApplySnapshot] success,applied[%v]", mf.applied))
	return nil
errHandler:
	log.LogError(fmt.Sprintf("action[ApplySnapshot] failed,err:%v", err.Error()))
	return err
}

// HandleFatalEvent implements the interface of raft.StateMachine
func (mf *KeystoreFsm) HandleFatalEvent(err *raft.FatalError) {
	panic(err.Err)
}

// HandleLeaderChange implements the interface of raft.StateMachine
func (mf *KeystoreFsm) HandleLeaderChange(leader uint64) {
	if mf.leaderChangeHandler != nil {
		go mf.leaderChangeHandler(leader)
	}
}

// Put implements the interface of raft.StateMachine
func (mf *KeystoreFsm) Put(key, val interface{}) (interface{}, error) {
	return mf.store.Put(key, val, true)
}

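// batchPut writes all entries of cmdMap to the underlying RocksDB store in a single batch.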
func (mf *KeystoreFsm) batchPut(cmdMap map[string][]byte) (err error) {
	return mf.store.BatchPut(cmdMap, true)
}

// Get implements the interface of raft.StateMachine
func (mf *KeystoreFsm) Get(key interface{}) (interface{}, error) {
	return mf.store.Get(key)
}

// Del implements the interface of raft.StateMachine
func (mf *KeystoreFsm) Del(key interface{}) (interface{}, error) {
	return mf.store.Del(key, true)
}

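// delKeyAndPutIndex deletes the given key and writes the entries of cmdMap (including the
// updated applied index) in a single call to the underlying store.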
func (mf *KeystoreFsm) delKeyAndPutIndex(key string, cmdMap map[string][]byte) (err error) {
	return mf.store.DeleteKeyAndPutIndex(key, cmdMap, true)
}