// Copyright 2018 The CubeFS Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
26
"github.com/cubefs/cubefs/proto"
27
"github.com/cubefs/cubefs/raftstore"
28
"github.com/cubefs/cubefs/raftstore/raftstore_db"
29
"github.com/cubefs/cubefs/util"
30
"github.com/cubefs/cubefs/util/config"
31
"github.com/cubefs/cubefs/util/cryptoutil"
32
"github.com/cubefs/cubefs/util/errors"
33
"github.com/cubefs/cubefs/util/log"
37
LRUCacheSize = 3 << 30
38
WriteBufferSize = 4 * util.MB
41
// AuthProxy wraps the stuff for http and https
42
type AuthProxy struct {
44
reverseProxy *httputil.ReverseProxy
49
// Server represents the server in a cluster
60
leaderInfo *LeaderInfo
63
rocksDBStore *raftstore_db.RocksDBStore
64
raftStore raftstore.RaftStore
66
partition raftstore.Partition
74
ClusterName = "clusterName"
82
ModuleName = "authnode"
83
CfgRetainLogs = "retainLogs"
84
DefaultRetainLogs = 20000
85
cfgTickInterval = "tickInterval"
86
cfgElectionTick = "electionTick"
87
AuthSecretKey = "authServiceKey"
88
AuthRootKey = "authRootKey"
89
EnableHTTPS = "enableHTTPS"
92
// NewServer creates a new server
93
func NewServer() *Server {
97
func (m *Server) checkConfig(cfg *config.Config) (err error) {
98
m.clusterName = cfg.GetString(ClusterName)
99
m.ip = cfg.GetString(IP)
100
m.port = cfg.GetString(Port)
101
m.walDir = cfg.GetString(WalDir)
102
m.storeDir = cfg.GetString(StoreDir)
104
peerAddrs := cfg.GetString(cfgPeers)
105
if m.ip == "" || m.port == "" || m.walDir == "" || m.storeDir == "" || m.clusterName == "" || peerAddrs == "" {
106
return fmt.Errorf("%v,err:%v", proto.ErrInvalidCfg, "one of (ip,port,walDir,storeDir,clusterName) is null")
108
if m.id, err = strconv.ParseUint(cfg.GetString(ID), 10, 64); err != nil {
109
return fmt.Errorf("%v,err:%v", proto.ErrInvalidCfg, err.Error())
111
m.config.heartbeatPort = cfg.GetInt64(heartbeatPortKey)
112
m.config.replicaPort = cfg.GetInt64(replicaPortKey)
113
if m.config.heartbeatPort <= 1024 {
114
m.config.heartbeatPort = raftstore.DefaultHeartbeatPort
116
if m.config.replicaPort <= 1024 {
117
m.config.replicaPort = raftstore.DefaultReplicaPort
119
syslog.Printf("heartbeatPort[%v],replicaPort[%v]\n", m.config.heartbeatPort, m.config.replicaPort)
121
if err = m.config.parsePeers(peerAddrs); err != nil {
124
retainLogs := cfg.GetString(CfgRetainLogs)
125
if retainLogs != "" {
126
if m.retainLogs, err = strconv.ParseUint(retainLogs, 10, 64); err != nil {
127
return fmt.Errorf("%v,err:%v", proto.ErrInvalidCfg, err.Error())
130
if m.retainLogs <= 0 {
131
m.retainLogs = DefaultRetainLogs
133
syslog.Println("retainLogs=", m.retainLogs)
135
m.tickInterval = int(cfg.GetFloat(cfgTickInterval))
136
m.electionTick = int(cfg.GetFloat(cfgElectionTick))
137
if m.tickInterval <= 300 {
140
if m.electionTick <= 3 {
147
func (m *Server) initFsm() {
148
m.fsm = newKeystoreFsm(m.rocksDBStore, m.retainLogs, m.raftStore.RaftServer())
149
m.fsm.registerLeaderChangeHandler(m.handleLeaderChange)
150
m.fsm.registerPeerChangeHandler(m.handlePeerChange)
154
// register the handlers for the interfaces defined in the Raft library
155
m.fsm.registerApplySnapshotHandler(m.handleApplySnapshot)
159
func (m *Server) createRaftServer(cfg *config.Config) (err error) {
160
raftCfg := &raftstore.Config{
163
NumOfLogsToRetain: m.retainLogs,
164
HeartbeatPort: int(m.config.heartbeatPort),
165
ReplicaPort: int(m.config.replicaPort),
166
TickInterval: m.tickInterval,
167
ElectionTick: m.electionTick,
169
if m.raftStore, err = raftstore.NewRaftStore(raftCfg, cfg); err != nil {
170
return errors.Trace(err, "NewRaftStore failed! id[%v] walPath[%v]", m.id, m.walDir)
173
partitionCfg := &raftstore.PartitionConfig{
175
Peers: m.config.peers,
176
Applied: m.fsm.applied,
179
if m.partition, err = m.raftStore.CreatePartition(partitionCfg); err != nil {
180
return errors.Trace(err, "CreatePartition failed")
185
// Start starts a server
186
func (m *Server) Start(cfg *config.Config) (err error) {
187
m.config = newClusterConfig()
188
m.leaderInfo = &LeaderInfo{}
189
if err = m.checkConfig(cfg); err != nil {
190
log.LogError(errors.Stack(err))
193
if m.rocksDBStore, err = raftstore_db.NewRocksDBStore(m.storeDir, LRUCacheSize, WriteBufferSize); err != nil {
194
log.LogErrorf("Start: init RocksDB fail: err(%v)", err)
198
if err = m.createRaftServer(cfg); err != nil {
199
log.LogError(errors.Stack(err))
203
m.cluster.partition = m.partition
205
AuthSecretKey := cfg.GetString(AuthSecretKey)
206
if m.cluster.AuthSecretKey, err = cryptoutil.Base64Decode(AuthSecretKey); err != nil {
207
return fmt.Errorf("action[Start] failed %v,err: auth service Key invalid=%s", proto.ErrInvalidCfg, AuthSecretKey)
210
AuthRootKey := cfg.GetString(AuthRootKey)
211
if m.cluster.AuthRootKey, err = cryptoutil.Base64Decode(AuthRootKey); err != nil {
212
return fmt.Errorf("action[Start] failed %v,err: auth root Key invalid=%s", proto.ErrInvalidCfg, AuthRootKey)
215
if cfg.GetBool(EnableHTTPS) == true {
216
m.cluster.PKIKey.EnableHTTPS = true
217
if m.cluster.PKIKey.AuthRootPublicKey, err = os.ReadFile("/app/server.crt"); err != nil {
218
return fmt.Errorf("action[Start] failed,err[%v]", err)
220
if m.cluster.PKIKey.AuthRootPrivateKey, err = os.ReadFile("/app/server.key"); err != nil {
221
return fmt.Errorf("action[Start] failed,err[%v]", err)
225
m.cluster.PKIKey.EnableHTTPS = false
227
m.authProxy = m.newAuthProxy()
229
m.cluster.scheduleTask()
235
// Shutdown closes the server
236
func (m *Server) Shutdown() {
240
// Sync waits for the execution termination of the server
241
func (m *Server) Sync() {
245
func (m *Server) initCluster() {
246
m.cluster = newCluster(m.clusterName, m.leaderInfo, m.fsm, m.partition, m.config)
247
m.cluster.retainLogs = m.retainLogs