cubefs

Форк
0
/
lrcencoder.go 
247 строк · 6.9 Кб
1
// Copyright 2022 The CubeFS Authors.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
14

15
package ec
16

17
import (
18
	"context"
19
	"io"
20

21
	"github.com/klauspost/reedsolomon"
22

23
	"github.com/cubefs/cubefs/blobstore/util/errors"
24
	"github.com/cubefs/cubefs/blobstore/util/limit"
25
	"github.com/cubefs/cubefs/blobstore/util/task"
26
)
27

28
type lrcEncoder struct {
29
	Config
30
	pool        limit.Limiter // concurrency pool
31
	engine      reedsolomon.Encoder
32
	localEngine reedsolomon.Encoder
33
}
34

35
// Encode fills in the parity of a full stripe in two stages.
//
// shards must hold exactly N+M+L entries, otherwise ErrInvalidShards is
// returned. The global engine first encodes the leading N+M shards; then the
// local engine encodes the stripe of every AZ, one task per AZ, via task.Run.
// When EnableVerify is set each stage is verified right after it is encoded
// and ErrVerify is returned on a mismatch.
func (e *lrcEncoder) Encode(shards [][]byte) error {
	globalCount := e.CodeMode.N + e.CodeMode.M
	if len(shards) != globalCount+e.CodeMode.L {
		return ErrInvalidShards
	}

	e.pool.Acquire()
	defer e.pool.Release()
	fillFullShards(shards)

	// Stage one: global parity over the first N+M shards.
	globalShards := shards[:globalCount]
	if err := e.engine.Encode(globalShards); err != nil {
		return errors.Info(err, "lrcEncoder.Encode global failed")
	}
	if e.EnableVerify {
		verified, err := e.engine.Verify(globalShards)
		if err != nil {
			return errors.Info(err, "lrcEncoder.Encode global verify failed")
		}
		if !verified {
			return ErrVerify
		}
	}

	// Stage two: local parity, one task per AZ.
	azCount := e.CodeMode.AZCount
	localTasks := make([]func() error, 0, azCount)
	for az := 0; az < azCount; az++ {
		stripe := e.GetShardsInIdc(shards, az)
		localTasks = append(localTasks, func() error {
			if err := e.localEngine.Encode(stripe); err != nil {
				return errors.Info(err, "lrcEncoder.Encode local failed")
			}
			if !e.EnableVerify {
				return nil
			}
			verified, err := e.localEngine.Verify(stripe)
			if err != nil {
				return errors.Info(err, "lrcEncoder.Encode local verify failed")
			}
			if !verified {
				return ErrVerify
			}
			return nil
		})
	}
	return task.Run(context.Background(), localTasks...)
}
83

84
// verifyError carries the result of a failed verification out of a task
// function, whose only return value is an error. The embedded error is the
// engine error, or nil when the shards simply did not verify; verified is the
// boolean the engine reported.
type verifyError struct {
	error
	verified bool
}

// Error implements the error interface. It is defined explicitly because the
// embedded error may be nil: relying on the promoted method would dereference
// a nil interface and panic as soon as anything formats or logs the value.
func (e verifyError) Error() string {
	if e.error == nil {
		return "shards verify failed"
	}
	return e.error.Error()
}

// Unwrap returns the wrapped engine error, or nil when there is none, so the
// value can take part in error-chain inspection.
func (e verifyError) Unwrap() error {
	return e.error
}
88

