1
// Copyright 2018 The CubeFS Authors.
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
7
// http://www.apache.org/licenses/LICENSE-2.0
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
20
"github.com/cubefs/cubefs/depends/tiglabs/raft"
21
"github.com/cubefs/cubefs/depends/tiglabs/raft/proto"
24
// PartitionStatus is a type alias of raft.Status. It is the status type that
// Partition.Status returns a pointer to.
25
type PartitionStatus = raft.Status
27
// PartitionFsm covers the methods a raft store partition needs, including both
28
// the FSM implementation and the data storage operations.
29
// It is declared as an alias of raft.StateMachine.
// NOTE(review): earlier wording said it extends raft StateMachine and Store,
// but the alias below names only raft.StateMachine — confirm whether Store is
// still part of the contract.
30
type PartitionFsm = raft.StateMachine
32
// Partition wraps the methods needed to operate a raft store partition.
33
// A Partition is one shard of the multi-raft RaftStore. RaftStore is based on multi-raft, which
34
// manages multiple raft replication groups at the same time through a single
35
// raft server instance and its system resources.
36
type Partition interface {
37
// Submit submits command data to the raft log.
38
Submit(cmd []byte) (resp interface{}, err error)
40
// ChangeMember submits a member change event and its information to the raft log.
41
ChangeMember(changeType proto.ConfChangeType, peer proto.Peer, context []byte) (resp interface{}, err error)
43
// Stop removes the raft partition from the raft server and shuts down this partition.
46
// Delete stops and deletes the partition.
49
// Status returns the current raft status.
50
Status() (status *PartitionStatus)
52
// IsRestoring reports whether the partition is restoring. It is meant as a
// much faster alternative to reading Status().RestoringSnapshot.
55
// LeaderTerm returns the current leader ID and term of the raft group.
// NOTE(review): term is presumably the Raft election term (the logical clock
// that advances with each leader election) — confirm against the raft package.
56
LeaderTerm() (leaderID, term uint64)
58
// IsRaftLeader returns true if this node is the leader of the raft group it belongs to.
61
// AppliedIndex returns the current index of the applied raft log in the raft store partition.
64
// CommittedIndex returns the current index of the committed raft log in the raft store partition.
65
CommittedIndex() uint64
68
// Truncate truncates the raft log of this partition at the given index.
// NOTE(review): whether index itself is kept or discarded is decided by the
// raft server — confirm before relying on it.
Truncate(index uint64)
69
// TryToLeader asks the raft server to attempt a leadership change for the
// given ID and returns the error reported for that attempt.
TryToLeader(nodeID uint64) error
73
// partition is the default implementation of the Partition interface.
// Its methods also read the fields id, raft and walPath.
74
type partition struct {
78
// config is the *PartitionConfig associated with this partition
// (PartitionConfig is declared outside this block).
config *PartitionConfig
81
// ChangeMember submits a member change event and its information to the raft log.
//
// changeType, peer and context are forwarded unchanged to the raft server
// together with this partition's ID. On a node that is not the raft leader,
// err is set to raft.ErrNotLeader. Otherwise resp and err are taken from the
// Response() of the future returned by the raft server.
82
func (p *partition) ChangeMember(changeType proto.ConfChangeType, peer proto.Peer, context []byte) (
83
resp interface{}, err error) {
84
if !p.IsRaftLeader() {
85
err = raft.ErrNotLeader
88
// Propose the membership change for this partition's raft group.
future := p.raft.ChangeMember(p.id, changeType, peer, context)
89
// The future's Response() supplies both the result and the error.
resp, err = future.Response()
93
// Stop removes the raft partition from the raft server and shuts down this partition.
// It calls RemoveRaft with the partition ID; err carries whatever RemoveRaft returns.
94
func (p *partition) Stop() (err error) {
95
err = p.raft.RemoveRaft(p.id)
99
// TryToLeader forwards nodeID to the raft server's TryToLeader and returns the
// error from the resulting future's Response(); the response value is discarded.
// NOTE(review): unlike the other methods on partition, this call passes the
// caller-supplied nodeID rather than p.id — confirm whether the raft server
// expects a raft group ID or a node ID here.
func (p *partition) TryToLeader(nodeID uint64) (err error) {
100
future := p.raft.TryToLeader(nodeID)
101
// Only the error matters to callers; the response payload is ignored.
_, err = future.Response()
105
// Delete stops and deletes the partition.
//
// It calls Stop and checks its error, then removes the path held in p.walPath
// with os.RemoveAll and assigns that call's result to err.
106
func (p *partition) Delete() (err error) {
107
// Stop the raft instance first; its error is checked before anything is removed.
if err = p.Stop(); err != nil {
110
// os.RemoveAll removes the path and everything beneath it.
err = os.RemoveAll(p.walPath)
114
// IsRestoring returns the raft server's IsRestoring result for this partition's ID.
func (p *partition) IsRestoring() bool {
115
return p.raft.IsRestoring(p.id)
118
// Status returns the current raft status, as reported by the raft server for
// this partition's ID.
119
func (p *partition) Status() (status *PartitionStatus) {
120
status = p.raft.Status(p.id)
124
// LeaderTerm returns the leader ID and the current term of the leader in the
// raft group, as reported by the raft server for this partition's ID.
125
func (p *partition) LeaderTerm() (leaderID, term uint64) {
130
leaderID, term = p.raft.LeaderTerm(p.id)
134
// IsOfflinePeer walks the replicas listed in the raft status and compares the
// number of active ones against a majority of the peer count.
// NOTE(review): the return expression is true when active reaches
// sumPeers/2 + 1 (a majority is active), which reads opposite to the method
// name — confirm the intended polarity with the callers.
func (p *partition) IsOfflinePeer() bool {
138
for _, peer := range status.Replicas {
145
// Majority threshold: more than half of the counted peers.
return active >= (int(sumPeers)/2 + 1)
148
// IsRaftLeader returns true if this node is the leader of the raft group it belongs to.
// It reports false when the partition has no raft server attached.
149
func (p *partition) IsRaftLeader() (isLeader bool) {
150
// The nil check short-circuits, so IsLeader is never called on a nil raft server.
isLeader = p.raft != nil && p.raft.IsLeader(p.id)
154
// AppliedIndex returns the current index of the applied raft log in the raft
// store partition, as reported by the raft server for this partition's ID.
155
func (p *partition) AppliedIndex() (applied uint64) {
156
applied = p.raft.AppliedIndex(p.id)
160
// CommittedIndex returns the current index of the committed raft log in the
// raft store partition, as reported by the raft server for this partition's ID.
// NOTE(review): the named result is called applied but holds the committed index.
161
func (p *partition) CommittedIndex() (applied uint64) {
162
applied = p.raft.CommittedIndex(p.id)
166
// Submit submits command data to the raft log.
//
// On a node that is not the raft leader, err is set to raft.ErrNotLeader.
// Otherwise cmd is handed to the raft server for this partition's ID, and
// resp and err are taken from the Response() of the returned future.
167
func (p *partition) Submit(cmd []byte) (resp interface{}, err error) {
168
if !p.IsRaftLeader() {
169
err = raft.ErrNotLeader
172
// Propose the command to this partition's raft group.
future := p.raft.Submit(p.id, cmd)
173
// The future's Response() supplies both the result and the error.
resp, err = future.Response()
177
// Truncate truncates the raft log of this partition by forwarding index to the
// raft server's Truncate together with the partition ID.
// NOTE(review): whether index itself is kept or discarded is decided by the
// raft server — confirm before relying on it.
178
func (p *partition) Truncate(index uint64) {
180
p.raft.Truncate(p.id, index)
184
func newPartition(cfg *PartitionConfig, raft *raft.RaftServer, walPath string) Partition {