cubefs

Форк
0
/
raftstore.go 
197 строк · 5.2 Кб
1
// Copyright 2018 The CubeFS Authors.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
14

15
package raftstore
16

17
import (
18
	"fmt"
19
	syslog "log"
20
	"os"
21
	"path"
22
	"strconv"
23
	"time"
24

25
	"github.com/cubefs/cubefs/depends/tiglabs/raft"
26
	"github.com/cubefs/cubefs/depends/tiglabs/raft/logger"
27
	"github.com/cubefs/cubefs/depends/tiglabs/raft/proto"
28
	"github.com/cubefs/cubefs/depends/tiglabs/raft/storage/wal"
29
	raftlog "github.com/cubefs/cubefs/depends/tiglabs/raft/util/log"
30
	utilConfig "github.com/cubefs/cubefs/util/config"
31
)
32

33
// RaftStore defines the interface for the raft store.
34
type RaftStore interface {
35
	CreatePartition(cfg *PartitionConfig) (Partition, error)
36
	Stop()
37
	RaftConfig() *raft.Config
38
	RaftStatus(raftID uint64) (raftStatus *raft.Status)
39
	NodeManager
40
	RaftServer() *raft.RaftServer
41
}
42

43
type raftStore struct {
44
	nodeID     uint64
45
	resolver   NodeResolver
46
	raftConfig *raft.Config
47
	raftServer *raft.RaftServer
48
	raftPath   string
49
}
50

51
// RaftConfig returns the raft configuration.
52
func (s *raftStore) RaftConfig() *raft.Config {
53
	return s.raftConfig
54
}
55

56
// RaftStatus asks the raft server for the status of the raft instance
// identified by raftID and returns whatever the server reports.
func (s *raftStore) RaftStatus(raftID uint64) (raftStatus *raft.Status) {
	raftStatus = s.raftServer.Status(raftID)
	return
}
59

60
// AddNodeWithPort add a new node with the given port.
61
func (s *raftStore) AddNodeWithPort(nodeID uint64, addr string, heartbeat int, replicate int) {
62
	s.resolver.AddNodeWithPort(nodeID, addr, heartbeat, replicate)
63
}
64

65
// DeleteNode deletes the node with the given ID in the raft store.
66
func (s *raftStore) DeleteNode(nodeID uint64) {
67
	s.resolver.DeleteNode(nodeID)
68
}
69

70
// Stop stops the raft store server.
71
func (s *raftStore) Stop() {
72
	if s.raftServer != nil {
73
		s.raftServer.Stop()
74
	}
75
}
76

77
func newRaftLogger(dir string) {
78
	raftLogPath := path.Join(dir, "logs")
79
	_, err := os.Stat(raftLogPath)
80
	if err != nil {
81
		if pathErr, ok := err.(*os.PathError); ok {
82
			if os.IsNotExist(pathErr) {
83
				os.MkdirAll(raftLogPath, 0o755)
84
			}
85
		}
86
	}
87

88
	raftLog, err := raftlog.NewLog(raftLogPath, "raft", "debug")
89
	if err != nil {
90
		syslog.Println("Fatal: failed to start the baud storage daemon - ", err)
91
		return
92
	}
93
	logger.SetLogger(raftLog)
94
	return
95
}
96

