cubefs

Форк
0
/
keystore_fsm_op.go 
200 строк · 5.5 Кб
1
// Copyright 2018 The CubeFS Authors.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
14

15
package authnode
16

17
import (
18
	"encoding/json"
19
	"errors"
20
	"fmt"
21
	"strconv"
22
	"strings"
23

24
	"github.com/cubefs/cubefs/depends/tiglabs/raft/proto"
25
	"github.com/cubefs/cubefs/util/keystore"
26
	"github.com/cubefs/cubefs/util/log"
27
)
28

29
// RaftCmd defines the Raft commands.
30
type RaftCmd struct {
31
	Op uint32 `json:"op"`
32
	K  string `json:"k"`
33
	V  []byte `json:"v"`
34
}
35

36
// Marshal converts the RaftCmd to a byte array.
37
func (m *RaftCmd) Marshal() ([]byte, error) {
38
	return json.Marshal(m)
39
}
40

41
// Unmarshal converts the byte array to a RaftCmd.
42
func (m *RaftCmd) Unmarshal(data []byte) (err error) {
43
	return json.Unmarshal(data, m)
44
}
45

46
func (m *RaftCmd) setOpType() {
47
	keyArr := strings.Split(m.K, keySeparator)
48
	if len(keyArr) < 2 {
49
		log.LogWarnf("action[setOpType] invalid length[%v]", keyArr)
50
		return
51
	}
52
	switch keyArr[1] {
53
	case keyAcronym:
54
		m.Op = opSyncAddKey
55
	case akAcronym:
56
		m.Op = opSyncAddKey
57
	default:
58
		log.LogWarnf("action[setOpType] unknown opCode[%v]", keyArr[1])
59
	}
60
}
61

62
func (c *Cluster) submit(metadata *RaftCmd) (err error) {
63
	cmd, err := metadata.Marshal()
64
	if err != nil {
65
		return errors.New(err.Error())
66
	}
67
	if _, err = c.partition.Submit(cmd); err != nil {
68
		msg := fmt.Sprintf("action[keystore_submit] err:%v", err.Error())
69
		return errors.New(msg)
70
	}
71
	return
72
}
73

74
func (c *Cluster) syncAddKey(keyInfo *keystore.KeyInfo) (err error) {
75
	return c.syncPutKeyInfo(opSyncAddKey, keyInfo)
76
}
77

78
func (c *Cluster) syncAddAccessKey(akInfo *keystore.AccessKeyInfo) (err error) {
79
	return c.syncPutAccessKeyInfo(opSyncAddKey, akInfo)
80
}
81

82
func (c *Cluster) syncAddCaps(keyInfo *keystore.KeyInfo) (err error) {
83
	return c.syncPutKeyInfo(opSyncAddCaps, keyInfo)
84
}
85

86
func (c *Cluster) syncDeleteKey(keyInfo *keystore.KeyInfo) (err error) {
87
	return c.syncPutKeyInfo(opSyncDeleteKey, keyInfo)
88
}
89

90
func (c *Cluster) syncDeleteAccessKey(akInfo *keystore.AccessKeyInfo) (err error) {
91
	return c.syncPutAccessKeyInfo(opSyncDeleteKey, akInfo)
92
}
93

94
func (c *Cluster) syncDeleteCaps(keyInfo *keystore.KeyInfo) (err error) {
95
	return c.syncPutKeyInfo(opSyncDeleteCaps, keyInfo)
96
}
97

98
func (c *Cluster) syncPutKeyInfo(opType uint32, keyInfo *keystore.KeyInfo) (err error) {
99
	keydata := new(RaftCmd)
100
	keydata.Op = opType
101
	keydata.K = ksPrefix + keyInfo.ID + idSeparator + strconv.FormatUint(c.fsm.id, 10)
102
	vv := *keyInfo
103
	if keydata.V, err = json.Marshal(vv); err != nil {
104
		return errors.New(err.Error())
105
	}
106
	return c.submit(keydata)
107
}
108

