cubefs

Форк
0
/
server.go 
248 строк · 7.1 Кб
1
// Copyright 2018 The CubeFS Authors.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
14

15
package authnode
16

17
import (
18
	"fmt"
19
	syslog "log"
20
	"net/http"
21
	"net/http/httputil"
22
	"os"
23
	"strconv"
24
	"sync"
25

26
	"github.com/cubefs/cubefs/proto"
27
	"github.com/cubefs/cubefs/raftstore"
28
	"github.com/cubefs/cubefs/raftstore/raftstore_db"
29
	"github.com/cubefs/cubefs/util"
30
	"github.com/cubefs/cubefs/util/config"
31
	"github.com/cubefs/cubefs/util/cryptoutil"
32
	"github.com/cubefs/cubefs/util/errors"
33
	"github.com/cubefs/cubefs/util/log"
34
)
35

36
// Sizing parameters for the local RocksDB store that backs the keystore.
const (
	// LRUCacheSize is the RocksDB block-cache capacity (3 << 30 = 3 GiB).
	LRUCacheSize    = 3 << 30
	// WriteBufferSize is the RocksDB write-buffer (memtable) size, 4 MB.
	WriteBufferSize = 4 * util.MB
)
40

41
// AuthProxy wraps the stuff for http and https.
// It bundles the two transport helpers built by newAuthProxy and stored
// on the Server in Start; presumably they forward requests to the raft
// leader — confirm against newAuthProxy, which is defined elsewhere.
type AuthProxy struct {
	// for http proxy
	reverseProxy *httputil.ReverseProxy
	// for https redirect
	client *http.Client
}
48

49
// Server represents the server in a cluster.
// Fields are populated by checkConfig (from the JSON config) and by
// Start/createRaftServer (runtime components).
type Server struct {
	id           uint64 // node id, parsed from the "id" config entry
	clusterName  string // cluster name from the "clusterName" config entry
	ip           string // listen IP from the "ip" config entry
	port         string // listen port from the "port" config entry
	walDir       string // raft write-ahead-log directory ("walDir")
	storeDir     string // RocksDB data directory ("storeDir")
	retainLogs   uint64 // number of raft logs to retain ("retainLogs", default 20000)
	tickInterval int    // raft tick interval, clamped to >= 500 in checkConfig
	electionTick int    // raft election tick count, clamped to >= 5 in checkConfig
	leaderInfo   *LeaderInfo
	config       *clusterConfig
	cluster      *Cluster
	rocksDBStore *raftstore_db.RocksDBStore // persistent store backing the keystore FSM
	raftStore    raftstore.RaftStore
	fsm          *KeystoreFsm // replicated keystore state machine
	partition    raftstore.Partition
	wg           sync.WaitGroup // held by Start (Add), released by Shutdown (Done), awaited by Sync
	authProxy    *AuthProxy
	metaReady    bool
}
71

72
// configuration keys
//
// Most entries are the string keys looked up in the JSON configuration.
// Note two exceptions mixed into this block: GroupID is the fixed raft
// partition id, and DefaultRetainLogs is the fallback value applied when
// "retainLogs" is absent — neither is a config key name.
const (
	ClusterName       = "clusterName"
	ID                = "id"
	IP                = "ip"
	Port              = "port"
	LogLevel          = "logLevel"
	WalDir            = "walDir"
	StoreDir          = "storeDir"
	GroupID           = 1
	ModuleName        = "authnode"
	CfgRetainLogs     = "retainLogs"
	DefaultRetainLogs = 20000
	cfgTickInterval   = "tickInterval"
	cfgElectionTick   = "electionTick"
	AuthSecretKey     = "authServiceKey"
	AuthRootKey       = "authRootKey"
	EnableHTTPS       = "enableHTTPS"
)
91

92
// NewServer creates a new server
93
func NewServer() *Server {
94
	return &Server{}
95
}
96

