cubefs

Форк
0
/
partition.go 
191 строка · 5.3 Кб
1
// Copyright 2018 The CubeFS Authors.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
14

15
package raftstore
16

17
import (
18
	"os"
19

20
	"github.com/cubefs/cubefs/depends/tiglabs/raft"
21
	"github.com/cubefs/cubefs/depends/tiglabs/raft/proto"
22
)
23

24
// PartitionStatus is a type alias of raft.Status
25
type PartitionStatus = raft.Status
26

27
// PartitionFsm wraps necessary methods include both FSM implementation
28
// and data storage operation for raft store partition.
29
// It is declared as an alias of raft.StateMachine.
30
type PartitionFsm = raft.StateMachine
31

32
// Partition wraps necessary methods for raft store partition operation.
33
// Partition is a shard for multi-raft in RaftSore. RaftStore is based on multi-raft which
34
// manages multiple raft replication groups at same time through a single
35
// raft server instance and system resource.
36
type Partition interface {
37
	// Submit submits command data to raft log.
38
	Submit(cmd []byte) (resp interface{}, err error)
39

40
	// ChangeMember submits member change event and information to raft log.
41
	ChangeMember(changeType proto.ConfChangeType, peer proto.Peer, context []byte) (resp interface{}, err error)
42

43
	// Stop removes the raft partition from raft server and shuts down this partition.
44
	Stop() error
45

46
	// Delete stops and deletes the partition.
47
	Delete() error
48

49
	// Status returns the current raft status.
50
	Status() (status *PartitionStatus)
51

52
	// IsRestoring Much faster then status().RestoringSnapshot.
53
	IsRestoring() bool
54

55
	// LeaderTerm returns the current term of leader in the raft group. TODO what is term?
56
	LeaderTerm() (leaderID, term uint64)
57

58
	// IsRaftLeader returns true if this node is the leader of the raft group it belongs to.
59
	IsRaftLeader() bool
60

61
	// AppliedIndex returns the current index of the applied raft log in the raft store partition.
62
	AppliedIndex() uint64
63

64
	// CommittedIndex returns the current index of the applied raft log in the raft store partition.
65
	CommittedIndex() uint64
66

67
	// Truncate raft log
68
	Truncate(index uint64)
69
	TryToLeader(nodeID uint64) error
70
	IsOfflinePeer() bool
71
}
72

73
// Default implementation of the Partition interface.
74
type partition struct {
75
	id      uint64
76
	raft    *raft.RaftServer
77
	walPath string
78
	config  *PartitionConfig
79
}
80

81
// ChangeMember submits member change event and information to raft log.
82
func (p *partition) ChangeMember(changeType proto.ConfChangeType, peer proto.Peer, context []byte) (
83
	resp interface{}, err error) {
84
	if !p.IsRaftLeader() {
85
		err = raft.ErrNotLeader
86
		return
87
	}
88
	future := p.raft.ChangeMember(p.id, changeType, peer, context)
89
	resp, err = future.Response()
90
	return
91
}
92

93
// Stop removes the raft partition from raft server and shuts down this partition.
94
func (p *partition) Stop() (err error) {
95
	err = p.raft.RemoveRaft(p.id)
96
	return
97
}
98

99
// TryToLeader forwards nodeID to the raft server's TryToLeader and returns the
// error carried by the resulting future; the response value is discarded.
// NOTE(review): this is the only call in this type that does not pass p.id to
// the raft server; confirm which identifier the raft server expects here.
func (p *partition) TryToLeader(nodeID uint64) (err error) {
	_, err = p.raft.TryToLeader(nodeID).Response()
	return err
}
104

105
// Delete stops and deletes the partition.
106
func (p *partition) Delete() (err error) {
107
	if err = p.Stop(); err != nil {
108
		return
109
	}
110
	err = os.RemoveAll(p.walPath)
111
	return
112
}
113

114
// IsRestoring reports the raft server's IsRestoring result for this partition's id.
func (p *partition) IsRestoring() bool {
	restoring := p.raft.IsRestoring(p.id)
	return restoring
}
117

118
// Status returns the current raft status.
119
func (p *partition) Status() (status *PartitionStatus) {
120
	status = p.raft.Status(p.id)
121
	return
122
}
123

124
// LeaderTerm returns the current term of leader in the raft group.
125
func (p *partition) LeaderTerm() (leaderID, term uint64) {
126
	if p.raft == nil {
127
		return
128
	}
129

130
	leaderID, term = p.raft.LeaderTerm(p.id)
131
	return
132
}
133

134
// IsOfflinePeer reports whether at least a majority (n/2+1) of the replicas in
// the current raft status are marked Active. With no replicas it returns false.
// NOTE(review): the name suggests the opposite meaning (true when a majority
// is active); confirm the intended semantics against callers.
func (p *partition) IsOfflinePeer() bool {
	replicas := p.Status().Replicas

	activeCount := 0
	for _, replica := range replicas {
		if replica.Active {
			activeCount++
		}
	}

	quorum := len(replicas)/2 + 1
	return activeCount >= quorum
}
147

148
// IsRaftLeader returns true if this node is the leader of the raft group it belongs to.
149
func (p *partition) IsRaftLeader() (isLeader bool) {
150
	isLeader = p.raft != nil && p.raft.IsLeader(p.id)
151
	return
152
}
153

154
// AppliedIndex returns the current index of the applied raft log in the raft store partition.
155
func (p *partition) AppliedIndex() (applied uint64) {
156
	applied = p.raft.AppliedIndex(p.id)
157
	return
158
}
159

160
// CommittedIndex returns the current index of the applied raft log in the raft store partition.
161
func (p *partition) CommittedIndex() (applied uint64) {
162
	applied = p.raft.CommittedIndex(p.id)
163
	return
164
}
165

166
// Submit submits command data to raft log.
167
func (p *partition) Submit(cmd []byte) (resp interface{}, err error) {
168
	if !p.IsRaftLeader() {
169
		err = raft.ErrNotLeader
170
		return
171
	}
172
	future := p.raft.Submit(p.id, cmd)
173
	resp, err = future.Response()
174
	return
175
}
176

177
// Truncate truncates the raft log
178
func (p *partition) Truncate(index uint64) {
179
	if p.raft != nil {
180
		p.raft.Truncate(p.id, index)
181
	}
182
}
183

184
func newPartition(cfg *PartitionConfig, raft *raft.RaftServer, walPath string) Partition {
185
	return &partition{
186
		id:      cfg.ID,
187
		raft:    raft,
188
		walPath: walPath,
189
		config:  cfg,
190
	}
191
}
192

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.