89
// Verify reports whether the parity held in shards matches the data.
//
// Two layouts are accepted:
//   - a single local stripe, recognised by its length (N+M+L)/AZCount, which
//     is checked with the local engine only;
//   - a full stripe of at least N+M+L shards: the first N+M shards are checked
//     with the global engine, then the stripe of every AZ is checked with the
//     local engine, one task per AZ.
//
// Any other length returns ErrInvalidShards instead of panicking on an
// out-of-range slice or index, in line with the length check done by Encode.
// Engine errors are returned wrapped with the stage that failed.
func (e *lrcEncoder) Verify(shards [][]byte) (bool, error) {
	n, m, l, azCount := e.CodeMode.N, e.CodeMode.M, e.CodeMode.L, e.CodeMode.AZCount
	isLocalStripe := len(shards) == (n+m+l)/azCount
	// The full-stripe path slices shards[:n+m] and gathers local shards that
	// sit at global indexes >= n+m, so anything shorter cannot be verified.
	if !isLocalStripe && len(shards) < n+m+l {
		return false, ErrInvalidShards
	}

	e.pool.Acquire()
	defer e.pool.Release()

	if isLocalStripe {
		ok, err := e.localEngine.Verify(shards)
		if err != nil {
			err = errors.Info(err, "lrcEncoder.Verify local shards failed")
		}
		return ok, err
	}

	ok, err := e.engine.Verify(shards[:n+m])
	if err != nil {
		return ok, errors.Info(err, "lrcEncoder.Verify global shards failed")
	}
	if !ok {
		return false, nil
	}

	tasks := make([]func() error, 0, azCount)
	for i := 0; i < azCount; i++ {
		localShards := e.GetShardsInIdc(shards, i)
		tasks = append(tasks, func() error {
			ok, err := e.localEngine.Verify(localShards)
			if ok && err == nil {
				return nil
			}
			if err != nil {
				err = errors.Info(err, "lrcEncoder.Verify local shards failed")
			}
			// A task can only return an error, so the boolean result travels
			// inside verifyError and is unpacked below.
			return verifyError{error: err, verified: ok}
		})
	}
	if err := task.Run(context.Background(), tasks...); err != nil {
		if verifyErr, succ := err.(verifyError); succ {
			return verifyErr.verified, verifyErr.error
		}
		return false, err
	}

	return true, nil
}
132

133
// Reconstruct rebuilds the shards listed in badIdx in place.
//
// When shards has the length of one local stripe, (N+M+L)/AZCount, only the
// local engine is used, which avoids fetching the whole stripe. Otherwise the
// global engine rebuilds the first N+M shards, and afterwards every AZ that
// has a bad local shard (index >= N+M) gets its local stripe rebuilt with the
// local engine, one task per affected AZ.
func (e *lrcEncoder) Reconstruct(shards [][]byte, badIdx []int) error {
	fillFullShards(shards)

	// Only indexes below N+M are reset here; bad local shards are reset
	// per AZ further down, on the gathered local stripe.
	brokenGlobal := make([]int, 0, len(badIdx))
	for _, idx := range badIdx {
		if idx >= e.CodeMode.N+e.CodeMode.M {
			continue
		}
		brokenGlobal = append(brokenGlobal, idx)
	}
	initBadShards(shards, brokenGlobal)

	e.pool.Acquire()
	defer e.pool.Release()

	n, m, l, azCount := e.CodeMode.N, e.CodeMode.M, e.CodeMode.L, e.CodeMode.AZCount

	// A single local stripe: local reconstruction is all that is possible.
	if len(shards) == (n+m+l)/azCount {
		if err := e.localEngine.Reconstruct(shards); err != nil {
			return errors.Info(err, "lrcEncoder.Reconstruct local ec reconstruct failed")
		}
		return nil
	}

	// Full stripe: rebuild the global part first.
	if err := e.engine.Reconstruct(shards[:n+m]); err != nil {
		return errors.Info(err, "lrcEncoder.Reconstruct global ec reconstruct failed")
	}

	// Group the bad local shards by AZ, translating each global index into
	// its position inside that AZ's local stripe.
	brokenByAZ := make(map[int][]int)
	for _, idx := range badIdx {
		if idx < n+m {
			continue
		}
		az := (idx - n - m) * azCount / l
		posInStripe := idx - n - m - l/azCount*az + (n+m)/azCount
		brokenByAZ[az] = append(brokenByAZ[az], posInStripe)
	}

	repairs := make([]func() error, 0, len(brokenByAZ))
	for az, broken := range brokenByAZ {
		stripe := e.GetShardsInIdc(shards, az)
		initBadShards(stripe, broken)
		repairs = append(repairs, func() error {
			return e.localEngine.Reconstruct(stripe)
		})
	}
	if err := task.Run(context.Background(), repairs...); err != nil {
		return errors.Info(err, "lrcEncoder.Reconstruct local ec reconstruct after global ec failed")
	}
	return nil
}
187