97
// checkConfig loads and validates the server settings from cfg.
//
// It fills in the node identity (clusterName, ip, port, id), the storage
// paths (walDir, storeDir), the raft peer list and raft tuning values
// (retainLogs, tickInterval, electionTick), applying defaults when a
// value is absent or out of range. It returns a wrapped
// proto.ErrInvalidCfg when a required entry is missing or unparsable.
func (m *Server) checkConfig(cfg *config.Config) (err error) {
	m.clusterName = cfg.GetString(ClusterName)
	m.ip = cfg.GetString(IP)
	m.port = cfg.GetString(Port)
	m.walDir = cfg.GetString(WalDir)
	m.storeDir = cfg.GetString(StoreDir)

	peerAddrs := cfg.GetString(cfgPeers)
	if m.ip == "" || m.port == "" || m.walDir == "" || m.storeDir == "" || m.clusterName == "" || peerAddrs == "" {
		// Fix: the message previously omitted "peers" even though an
		// empty peer list also triggers this branch.
		return fmt.Errorf("%v,err:%v", proto.ErrInvalidCfg, "one of (ip,port,walDir,storeDir,clusterName,peers) is null")
	}
	if m.id, err = strconv.ParseUint(cfg.GetString(ID), 10, 64); err != nil {
		return fmt.Errorf("%v,err:%v", proto.ErrInvalidCfg, err.Error())
	}

	// Raft ports at or below the privileged range (<= 1024) fall back to
	// the raftstore library defaults.
	m.config.heartbeatPort = cfg.GetInt64(heartbeatPortKey)
	m.config.replicaPort = cfg.GetInt64(replicaPortKey)
	if m.config.heartbeatPort <= 1024 {
		m.config.heartbeatPort = raftstore.DefaultHeartbeatPort
	}
	if m.config.replicaPort <= 1024 {
		m.config.replicaPort = raftstore.DefaultReplicaPort
	}
	syslog.Printf("heartbeatPort[%v],replicaPort[%v]\n", m.config.heartbeatPort, m.config.replicaPort)

	if err = m.config.parsePeers(peerAddrs); err != nil {
		return
	}

	retainLogs := cfg.GetString(CfgRetainLogs)
	if retainLogs != "" {
		if m.retainLogs, err = strconv.ParseUint(retainLogs, 10, 64); err != nil {
			return fmt.Errorf("%v,err:%v", proto.ErrInvalidCfg, err.Error())
		}
	}
	// retainLogs is unsigned, so the old "<= 0" test was really an
	// equality test; write it as such.
	if m.retainLogs == 0 {
		m.retainLogs = DefaultRetainLogs
	}
	syslog.Println("retainLogs=", m.retainLogs)

	// Clamp raft timing to safe minimums (units per raftstore.Config —
	// TODO confirm tickInterval is milliseconds).
	m.tickInterval = int(cfg.GetFloat(cfgTickInterval))
	m.electionTick = int(cfg.GetFloat(cfgElectionTick))
	if m.tickInterval <= 300 {
		m.tickInterval = 500
	}
	if m.electionTick <= 3 {
		m.electionTick = 5
	}

	return
}
146

147
// initFsm builds the keystore finite-state machine on top of the RocksDB
// store and the raft server, wires up the raft event handlers, and
// restores any previously persisted state.
func (m *Server) initFsm() {
	m.fsm = newKeystoreFsm(m.rocksDBStore, m.retainLogs, m.raftStore.RaftServer())
	// React to raft leadership and peer-membership changes.
	m.fsm.registerLeaderChangeHandler(m.handleLeaderChange)
	m.fsm.registerPeerChangeHandler(m.handlePeerChange)

	m.fsm.id = m.id

	// register the handlers for the interfaces defined in the Raft library
	m.fsm.registerApplySnapshotHandler(m.handleApplySnapshot)
	// restore presumably reloads persisted FSM state (e.g. the applied
	// index) from RocksDB — confirm against KeystoreFsm.restore.
	m.fsm.restore()
}
158

