1
// Copyright 2018 The CubeFS Authors.
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
7
// http://www.apache.org/licenses/LICENSE-2.0
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
24
"github.com/cubefs/cubefs/depends/tiglabs/raft/proto"
25
"github.com/cubefs/cubefs/util/keystore"
26
"github.com/cubefs/cubefs/util/log"
29
// RaftCmd defines the Raft commands.
36
// Marshal converts the RaftCmd to a byte array.
// The receiver is encoded with json.Marshal; the encoded bytes and any
// encoding error are returned to the caller unchanged.
37
func (m *RaftCmd) Marshal() ([]byte, error) {
38
return json.Marshal(m)
41
// Unmarshal converts the byte array to a RaftCmd.
// data is decoded with json.Unmarshal directly into the receiver m, and the
// decoding error (nil on success) is returned.
42
func (m *RaftCmd) Unmarshal(data []byte) (err error) {
43
return json.Unmarshal(data, m)
46
// setOpType splits the command key m.K on keySeparator and examines the
// resulting parts. The two visible log.LogWarnf calls report an
// invalid-length split result and an unknown op code (keyArr[1]).
// NOTE(review): the length check and the op-code dispatch are not visible in
// this excerpt; presumably the op type is derived from keyArr[1] — confirm
// against the full source.
func (m *RaftCmd) setOpType() {
47
keyArr := strings.Split(m.K, keySeparator)
49
log.LogWarnf("action[setOpType] invalid length[%v]", keyArr)
58
log.LogWarnf("action[setOpType] unknown opCode[%v]", keyArr[1])
62
// submit serializes metadata with its Marshal method and passes the resulting
// bytes to c.partition.Submit.
// A Submit failure is returned as a fresh error whose text is
// "action[keystore_submit] err:" followed by the original message; the
// original error value is not wrapped.
// NOTE(review): the guard around the first return is not visible in this
// excerpt; presumably it fires only when Marshal fails — confirm.
func (c *Cluster) submit(metadata *RaftCmd) (err error) {
63
cmd, err := metadata.Marshal()
65
// Only the message text is carried over into the new error.
return errors.New(err.Error())
67
if _, err = c.partition.Submit(cmd); err != nil {
68
msg := fmt.Sprintf("action[keystore_submit] err:%v", err.Error())
69
return errors.New(msg)
74
// syncAddKey forwards keyInfo to syncPutKeyInfo with the opSyncAddKey op type
// and returns whatever error that call yields.
func (c *Cluster) syncAddKey(keyInfo *keystore.KeyInfo) (err error) {
75
return c.syncPutKeyInfo(opSyncAddKey, keyInfo)
78
// syncAddAccessKey forwards akInfo to syncPutAccessKeyInfo with the
// opSyncAddKey op type and returns whatever error that call yields.
// NOTE(review): this passes opSyncAddKey, the same op type syncAddKey uses;
// confirm that no dedicated access-key op type is intended.
func (c *Cluster) syncAddAccessKey(akInfo *keystore.AccessKeyInfo) (err error) {
79
return c.syncPutAccessKeyInfo(opSyncAddKey, akInfo)
82
// syncAddCaps forwards keyInfo to syncPutKeyInfo with the opSyncAddCaps op
// type and returns whatever error that call yields.
func (c *Cluster) syncAddCaps(keyInfo *keystore.KeyInfo) (err error) {
83
return c.syncPutKeyInfo(opSyncAddCaps, keyInfo)
86
// syncDeleteKey forwards keyInfo to syncPutKeyInfo with the opSyncDeleteKey op
// type and returns whatever error that call yields.
func (c *Cluster) syncDeleteKey(keyInfo *keystore.KeyInfo) (err error) {
87
return c.syncPutKeyInfo(opSyncDeleteKey, keyInfo)
90
// syncDeleteAccessKey forwards akInfo to syncPutAccessKeyInfo with the
// opSyncDeleteKey op type and returns whatever error that call yields.
// NOTE(review): this passes opSyncDeleteKey, the same op type syncDeleteKey
// uses; confirm that no dedicated access-key op type is intended.
func (c *Cluster) syncDeleteAccessKey(akInfo *keystore.AccessKeyInfo) (err error) {
91
return c.syncPutAccessKeyInfo(opSyncDeleteKey, akInfo)
94
// syncDeleteCaps forwards keyInfo to syncPutKeyInfo with the opSyncDeleteCaps
// op type and returns whatever error that call yields.
func (c *Cluster) syncDeleteCaps(keyInfo *keystore.KeyInfo) (err error) {
95
return c.syncPutKeyInfo(opSyncDeleteCaps, keyInfo)
98
// syncPutKeyInfo builds a RaftCmd for a key record and hands it to c.submit.
// The command key is ksPrefix + keyInfo.ID + idSeparator + the decimal form of
// c.fsm.id, and the command value is the JSON encoding of vv. A JSON encoding
// failure is returned as a new error carrying only the original message.
// NOTE(review): the definition of vv and any use of opType are not visible in
// this excerpt; presumably vv is derived from keyInfo and opType is stored on
// the command — confirm against the full source.
func (c *Cluster) syncPutKeyInfo(opType uint32, keyInfo *keystore.KeyInfo) (err error) {
99
keydata := new(RaftCmd)
101
keydata.K = ksPrefix + keyInfo.ID + idSeparator + strconv.FormatUint(c.fsm.id, 10)
103
if keydata.V, err = json.Marshal(vv); err != nil {
104
return errors.New(err.Error())
106
return c.submit(keydata)
109
// syncPutAccessKeyInfo builds a RaftCmd for an access-key record and hands it
// to c.submit.
// The command key is akPrefix + accessKeyInfo.AccessKey + idSeparator + the
// decimal form of c.fsm.id, and the command value is the JSON encoding of vv.
// A JSON encoding failure is returned as a new error carrying only the
// original message.
// NOTE(review): the definition of vv and any use of opType are not visible in
// this excerpt; presumably vv is derived from accessKeyInfo and opType is
// stored on the command — confirm against the full source.
func (c *Cluster) syncPutAccessKeyInfo(opType uint32, accessKeyInfo *keystore.AccessKeyInfo) (err error) {
110
keydata := new(RaftCmd)
112
keydata.K = akPrefix + accessKeyInfo.AccessKey + idSeparator + strconv.FormatUint(c.fsm.id, 10)
114
if keydata.V, err = json.Marshal(vv); err != nil {
115
return errors.New(err.Error())
117
return c.submit(keydata)
120
// loadKeystore reads every record stored under ksPrefix through
// c.fsm.store.SeekForPrefix and decodes each value as a JSON-encoded
// keystore.KeyInfo, checking a local map keyed by KeyInfo.ID for entries that
// are not yet present. Store and decode failures are turned into errors whose
// text starts with "action[loadKeystore]".
// NOTE(review): this excerpt omits several lines (the error guards and
// returns, the body of the missing-ID check, the ksMutex lock and the final
// assignment). Presumably the map is published on c.fsm under ksMutex, as
// loadAKstore does for accessKeystore — confirm against the full source.
func (c *Cluster) loadKeystore() (err error) {
121
ks := make(map[string]*keystore.KeyInfo, 0)
122
log.LogInfof("action[loadKeystore]")
123
result, err := c.fsm.store.SeekForPrefix([]byte(ksPrefix))
125
err = fmt.Errorf("action[loadKeystore],err:%v", err.Error())
128
for _, value := range result {
129
k := &keystore.KeyInfo{}
130
if err = json.Unmarshal(value, k); err != nil {
131
// The raw value is included in the message to identify the bad record.
err = fmt.Errorf("action[loadKeystore],value:%v,unmarshal err:%v", string(value), err)
134
if _, ok := ks[k.ID]; !ok {
137
log.LogInfof("action[loadKeystore],key[%v]", k)
140
defer c.fsm.ksMutex.Unlock()
146
// clearKeystore defers the release of c.fsm.ksMutex.
// NOTE(review): only the deferred unlock is visible in this excerpt; the
// matching Lock call and the statement that clears the store are missing.
// By analogy with clearAKstore it presumably resets the keystore to nil —
// confirm against the full source.
func (c *Cluster) clearKeystore() {
148
defer c.fsm.ksMutex.Unlock()
152
// loadAKstore reads every record stored under akPrefix through
// c.fsm.store.SeekForPrefix, decodes each value as a JSON-encoded
// keystore.AccessKeyInfo and collects the entries in a local map keyed by
// AccessKey. The map is then assigned to c.fsm.accessKeystore while
// c.fsm.aksMutex is held. Store and decode failures are turned into errors
// whose text starts with "action[loadAccessKeystore]".
// NOTE(review): the error guards and return statements are not visible in
// this excerpt; presumably each formatted error is returned immediately —
// confirm against the full source.
func (c *Cluster) loadAKstore() (err error) {
153
aks := make(map[string]*keystore.AccessKeyInfo, 0)
154
log.LogInfof("action[loadAccessKeystore]")
155
result, err := c.fsm.store.SeekForPrefix([]byte(akPrefix))
157
err = fmt.Errorf("action[loadAccessKeystore], err: %v", err.Error())
160
for _, value := range result {
161
ak := &keystore.AccessKeyInfo{}
162
if err = json.Unmarshal(value, ak); err != nil {
163
// The raw value is included in the message to identify the bad record.
err = fmt.Errorf("action[loadAccessKeystore], value: %v, unmarshal err: %v", string(value), err)
166
// Keep the first record seen for a given access key.
if _, ok := aks[ak.AccessKey]; !ok {
167
aks[ak.AccessKey] = ak
169
log.LogInfof("action[loadAccessKeystore], access key[%v]", ak)
171
// Swap in the freshly built map under the access-keystore mutex.
c.fsm.aksMutex.Lock()
172
defer c.fsm.aksMutex.Unlock()
173
c.fsm.accessKeystore = aks
178
// clearAKstore sets c.fsm.accessKeystore to nil while holding
// c.fsm.aksMutex; the mutex is released by a deferred Unlock.
func (c *Cluster) clearAKstore() {
179
c.fsm.aksMutex.Lock()
180
defer c.fsm.aksMutex.Unlock()
181
c.fsm.accessKeystore = nil
184
// addRaftNode asks c.partition to change membership with proto.ConfAddNode
// for a peer whose ID is nodeID, passing addr as the byte context of the
// request. The visible return produces a new error whose text is
// "action[addRaftNode] error: " followed by the ChangeMember error message.
// NOTE(review): the guard around the return and the success path are not
// visible in this excerpt; presumably the error is returned only when
// ChangeMember fails — confirm against the full source.
func (c *Cluster) addRaftNode(nodeID uint64, addr string) (err error) {
185
peer := proto.Peer{ID: nodeID}
186
_, err = c.partition.ChangeMember(proto.ConfAddNode, peer, []byte(addr))
188
return errors.New("action[addRaftNode] error: " + err.Error())
193
// removeRaftNode asks c.partition to change membership with
// proto.ConfRemoveNode for a peer whose ID is nodeID, passing addr as the byte
// context of the request. The visible return produces a new error whose text
// is "action[removeRaftNode] error: " followed by the ChangeMember error
// message.
// NOTE(review): the guard around the return and the success path are not
// visible in this excerpt; presumably the error is returned only when
// ChangeMember fails — confirm against the full source.
func (c *Cluster) removeRaftNode(nodeID uint64, addr string) (err error) {
194
peer := proto.Peer{ID: nodeID}
195
_, err = c.partition.ChangeMember(proto.ConfRemoveNode, peer, []byte(addr))
197
return errors.New("action[removeRaftNode] error: " + err.Error())