188
// ReconstructData rebuilds missing data shards using the global engine only.
// Indexes in badIdx that are at or beyond N+M are ignored, and only the first
// N+M shards are handed to the engine.
func (e *lrcEncoder) ReconstructData(shards [][]byte, badIdx []int) error {
	globalCount := e.CodeMode.N + e.CodeMode.M
	fillFullShards(shards[:globalCount])

	brokenGlobal := make([]int, 0)
	for _, idx := range badIdx {
		if idx >= globalCount {
			continue
		}
		brokenGlobal = append(brokenGlobal, idx)
	}
	initBadShards(shards, brokenGlobal)

	globalShards := shards[:globalCount]

	e.pool.Acquire()
	defer e.pool.Release()
	return e.engine.ReconstructData(globalShards)
}
202

203
// Split cuts data into shards with the global engine and then appends L
// local shards of the same size. The local shards are carved out of data's
// own backing array when its capacity can hold all of them; otherwise they
// are freshly allocated.
func (e *lrcEncoder) Split(data []byte) ([][]byte, error) {
	shards, err := e.engine.Split(data)
	if err != nil {
		return nil, err
	}

	globalCount := len(shards)
	size := len(shards[0])
	localCount := e.CodeMode.L

	// Not enough room behind data: allocate the local shards separately.
	if cap(data) < (localCount+globalCount)*size {
		for i := 0; i < localCount; i++ {
			shards = append(shards, make([]byte, size))
		}
		return shards, nil
	}

	// Extend data to its full capacity so the tail can be sliced into shards.
	data = data[:cap(data)]
	for i := 0; i < localCount; i++ {
		begin := (globalCount + i) * size
		end := (globalCount + i + 1) * size
		shards = append(shards, data[begin:end])
	}
	return shards, nil
}
223

224
// GetDataShards returns the leading N entries of shards. The result shares
// its backing array with shards.
func (e *lrcEncoder) GetDataShards(shards [][]byte) [][]byte {
	dataCount := e.CodeMode.N
	return shards[:dataCount]
}
227

228
// GetParityShards returns the M entries of shards that follow the first N,
// i.e. the range [N, N+M). The result shares its backing array with shards.
func (e *lrcEncoder) GetParityShards(shards [][]byte) [][]byte {
	first := e.CodeMode.N
	end := e.CodeMode.N + e.CodeMode.M
	return shards[first:end]
}
231

232
// GetLocalShards returns everything in shards from index N+M onwards. The
// result shares its backing array with shards.
func (e *lrcEncoder) GetLocalShards(shards [][]byte) [][]byte {
	globalCount := e.CodeMode.N + e.CodeMode.M
	return shards[globalCount:]
}
235

236
// GetShardsInIdc gathers the shards that make up the local stripe of the AZ
// numbered idx. The positions come from CodeMode.LocalStripeInAZ and are used
// as indexes into shards; the returned slice is new, but its elements alias
// the same byte buffers as shards.
func (e *lrcEncoder) GetShardsInIdc(shards [][]byte, idx int) [][]byte {
	stripeIdx, _, _ := e.CodeMode.LocalStripeInAZ(idx)
	stripe := make([][]byte, 0, len(stripeIdx))
	for _, globalIdx := range stripeIdx {
		stripe = append(stripe, shards[globalIdx])
	}
	return stripe
}
244

245
// Join writes outSize bytes of the original data to dst by handing the first
// N+M shards to the global engine; local shards are not passed along.
func (e *lrcEncoder) Join(dst io.Writer, shards [][]byte, outSize int) error {
	globalShards := shards[:e.CodeMode.N+e.CodeMode.M]
	return e.engine.Join(dst, globalShards, outSize)
}
248

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.