159
func (m *Server) createRaftServer(cfg *config.Config) (err error) {
160
	raftCfg := &raftstore.Config{
161
		NodeID:            m.id,
162
		RaftPath:          m.walDir,
163
		NumOfLogsToRetain: m.retainLogs,
164
		HeartbeatPort:     int(m.config.heartbeatPort),
165
		ReplicaPort:       int(m.config.replicaPort),
166
		TickInterval:      m.tickInterval,
167
		ElectionTick:      m.electionTick,
168
	}
169
	if m.raftStore, err = raftstore.NewRaftStore(raftCfg, cfg); err != nil {
170
		return errors.Trace(err, "NewRaftStore failed! id[%v] walPath[%v]", m.id, m.walDir)
171
	}
172
	m.initFsm()
173
	partitionCfg := &raftstore.PartitionConfig{
174
		ID:      GroupID,
175
		Peers:   m.config.peers,
176
		Applied: m.fsm.applied,
177
		SM:      m.fsm,
178
	}
179
	if m.partition, err = m.raftStore.CreatePartition(partitionCfg); err != nil {
180
		return errors.Trace(err, "CreatePartition failed")
181
	}
182
	return
183
}
184

185
// Start starts a server
186
func (m *Server) Start(cfg *config.Config) (err error) {
187
	m.config = newClusterConfig()
188
	m.leaderInfo = &LeaderInfo{}
189
	if err = m.checkConfig(cfg); err != nil {
190
		log.LogError(errors.Stack(err))
191
		return
192
	}
193
	if m.rocksDBStore, err = raftstore_db.NewRocksDBStore(m.storeDir, LRUCacheSize, WriteBufferSize); err != nil {
194
		log.LogErrorf("Start: init RocksDB fail: err(%v)", err)
195
		return
196
	}
197

198
	if err = m.createRaftServer(cfg); err != nil {
199
		log.LogError(errors.Stack(err))
200
		return
201
	}
202
	m.initCluster()
203
	m.cluster.partition = m.partition
204

205
	AuthSecretKey := cfg.GetString(AuthSecretKey)
206
	if m.cluster.AuthSecretKey, err = cryptoutil.Base64Decode(AuthSecretKey); err != nil {
207
		return fmt.Errorf("action[Start] failed %v,err: auth service Key invalid=%s", proto.ErrInvalidCfg, AuthSecretKey)
208
	}
209

210
	AuthRootKey := cfg.GetString(AuthRootKey)
211
	if m.cluster.AuthRootKey, err = cryptoutil.Base64Decode(AuthRootKey); err != nil {
212
		return fmt.Errorf("action[Start] failed %v,err: auth root Key invalid=%s", proto.ErrInvalidCfg, AuthRootKey)
213
	}
214

215
	if cfg.GetBool(EnableHTTPS) == true {
216
		m.cluster.PKIKey.EnableHTTPS = true
217
		if m.cluster.PKIKey.AuthRootPublicKey, err = os.ReadFile("/app/server.crt"); err != nil {
218
			return fmt.Errorf("action[Start] failed,err[%v]", err)
219
		}
220
		if m.cluster.PKIKey.AuthRootPrivateKey, err = os.ReadFile("/app/server.key"); err != nil {
221
			return fmt.Errorf("action[Start] failed,err[%v]", err)
222
		}
223
		// TODO: verify cert
224
	} else {
225
		m.cluster.PKIKey.EnableHTTPS = false
226
	}
227
	m.authProxy = m.newAuthProxy()
228

229
	m.cluster.scheduleTask()
230
	m.startHTTPService()
231
	m.wg.Add(1)
232
	return nil
233
}
234

235
// Shutdown closes the server.
// It releases the wait group held by Start, unblocking Sync. Note that
// no teardown of the HTTP service, raft store, or RocksDB is visible
// here — callers relying on a full stop should confirm that elsewhere.
func (m *Server) Shutdown() {
	m.wg.Done()
}
239

240
// Sync waits for the execution termination of the server.
// It blocks on the wait group acquired in Start until Shutdown releases it.
func (m *Server) Sync() {
	m.wg.Wait()
}
244

245
// initCluster constructs the Cluster from the components prepared by
// checkConfig and createRaftServer, and propagates the retainLogs
// setting onto it.
func (m *Server) initCluster() {
	m.cluster = newCluster(m.clusterName, m.leaderInfo, m.fsm, m.partition, m.config)
	m.cluster.retainLogs = m.retainLogs
}
249

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.