109
func (c *Cluster) syncPutAccessKeyInfo(opType uint32, accessKeyInfo *keystore.AccessKeyInfo) (err error) {
110
	keydata := new(RaftCmd)
111
	keydata.Op = opType
112
	keydata.K = akPrefix + accessKeyInfo.AccessKey + idSeparator + strconv.FormatUint(c.fsm.id, 10)
113
	vv := *accessKeyInfo
114
	if keydata.V, err = json.Marshal(vv); err != nil {
115
		return errors.New(err.Error())
116
	}
117
	return c.submit(keydata)
118
}
119

120
func (c *Cluster) loadKeystore() (err error) {
121
	ks := make(map[string]*keystore.KeyInfo, 0)
122
	log.LogInfof("action[loadKeystore]")
123
	result, err := c.fsm.store.SeekForPrefix([]byte(ksPrefix))
124
	if err != nil {
125
		err = fmt.Errorf("action[loadKeystore],err:%v", err.Error())
126
		return err
127
	}
128
	for _, value := range result {
129
		k := &keystore.KeyInfo{}
130
		if err = json.Unmarshal(value, k); err != nil {
131
			err = fmt.Errorf("action[loadKeystore],value:%v,unmarshal err:%v", string(value), err)
132
			return err
133
		}
134
		if _, ok := ks[k.ID]; !ok {
135
			ks[k.ID] = k
136
		}
137
		log.LogInfof("action[loadKeystore],key[%v]", k)
138
	}
139
	c.fsm.ksMutex.Lock()
140
	defer c.fsm.ksMutex.Unlock()
141
	c.fsm.keystore = ks
142

143
	return
144
}
145

146
func (c *Cluster) clearKeystore() {
147
	c.fsm.ksMutex.Lock()
148
	defer c.fsm.ksMutex.Unlock()
149
	c.fsm.keystore = nil
150
}
151

152
func (c *Cluster) loadAKstore() (err error) {
153
	aks := make(map[string]*keystore.AccessKeyInfo, 0)
154
	log.LogInfof("action[loadAccessKeystore]")
155
	result, err := c.fsm.store.SeekForPrefix([]byte(akPrefix))
156
	if err != nil {
157
		err = fmt.Errorf("action[loadAccessKeystore], err: %v", err.Error())
158
		return err
159
	}
160
	for _, value := range result {
161
		ak := &keystore.AccessKeyInfo{}
162
		if err = json.Unmarshal(value, ak); err != nil {
163
			err = fmt.Errorf("action[loadAccessKeystore], value: %v, unmarshal err: %v", string(value), err)
164
			return err
165
		}
166
		if _, ok := aks[ak.AccessKey]; !ok {
167
			aks[ak.AccessKey] = ak
168
		}
169
		log.LogInfof("action[loadAccessKeystore], access key[%v]", ak)
170
	}
171
	c.fsm.aksMutex.Lock()
172
	defer c.fsm.aksMutex.Unlock()
173
	c.fsm.accessKeystore = aks
174

175
	return
176
}
177

178
func (c *Cluster) clearAKstore() {
179
	c.fsm.aksMutex.Lock()
180
	defer c.fsm.aksMutex.Unlock()
181
	c.fsm.accessKeystore = nil
182
}
183

184
func (c *Cluster) addRaftNode(nodeID uint64, addr string) (err error) {
185
	peer := proto.Peer{ID: nodeID}
186
	_, err = c.partition.ChangeMember(proto.ConfAddNode, peer, []byte(addr))
187
	if err != nil {
188
		return errors.New("action[addRaftNode] error: " + err.Error())
189
	}
190
	return nil
191
}
192

193
func (c *Cluster) removeRaftNode(nodeID uint64, addr string) (err error) {
194
	peer := proto.Peer{ID: nodeID}
195
	_, err = c.partition.ChangeMember(proto.ConfRemoveNode, peer, []byte(addr))
196
	if err != nil {
197
		return errors.New("action[removeRaftNode] error: " + err.Error())
198
	}
199
	return nil
200
}
201

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.