cubefs

Форк
0
/
wrap_prepare.go 
183 строки · 5.8 Кб
1
// Copyright 2018 The CubeFS Authors.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
14

15
package datanode
16

17
import (
18
	"encoding/json"
19
	"fmt"
20
	"hash/crc32"
21
	"sync/atomic"
22

23
	"github.com/cubefs/cubefs/proto"
24
	"github.com/cubefs/cubefs/repl"
25
	"github.com/cubefs/cubefs/storage"
26
	"github.com/cubefs/cubefs/util/log"
27
)
28

29
// Prepare runs the pre-processing stage for a packet. Master commands skip
// every check. For any other packet it increments s.metricsCnt, sets or clears
// the degrade flag according to s.shallDegrade, and then runs checkStoreMode,
// checkCrc, checkPartition and checkPacketAndPrepare in that order, stopping
// at the first failure.
//
// Whatever the outcome, the packet is marked as prepared. On failure the error
// text is packed into the packet body; on success p.AfterPre is set to true.
func (s *DataNode) Prepare(p *repl.Packet) (err error) {
	// The deferred closure inspects the named result, so it observes whatever
	// value the return statements below produce.
	defer func() {
		p.SetPacketHasPrepare()
		if err == nil {
			p.AfterPre = true
			return
		}
		p.PackErrorBody(repl.ActionPreparePkt, err.Error())
	}()

	if p.IsMasterCommand() {
		return nil
	}

	atomic.AddUint64(&s.metricsCnt, 1)
	if s.shallDegrade() {
		p.SetDegrade()
	} else {
		p.BeforeTp(s.clusterID)
		p.UnsetDegrade()
	}

	if err = s.checkStoreMode(p); err != nil {
		return err
	}
	if err = s.checkCrc(p); err != nil {
		return err
	}
	if err = s.checkPartition(p); err != nil {
		return err
	}
	// For certain packets, we need to add some additional extent information.
	return s.checkPacketAndPrepare(p)
}
65

66
// checkStoreMode accepts the packet only when its extent type is reported as
// a tiny or a normal extent type. Any other extent type is logged and rejected
// with ErrIncorrectStoreType.
func (s *DataNode) checkStoreMode(p *repl.Packet) (err error) {
	supported := proto.IsTinyExtentType(p.ExtentType) || proto.IsNormalExtentType(p.ExtentType)
	if !supported {
		log.LogErrorf("action[checkStoreMode] dp [%v] reqId [%v] extent type %v", p.PartitionID, p.ReqID, p.ExtentType)
		err = ErrIncorrectStoreType
	}
	return err
}
73

74
// checkCrc validates the payload checksum of normal write packets. It computes
// the IEEE CRC32 of the first p.Size bytes of p.Data and returns
// storage.CrcMismatchError when that differs from p.CRC. Packets that are not
// normal writes are not checked.
func (s *DataNode) checkCrc(p *repl.Packet) (err error) {
	// Short-circuit evaluation keeps the checksum from being computed for
	// anything other than a normal write.
	if p.IsNormalWriteOperation() && crc32.ChecksumIEEE(p.Data[:p.Size]) != p.CRC {
		err = storage.CrcMismatchError
	}
	return err
}
85

86
// checkPartition looks up the data partition named by p.PartitionID and
// stores it in p.Object for the later stages.
//
// It returns an error when the partition is not found, and
// storage.NoSpaceError when a normal write or create-extent packet targets a
// partition whose Available() is not positive. For normal and random writes it
// then calls the disk's allocCheckLimit for the write-flow (p.Size) and
// write-IOPS (1) limit types.
func (s *DataNode) checkPartition(p *repl.Packet) (err error) {
	dp := s.space.Partition(p.PartitionID)
	if dp == nil {
		return fmt.Errorf("data partition not exists %v", p.PartitionID)
	}
	// The partition is attached before the space check, so it is set on the
	// packet even when NoSpaceError is returned.
	p.Object = dp

	if (p.IsNormalWriteOperation() || p.IsCreateExtentOperation()) && dp.Available() <= 0 {
		return storage.NoSpaceError
	}

	if p.IsNormalWriteOperation() || p.IsRandomWrite() {
		dp.disk.allocCheckLimit(proto.FlowWriteType, uint32(p.Size))
		dp.disk.allocCheckLimit(proto.IopsWriteType, 1)
	}
	return nil
}
106

