1
// Copyright 2018 The CubeFS Authors.
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
7
// http://www.apache.org/licenses/LICENSE-2.0
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
23
"github.com/cubefs/cubefs/proto"
24
"github.com/cubefs/cubefs/repl"
25
"github.com/cubefs/cubefs/storage"
26
"github.com/cubefs/cubefs/util/log"
29
func (s *DataNode) Prepare(p *repl.Packet) (err error) {
31
p.SetPacketHasPrepare()
33
p.PackErrorBody(repl.ActionPreparePkt, err.Error())
38
if p.IsMasterCommand() {
41
atomic.AddUint64(&s.metricsCnt, 1)
42
if !s.shallDegrade() {
43
p.BeforeTp(s.clusterID)
48
err = s.checkStoreMode(p)
52
if err = s.checkCrc(p); err != nil {
55
if err = s.checkPartition(p); err != nil {
58
// For certain packet, we meed to add some additional extent information.
59
if err = s.checkPacketAndPrepare(p); err != nil {
66
func (s *DataNode) checkStoreMode(p *repl.Packet) (err error) {
67
if proto.IsTinyExtentType(p.ExtentType) || proto.IsNormalExtentType(p.ExtentType) {
70
log.LogErrorf("action[checkStoreMode] dp [%v] reqId [%v] extent type %v", p.PartitionID, p.ReqID, p.ExtentType)
71
return ErrIncorrectStoreType
74
func (s *DataNode) checkCrc(p *repl.Packet) (err error) {
75
if !p.IsNormalWriteOperation() {
78
crc := crc32.ChecksumIEEE(p.Data[:p.Size])
80
return storage.CrcMismatchError
86
func (s *DataNode) checkPartition(p *repl.Packet) (err error) {
87
dp := s.space.Partition(p.PartitionID)
89
// err = proto.ErrDataPartitionNotExists
90
err = fmt.Errorf("data partition not exists %v", p.PartitionID)
94
if p.IsNormalWriteOperation() || p.IsCreateExtentOperation() {
95
if dp.Available() <= 0 {
96
err = storage.NoSpaceError
100
if p.IsNormalWriteOperation() || p.IsRandomWrite() {
101
dp.disk.allocCheckLimit(proto.FlowWriteType, uint32(p.Size))
102
dp.disk.allocCheckLimit(proto.IopsWriteType, 1)
107
func (s *DataNode) checkPacketAndPrepare(p *repl.Packet) error {
108
partition := p.Object.(*DataPartition)
109
store := p.Object.(*DataPartition).ExtentStore()
115
log.LogDebugf("action[prepare.checkPacketAndPrepare] pack opcode (%v) p.IsLeaderPacket(%v) p (%v)", p.Opcode, p.IsLeaderPacket(), p)
116
if p.IsRandomWrite() || p.IsSnapshotModWriteAppendOperation() || p.IsNormalWriteOperation() {
117
if err = partition.CheckWriteVer(p); err != nil {
121
if p.IsLeaderPacket() && proto.IsTinyExtentType(p.ExtentType) && p.IsNormalWriteOperation() {
122
extentID, err = store.GetAvailableTinyExtent()
124
return fmt.Errorf("checkPacketAndPrepare partition %v GetAvailableTinyExtent error %v", p.PartitionID, err.Error())
126
p.ExtentID = extentID
127
p.ExtentOffset, err = store.GetTinyExtentOffset(extentID)
129
return fmt.Errorf("checkPacketAndPrepare partition %v %v GetTinyExtentOffset error %v", p.PartitionID, extentID, err.Error())
131
} else if p.IsSnapshotModWriteAppendOperation() {
132
if proto.IsTinyExtentType(p.ExtentType) {
133
extentID, err = store.GetAvailableTinyExtent()
135
log.LogErrorf("err %v", err)
136
return fmt.Errorf("checkPacketAndPrepare partition %v GetAvailableTinyExtent error %v", p.PartitionID, err.Error())
138
p.ExtentID = extentID
139
p.ExtentOffset, err = store.GetTinyExtentOffset(p.ExtentID)
141
err = fmt.Errorf("checkPacketAndPrepare partition %v %v GetTinyExtentOffset error %v", p.PartitionID, extentID, err.Error())
142
log.LogErrorf("err %v", err)
144
log.LogDebugf("action[prepare.checkPacketAndPrepare] dp %v append randomWrite p.ExtentOffset %v Kernel(file)Offset %v",
145
p.PartitionID, p.ExtentOffset, p.KernelOffset)
149
p.ExtentOffset, err = store.GetExtentSnapshotModOffset(p.ExtentID, p.Size)
150
log.LogDebugf("action[prepare.checkPacketAndPrepare] pack (%v) partition %v %v", p, p.PartitionID, extentID)
152
return fmt.Errorf("checkPacketAndPrepare partition %v %v GetSnapshotModExtentOffset error %v", p.PartitionID, extentID, err.Error())
154
} else if p.IsLeaderPacket() && p.IsCreateExtentOperation() {
155
if partition.isNormalType() && partition.GetExtentCount() >= storage.MaxExtentCount*3 {
156
return fmt.Errorf("checkPacketAndPrepare partition %v has reached maxExtentId", p.PartitionID)
158
p.ExtentID, err = store.NextExtentID()
160
return fmt.Errorf("checkPacketAndPrepare partition %v allocCheckLimit NextExtentId error %v", p.PartitionID, err)
162
} else if p.IsLeaderPacket() &&
163
((p.IsMarkDeleteExtentOperation() && proto.IsTinyExtentType(p.ExtentType)) ||
164
(p.IsMarkSplitExtentOperation() && !proto.IsTinyExtentType(p.ExtentType))) {
166
log.LogDebugf("checkPacketAndPrepare. packet opCode %v p.ExtentType %v", p.Opcode, p.ExtentType)
168
record := new(proto.TinyExtentDeleteRecord)
169
if err := json.Unmarshal(p.Data[:p.Size], record); err != nil {
170
return fmt.Errorf("checkPacketAndPrepare failed %v", err.Error())
172
p.Data, _ = json.Marshal(record)
173
p.Size = uint32(len(p.Data))
176
if (p.IsCreateExtentOperation() || p.IsNormalWriteOperation()) && p.ExtentID == 0 {
177
return fmt.Errorf("checkPacketAndPrepare partition %v invalid extent id. ", p.PartitionID)