cubefs

Форк
0
/
partition_fsmop.go 
261 строка · 7.1 Кб
1
// Copyright 2018 The CubeFS Authors.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
14

15
package metanode
16

17
import (
18
	"encoding/binary"
19
	"encoding/json"
20
	"fmt"
21
	"io/ioutil"
22
	"os"
23
	"path"
24
	"strings"
25
	"time"
26

27
	"github.com/cubefs/cubefs/proto"
28
	"github.com/cubefs/cubefs/util/log"
29
)
30

31
func (mp *metaPartition) initInode(ino *Inode) {
32
	for {
33
		time.Sleep(10 * time.Nanosecond)
34
		select {
35
		case <-mp.stopC:
36
			return
37
		default:
38
			// check first root inode
39
			if mp.hasInode(ino) {
40
				return
41
			}
42
			if !mp.raftPartition.IsRaftLeader() {
43
				continue
44
			}
45
			// qinode := &MetaQuotaInode{
46
			// 	inode:    ino,
47
			// 	quotaIds: make([]uint32, 0, 0),
48
			// }
49
			// data, err := qinode.Marshal()
50
			// if err != nil {
51
			// 	log.LogFatalf("[initInode] marshal: %s", err.Error())
52
			// }
53

54
			data, err := ino.Marshal()
55
			if err != nil {
56
				log.LogFatalf("[initInode] marshal: %s", err.Error())
57
			}
58
			// put first root inode
59
			resp, err := mp.submit(opFSMCreateInode, data)
60
			if err != nil {
61
				log.LogFatalf("[initInode] raft sync: %s", err.Error())
62
			}
63
			p := &Packet{}
64
			p.ResultCode = resp.(uint8)
65
			log.LogDebugf("[initInode] raft sync: response status = %v.",
66
				p.GetResultMsg())
67
			return
68
		}
69
	}
70
}
71

72
// Not implemented.
73
func (mp *metaPartition) decommissionPartition() (err error) {
74
	return
75
}
76

77
func (mp *metaPartition) fsmUpdatePartition(end uint64) (status uint8,
78
	err error) {
79
	status = proto.OpOk
80
	oldEnd := mp.config.End
81
	mp.config.End = end
82

83
	if end < mp.config.Cursor {
84
		status = proto.OpAgain
85
		mp.config.End = oldEnd
86
		return
87
	}
88
	if err = mp.PersistMetadata(); err != nil {
89
		status = proto.OpDiskErr
90
		mp.config.End = oldEnd
91
	}
92
	return
93
}
94

95
func (mp *metaPartition) confAddNode(req *proto.AddMetaPartitionRaftMemberRequest, index uint64) (updated bool, err error) {
96
	var (
97
		heartbeatPort int
98
		replicaPort   int
99
	)
100
	if heartbeatPort, replicaPort, err = mp.getRaftPort(); err != nil {
101
		return
102
	}
103

104
	addPeer := false
105
	for _, peer := range mp.config.Peers {
106
		if peer.ID == req.AddPeer.ID {
107
			addPeer = true
108
			break
109
		}
110
	}
111
	updated = !addPeer
112
	if !updated {
113
		return
114
	}
115
	mp.config.Peers = append(mp.config.Peers, req.AddPeer)
116
	addr := strings.Split(req.AddPeer.Addr, ":")[0]
117
	mp.config.RaftStore.AddNodeWithPort(req.AddPeer.ID, addr, heartbeatPort, replicaPort)
118
	return
119
}
120

121
func (mp *metaPartition) confRemoveNode(req *proto.RemoveMetaPartitionRaftMemberRequest, index uint64) (updated bool, err error) {
122
	var canRemoveSelf bool
123
	if canRemoveSelf, err = mp.canRemoveSelf(); err != nil {
124
		return
125
	}
126
	peerIndex := -1
127
	data, _ := json.Marshal(req)
128
	log.LogInfof("Start RemoveRaftNode  PartitionID(%v) nodeID(%v)  do RaftLog (%v) ",
129
		req.PartitionId, mp.config.NodeId, string(data))
130
	for i, peer := range mp.config.Peers {
131
		if peer.ID == req.RemovePeer.ID {
132
			updated = true
133
			peerIndex = i
134
			break
135
		}
136
	}
137
	if !updated {
138
		log.LogInfof("NoUpdate RemoveRaftNode  PartitionID(%v) nodeID(%v)  do RaftLog (%v) ",
139
			req.PartitionId, mp.config.NodeId, string(data))
140
		return
141
	}
142
	mp.config.Peers = append(mp.config.Peers[:peerIndex], mp.config.Peers[peerIndex+1:]...)
143
	if mp.config.NodeId == req.RemovePeer.ID && !mp.isLoadingMetaPartition && canRemoveSelf {
144
		mp.Stop()
145
		mp.DeleteRaft()
146
		mp.manager.deletePartition(mp.GetBaseConfig().PartitionId)
147
		os.RemoveAll(mp.config.RootDir)
148
		updated = false
149
	}
150
	log.LogInfof("Fininsh RemoveRaftNode  PartitionID(%v) nodeID(%v)  do RaftLog (%v) ",
151
		req.PartitionId, mp.config.NodeId, string(data))
152
	return
153
}
154