107
func (s *DataNode) checkPacketAndPrepare(p *repl.Packet) error {
108
	partition := p.Object.(*DataPartition)
109
	store := p.Object.(*DataPartition).ExtentStore()
110
	var (
111
		extentID uint64
112
		err      error
113
	)
114

115
	log.LogDebugf("action[prepare.checkPacketAndPrepare] pack opcode (%v) p.IsLeaderPacket(%v) p (%v)", p.Opcode, p.IsLeaderPacket(), p)
116
	if p.IsRandomWrite() || p.IsSnapshotModWriteAppendOperation() || p.IsNormalWriteOperation() {
117
		if err = partition.CheckWriteVer(p); err != nil {
118
			return err
119
		}
120
	}
121
	if p.IsLeaderPacket() && proto.IsTinyExtentType(p.ExtentType) && p.IsNormalWriteOperation() {
122
		extentID, err = store.GetAvailableTinyExtent()
123
		if err != nil {
124
			return fmt.Errorf("checkPacketAndPrepare partition %v GetAvailableTinyExtent error %v", p.PartitionID, err.Error())
125
		}
126
		p.ExtentID = extentID
127
		p.ExtentOffset, err = store.GetTinyExtentOffset(extentID)
128
		if err != nil {
129
			return fmt.Errorf("checkPacketAndPrepare partition %v  %v GetTinyExtentOffset error %v", p.PartitionID, extentID, err.Error())
130
		}
131
	} else if p.IsSnapshotModWriteAppendOperation() {
132
		if proto.IsTinyExtentType(p.ExtentType) {
133
			extentID, err = store.GetAvailableTinyExtent()
134
			if err != nil {
135
				log.LogErrorf("err %v", err)
136
				return fmt.Errorf("checkPacketAndPrepare partition %v GetAvailableTinyExtent error %v", p.PartitionID, err.Error())
137
			}
138
			p.ExtentID = extentID
139
			p.ExtentOffset, err = store.GetTinyExtentOffset(p.ExtentID)
140
			if err != nil {
141
				err = fmt.Errorf("checkPacketAndPrepare partition %v  %v GetTinyExtentOffset error %v", p.PartitionID, extentID, err.Error())
142
				log.LogErrorf("err %v", err)
143
			}
144
			log.LogDebugf("action[prepare.checkPacketAndPrepare] dp %v append randomWrite p.ExtentOffset %v Kernel(file)Offset %v",
145
				p.PartitionID, p.ExtentOffset, p.KernelOffset)
146
			return err
147
		}
148

149
		p.ExtentOffset, err = store.GetExtentSnapshotModOffset(p.ExtentID, p.Size)
150
		log.LogDebugf("action[prepare.checkPacketAndPrepare] pack (%v) partition %v %v", p, p.PartitionID, extentID)
151
		if err != nil {
152
			return fmt.Errorf("checkPacketAndPrepare partition %v  %v GetSnapshotModExtentOffset error %v", p.PartitionID, extentID, err.Error())
153
		}
154
	} else if p.IsLeaderPacket() && p.IsCreateExtentOperation() {
155
		if partition.isNormalType() && partition.GetExtentCount() >= storage.MaxExtentCount*3 {
156
			return fmt.Errorf("checkPacketAndPrepare partition %v has reached maxExtentId", p.PartitionID)
157
		}
158
		p.ExtentID, err = store.NextExtentID()
159
		if err != nil {
160
			return fmt.Errorf("checkPacketAndPrepare partition %v allocCheckLimit NextExtentId error %v", p.PartitionID, err)
161
		}
162
	} else if p.IsLeaderPacket() &&
163
		((p.IsMarkDeleteExtentOperation() && proto.IsTinyExtentType(p.ExtentType)) ||
164
			(p.IsMarkSplitExtentOperation() && !proto.IsTinyExtentType(p.ExtentType))) {
165

166
		log.LogDebugf("checkPacketAndPrepare. packet opCode %v p.ExtentType %v", p.Opcode, p.ExtentType)
167

168
		record := new(proto.TinyExtentDeleteRecord)
169
		if err := json.Unmarshal(p.Data[:p.Size], record); err != nil {
170
			return fmt.Errorf("checkPacketAndPrepare failed %v", err.Error())
171
		}
172
		p.Data, _ = json.Marshal(record)
173
		p.Size = uint32(len(p.Data))
174
	}
175

176
	if (p.IsCreateExtentOperation() || p.IsNormalWriteOperation()) && p.ExtentID == 0 {
177
		return fmt.Errorf("checkPacketAndPrepare partition %v invalid extent id. ", p.PartitionID)
178
	}
179

180
	p.OrgBuffer = p.Data
181

182
	return nil
183
}
184

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.