1
// Copyright 2018 The CubeFS Authors.
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
7
// http://www.apache.org/licenses/LICENSE-2.0
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
25
"github.com/cubefs/cubefs/depends/tiglabs/raft"
26
"github.com/cubefs/cubefs/depends/tiglabs/raft/logger"
27
"github.com/cubefs/cubefs/depends/tiglabs/raft/proto"
28
"github.com/cubefs/cubefs/depends/tiglabs/raft/storage/wal"
29
raftlog "github.com/cubefs/cubefs/depends/tiglabs/raft/util/log"
30
utilConfig "github.com/cubefs/cubefs/util/config"
33
// RaftStore defines the interface for the raft store.
34
type RaftStore interface {
35
CreatePartition(cfg *PartitionConfig) (Partition, error)
37
RaftConfig() *raft.Config
38
RaftStatus(raftID uint64) (raftStatus *raft.Status)
40
RaftServer() *raft.RaftServer
43
type raftStore struct {
46
raftConfig *raft.Config
47
raftServer *raft.RaftServer
51
// RaftConfig returns the raft configuration.
52
func (s *raftStore) RaftConfig() *raft.Config {
56
func (s *raftStore) RaftStatus(raftID uint64) (raftStatus *raft.Status) {
57
return s.raftServer.Status(raftID)
60
// AddNodeWithPort add a new node with the given port.
61
func (s *raftStore) AddNodeWithPort(nodeID uint64, addr string, heartbeat int, replicate int) {
62
s.resolver.AddNodeWithPort(nodeID, addr, heartbeat, replicate)
65
// DeleteNode deletes the node with the given ID in the raft store.
66
func (s *raftStore) DeleteNode(nodeID uint64) {
67
s.resolver.DeleteNode(nodeID)
70
// Stop stops the raft store server.
71
func (s *raftStore) Stop() {
72
if s.raftServer != nil {
77
func newRaftLogger(dir string) {
78
raftLogPath := path.Join(dir, "logs")
79
_, err := os.Stat(raftLogPath)
81
if pathErr, ok := err.(*os.PathError); ok {
82
if os.IsNotExist(pathErr) {
83
os.MkdirAll(raftLogPath, 0o755)
88
raftLog, err := raftlog.NewLog(raftLogPath, "raft", "debug")
90
syslog.Println("Fatal: failed to start the baud storage daemon - ", err)
93
logger.SetLogger(raftLog)
97
// NewRaftStore returns a new raft store instance.
98
func NewRaftStore(cfg *Config, extendCfg *utilConfig.Config) (mr RaftStore, err error) {
99
resolver := NewNodeResolver()
101
newRaftLogger(cfg.RaftPath)
102
setMonitorConf(extendCfg)
104
rc := raft.DefaultConfig()
105
rc.NodeID = cfg.NodeID
108
if cfg.HeartbeatPort <= 0 {
109
cfg.HeartbeatPort = DefaultHeartbeatPort
111
if cfg.ReplicaPort <= 0 {
112
cfg.ReplicaPort = DefaultReplicaPort
114
if cfg.NumOfLogsToRetain == 0 {
115
cfg.NumOfLogsToRetain = DefaultNumOfLogsToRetain
117
if cfg.ElectionTick < DefaultElectionTick {
118
cfg.ElectionTick = DefaultElectionTick
120
if cfg.TickInterval < DefaultTickInterval {
121
cfg.TickInterval = DefaultTickInterval
123
// if cfg's RecvBufSize bigger than the default 2048,
124
// use the bigger one.
125
if cfg.RecvBufSize > rc.ReqBufferSize {
126
rc.ReqBufferSize = cfg.RecvBufSize
128
rc.HeartbeatAddr = fmt.Sprintf("%s:%d", cfg.IPAddr, cfg.HeartbeatPort)
129
rc.ReplicateAddr = fmt.Sprintf("%s:%d", cfg.IPAddr, cfg.ReplicaPort)
130
rc.Resolver = resolver
131
rc.RetainLogs = cfg.NumOfLogsToRetain
132
rc.TickInterval = time.Duration(cfg.TickInterval) * time.Millisecond
133
rc.ElectionTick = cfg.ElectionTick
134
rs, err := raft.NewRaftServer(rc)
143
raftPath: cfg.RaftPath,
148
func (s *raftStore) RaftServer() *raft.RaftServer {
152
// CreatePartition creates a new partition in the raft store.
153
func (s *raftStore) CreatePartition(cfg *PartitionConfig) (p Partition, err error) {
154
// Init WaL Storage for this partition.
156
// wc: WaL Configuration.
160
if cfg.WalPath == "" {
161
walPath = path.Join(s.raftPath, strconv.FormatUint(cfg.ID, 10))
163
walPath = path.Join(cfg.WalPath, "wal_"+strconv.FormatUint(cfg.ID, 10))
167
ws, err := wal.NewStorage(walPath, wc)
171
peers := make([]proto.Peer, 0)
172
for _, peerAddress := range cfg.Peers {
173
peers = append(peers, peerAddress.Peer)
177
peerAddress.HeartbeatPort,
178
peerAddress.ReplicaPort,
181
logger.Info("action[raftstore:CreatePartition] raft config applied [%v] id:%d", cfg.Applied, cfg.ID)
182
rc := &raft.RaftConfig{
188
StateMachine: cfg.SM,
189
Applied: cfg.Applied,
190
Monitor: newMonitor(),
192
if err = s.raftServer.CreateRaft(rc); err != nil {
195
p = newPartition(cfg, s.raftServer, walPath)