155
func (mp *metaPartition) delOldExtentFile(buf []byte) (err error) {
156
	fileName := string(buf)
157
	log.LogWarnf("[delOldExtentFile] del extent file(%s), mp[%v]", fileName, mp.config.PartitionId)
158

159
	infos, err := ioutil.ReadDir(mp.config.RootDir)
160
	if err != nil {
161
		return
162
	}
163

164
	infos = sortDelExtFileInfo(infos)
165
	tgtIdx := getDelExtFileIdx(fileName)
166

167
	for _, f := range infos {
168
		idx := getDelExtFileIdx(f.Name())
169
		if idx > tgtIdx {
170
			break
171
		}
172

173
		log.LogWarnf("[delOldExtentFile] del extent file(%s), mp[%v]", f.Name(), mp.config.PartitionId)
174
		os.Remove(path.Join(mp.config.RootDir, f.Name()))
175
	}
176

177
	return
178
}
179

180
//
181
func (mp *metaPartition) setExtentDeleteFileCursor(buf []byte) (err error) {
182
	str := string(buf)
183
	var (
184
		fileName string
185
		cursor   int64
186
	)
187
	_, err = fmt.Sscanf(str, "%s %d", &fileName, &cursor)
188
	log.LogInfof("[setExtentDeleteFileCursor] &fileName_&cursor(%s), mp[%v]", str, mp.config.PartitionId)
189
	if err != nil {
190
		return
191
	}
192
	fp, err := os.OpenFile(path.Join(mp.config.RootDir, fileName), os.O_CREATE|os.O_RDWR,
193
		0o644)
194
	if err != nil {
195
		log.LogErrorf("[setExtentDeleteFileCursor] openFile %s failed: %s",
196
			fileName, err.Error())
197
		return
198
	}
199
	if err = binary.Write(fp, binary.BigEndian, cursor); err != nil {
200
		log.LogErrorf("[setExtentDeleteFileCursor] write file %s cursor"+
201
			" failed: %s", fileName, err.Error())
202
	}
203
	// TODO Unhandled errors
204
	fp.Close()
205
	return
206
}
207

208
func (mp *metaPartition) CanRemoveRaftMember(peer proto.Peer) error {
209
	downReplicas := mp.config.RaftStore.RaftServer().GetDownReplicas(mp.config.PartitionId)
210
	hasExsit := false
211
	for _, p := range mp.config.Peers {
212
		if p.ID == peer.ID {
213
			hasExsit = true
214
			break
215
		}
216
	}
217
	if !hasExsit {
218
		return nil
219
	}
220

221
	hasDownReplicasExcludePeer := make([]uint64, 0)
222
	for _, nodeID := range downReplicas {
223
		if nodeID.NodeID == peer.ID {
224
			continue
225
		}
226
		hasDownReplicasExcludePeer = append(hasDownReplicasExcludePeer, nodeID.NodeID)
227
	}
228

229
	sumReplicas := len(mp.config.Peers)
230
	if sumReplicas%2 == 1 {
231
		if sumReplicas-len(hasDownReplicasExcludePeer) > (sumReplicas/2 + 1) {
232
			return nil
233
		}
234
	} else {
235
		if sumReplicas-len(hasDownReplicasExcludePeer) >= (sumReplicas/2 + 1) {
236
			return nil
237
		}
238
	}
239

240
	return fmt.Errorf("downReplicas(%v) too much,so donnot offline (%v)", downReplicas, peer)
241
}
242

243
func (mp *metaPartition) IsEquareCreateMetaPartitionRequst(request *proto.CreateMetaPartitionRequest) (err error) {
244
	if len(mp.config.Peers) != len(request.Members) {
245
		return fmt.Errorf("Exsit unavali Partition(%v) partitionHosts(%v) requestHosts(%v)", mp.config.PartitionId, mp.config.Peers, request.Members)
246
	}
247
	if mp.config.Start != request.Start || mp.config.End != request.End {
248
		return fmt.Errorf("Exsit unavali Partition(%v) range(%v-%v) requestRange(%v-%v)", mp.config.PartitionId, mp.config.Start, mp.config.End, request.Start, request.End)
249
	}
250
	for index, peer := range mp.config.Peers {
251
		requestPeer := request.Members[index]
252
		if requestPeer.ID != peer.ID || requestPeer.Addr != peer.Addr {
253
			return fmt.Errorf("Exsit unavali Partition(%v) partitionHosts(%v) requestHosts(%v)", mp.config.PartitionId, mp.config.Peers, request.Members)
254
		}
255
	}
256
	if mp.config.VolName != request.VolName {
257
		return fmt.Errorf("Exsit unavali Partition(%v) VolName(%v) requestVolName(%v)", mp.config.PartitionId, mp.config.VolName, request.VolName)
258
	}
259

260
	return
261
}
262

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.