97
// NewRaftStore returns a new raft store instance.
98
func NewRaftStore(cfg *Config, extendCfg *utilConfig.Config) (mr RaftStore, err error) {
99
	resolver := NewNodeResolver()
100

101
	newRaftLogger(cfg.RaftPath)
102
	setMonitorConf(extendCfg)
103

104
	rc := raft.DefaultConfig()
105
	rc.NodeID = cfg.NodeID
106
	rc.LeaseCheck = true
107
	rc.PreVote = true
108
	if cfg.HeartbeatPort <= 0 {
109
		cfg.HeartbeatPort = DefaultHeartbeatPort
110
	}
111
	if cfg.ReplicaPort <= 0 {
112
		cfg.ReplicaPort = DefaultReplicaPort
113
	}
114
	if cfg.NumOfLogsToRetain == 0 {
115
		cfg.NumOfLogsToRetain = DefaultNumOfLogsToRetain
116
	}
117
	if cfg.ElectionTick < DefaultElectionTick {
118
		cfg.ElectionTick = DefaultElectionTick
119
	}
120
	if cfg.TickInterval < DefaultTickInterval {
121
		cfg.TickInterval = DefaultTickInterval
122
	}
123
	// if cfg's RecvBufSize bigger than the default 2048,
124
	// use the bigger one.
125
	if cfg.RecvBufSize > rc.ReqBufferSize {
126
		rc.ReqBufferSize = cfg.RecvBufSize
127
	}
128
	rc.HeartbeatAddr = fmt.Sprintf("%s:%d", cfg.IPAddr, cfg.HeartbeatPort)
129
	rc.ReplicateAddr = fmt.Sprintf("%s:%d", cfg.IPAddr, cfg.ReplicaPort)
130
	rc.Resolver = resolver
131
	rc.RetainLogs = cfg.NumOfLogsToRetain
132
	rc.TickInterval = time.Duration(cfg.TickInterval) * time.Millisecond
133
	rc.ElectionTick = cfg.ElectionTick
134
	rs, err := raft.NewRaftServer(rc)
135
	if err != nil {
136
		return
137
	}
138
	mr = &raftStore{
139
		nodeID:     cfg.NodeID,
140
		resolver:   resolver,
141
		raftConfig: rc,
142
		raftServer: rs,
143
		raftPath:   cfg.RaftPath,
144
	}
145
	return
146
}
147

148
// RaftServer exposes the raft server owned by the store. The returned pointer
// is the store's own instance and may be nil if no server has been set.
func (s *raftStore) RaftServer() *raft.RaftServer {
	return s.raftServer
}
151

152
// CreatePartition creates a new partition in the raft store.
153
func (s *raftStore) CreatePartition(cfg *PartitionConfig) (p Partition, err error) {
154
	// Init WaL Storage for this partition.
155
	// Variables:
156
	// wc: WaL Configuration.
157
	// wp: WaL Path.
158
	// ws: WaL Storage.
159
	var walPath string
160
	if cfg.WalPath == "" {
161
		walPath = path.Join(s.raftPath, strconv.FormatUint(cfg.ID, 10))
162
	} else {
163
		walPath = path.Join(cfg.WalPath, "wal_"+strconv.FormatUint(cfg.ID, 10))
164
	}
165

166
	wc := &wal.Config{}
167
	ws, err := wal.NewStorage(walPath, wc)
168
	if err != nil {
169
		return
170
	}
171
	peers := make([]proto.Peer, 0)
172
	for _, peerAddress := range cfg.Peers {
173
		peers = append(peers, peerAddress.Peer)
174
		s.AddNodeWithPort(
175
			peerAddress.ID,
176
			peerAddress.Address,
177
			peerAddress.HeartbeatPort,
178
			peerAddress.ReplicaPort,
179
		)
180
	}
181
	logger.Info("action[raftstore:CreatePartition] raft config applied [%v] id:%d", cfg.Applied, cfg.ID)
182
	rc := &raft.RaftConfig{
183
		ID:           cfg.ID,
184
		Peers:        peers,
185
		Leader:       cfg.Leader,
186
		Term:         cfg.Term,
187
		Storage:      ws,
188
		StateMachine: cfg.SM,
189
		Applied:      cfg.Applied,
190
		Monitor:      newMonitor(),
191
	}
192
	if err = s.raftServer.CreateRaft(rc); err != nil {
193
		return
194
	}
195
	p = newPartition(cfg, s.raftServer, walPath)
196
	return
197
}
198

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.