27
"github.com/cubefs/cubefs/proto"
28
"github.com/cubefs/cubefs/util/log"
31
func (mp *metaPartition) initInode(ino *Inode) {
33
time.Sleep(10 * time.Nanosecond)
42
if !mp.raftPartition.IsRaftLeader() {
54
data, err := ino.Marshal()
56
log.LogFatalf("[initInode] marshal: %s", err.Error())
59
resp, err := mp.submit(opFSMCreateInode, data)
61
log.LogFatalf("[initInode] raft sync: %s", err.Error())
64
p.ResultCode = resp.(uint8)
65
log.LogDebugf("[initInode] raft sync: response status = %v.",
73
func (mp *metaPartition) decommissionPartition() (err error) {
77
func (mp *metaPartition) fsmUpdatePartition(end uint64) (status uint8,
80
oldEnd := mp.config.End
83
if end < mp.config.Cursor {
84
status = proto.OpAgain
85
mp.config.End = oldEnd
88
if err = mp.PersistMetadata(); err != nil {
89
status = proto.OpDiskErr
90
mp.config.End = oldEnd
95
func (mp *metaPartition) confAddNode(req *proto.AddMetaPartitionRaftMemberRequest, index uint64) (updated bool, err error) {
100
if heartbeatPort, replicaPort, err = mp.getRaftPort(); err != nil {
105
for _, peer := range mp.config.Peers {
106
if peer.ID == req.AddPeer.ID {
115
mp.config.Peers = append(mp.config.Peers, req.AddPeer)
116
addr := strings.Split(req.AddPeer.Addr, ":")[0]
117
mp.config.RaftStore.AddNodeWithPort(req.AddPeer.ID, addr, heartbeatPort, replicaPort)
121
func (mp *metaPartition) confRemoveNode(req *proto.RemoveMetaPartitionRaftMemberRequest, index uint64) (updated bool, err error) {
122
var canRemoveSelf bool
123
if canRemoveSelf, err = mp.canRemoveSelf(); err != nil {
127
data, _ := json.Marshal(req)
128
log.LogInfof("Start RemoveRaftNode PartitionID(%v) nodeID(%v) do RaftLog (%v) ",
129
req.PartitionId, mp.config.NodeId, string(data))
130
for i, peer := range mp.config.Peers {
131
if peer.ID == req.RemovePeer.ID {
138
log.LogInfof("NoUpdate RemoveRaftNode PartitionID(%v) nodeID(%v) do RaftLog (%v) ",
139
req.PartitionId, mp.config.NodeId, string(data))
142
mp.config.Peers = append(mp.config.Peers[:peerIndex], mp.config.Peers[peerIndex+1:]...)
143
if mp.config.NodeId == req.RemovePeer.ID && !mp.isLoadingMetaPartition && canRemoveSelf {
146
mp.manager.deletePartition(mp.GetBaseConfig().PartitionId)
147
os.RemoveAll(mp.config.RootDir)
150
log.LogInfof("Fininsh RemoveRaftNode PartitionID(%v) nodeID(%v) do RaftLog (%v) ",
151
req.PartitionId, mp.config.NodeId, string(data))
155
func (mp *metaPartition) delOldExtentFile(buf []byte) (err error) {
156
fileName := string(buf)
157
log.LogWarnf("[delOldExtentFile] del extent file(%s), mp[%v]", fileName, mp.config.PartitionId)
159
infos, err := ioutil.ReadDir(mp.config.RootDir)
164
infos = sortDelExtFileInfo(infos)
165
tgtIdx := getDelExtFileIdx(fileName)
167
for _, f := range infos {
168
idx := getDelExtFileIdx(f.Name())
173
log.LogWarnf("[delOldExtentFile] del extent file(%s), mp[%v]", f.Name(), mp.config.PartitionId)
174
os.Remove(path.Join(mp.config.RootDir, f.Name()))
181
func (mp *metaPartition) setExtentDeleteFileCursor(buf []byte) (err error) {
187
_, err = fmt.Sscanf(str, "%s %d", &fileName, &cursor)
188
log.LogInfof("[setExtentDeleteFileCursor] &fileName_&cursor(%s), mp[%v]", str, mp.config.PartitionId)
192
fp, err := os.OpenFile(path.Join(mp.config.RootDir, fileName), os.O_CREATE|os.O_RDWR,
195
log.LogErrorf("[setExtentDeleteFileCursor] openFile %s failed: %s",
196
fileName, err.Error())
199
if err = binary.Write(fp, binary.BigEndian, cursor); err != nil {
200
log.LogErrorf("[setExtentDeleteFileCursor] write file %s cursor"+
201
" failed: %s", fileName, err.Error())
208
func (mp *metaPartition) CanRemoveRaftMember(peer proto.Peer) error {
209
downReplicas := mp.config.RaftStore.RaftServer().GetDownReplicas(mp.config.PartitionId)
211
for _, p := range mp.config.Peers {
221
hasDownReplicasExcludePeer := make([]uint64, 0)
222
for _, nodeID := range downReplicas {
223
if nodeID.NodeID == peer.ID {
226
hasDownReplicasExcludePeer = append(hasDownReplicasExcludePeer, nodeID.NodeID)
229
sumReplicas := len(mp.config.Peers)
230
if sumReplicas%2 == 1 {
231
if sumReplicas-len(hasDownReplicasExcludePeer) > (sumReplicas/2 + 1) {
235
if sumReplicas-len(hasDownReplicasExcludePeer) >= (sumReplicas/2 + 1) {
240
return fmt.Errorf("downReplicas(%v) too much,so donnot offline (%v)", downReplicas, peer)
243
func (mp *metaPartition) IsEquareCreateMetaPartitionRequst(request *proto.CreateMetaPartitionRequest) (err error) {
244
if len(mp.config.Peers) != len(request.Members) {
245
return fmt.Errorf("Exsit unavali Partition(%v) partitionHosts(%v) requestHosts(%v)", mp.config.PartitionId, mp.config.Peers, request.Members)
247
if mp.config.Start != request.Start || mp.config.End != request.End {
248
return fmt.Errorf("Exsit unavali Partition(%v) range(%v-%v) requestRange(%v-%v)", mp.config.PartitionId, mp.config.Start, mp.config.End, request.Start, request.End)
250
for index, peer := range mp.config.Peers {
251
requestPeer := request.Members[index]
252
if requestPeer.ID != peer.ID || requestPeer.Addr != peer.Addr {
253
return fmt.Errorf("Exsit unavali Partition(%v) partitionHosts(%v) requestHosts(%v)", mp.config.PartitionId, mp.config.Peers, request.Members)
256
if mp.config.VolName != request.VolName {
257
return fmt.Errorf("Exsit unavali Partition(%v) VolName(%v) requestVolName(%v)", mp.config.PartitionId, mp.config.VolName, request.VolName)