1
// Copyright 2022 The CubeFS Authors.
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
7
// http://www.apache.org/licenses/LICENSE-2.0
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
21
"github.com/klauspost/reedsolomon"
23
"github.com/cubefs/cubefs/blobstore/util/errors"
24
"github.com/cubefs/cubefs/blobstore/util/limit"
25
"github.com/cubefs/cubefs/blobstore/util/task"
28
// lrcEncoder holds the state shared by the LRC encoder methods in this file:
// a limiter and two reed-solomon encoders. The methods below apply engine to
// the first N+M shards of a stripe and localEngine to the per-AZ shard groups
// returned by GetShardsInIdc.
type lrcEncoder struct {
30
pool limit.Limiter // concurrency pool
31
engine reedsolomon.Encoder // used on shards[:N+M] (global stripe)
32
localEngine reedsolomon.Encoder // used on the shards of a single AZ (local stripe)
35
// Encode computes the parity shards of a full stripe.
//
// shards must contain exactly N+M+L entries, otherwise ErrInvalidShards is
// returned. The first N+M shards are encoded and then verified with the
// global engine. After that one task per AZ is built; each task encodes and
// verifies that AZ's local stripe with the local engine, and all tasks are
// handed to task.Run. Failures are wrapped with errors.Info.
func (e *lrcEncoder) Encode(shards [][]byte) error {
36
// a full stripe is N data + M global parity + L local parity shards
if len(shards) != (e.CodeMode.N + e.CodeMode.M + e.CodeMode.L) {
37
return ErrInvalidShards
40
// NOTE(review): expects a matching acquire on e.pool before this point — confirm
defer e.pool.Release()
41
fillFullShards(shards)
43
// firstly, do global ec encode
44
if err := e.engine.Encode(shards[:e.CodeMode.N+e.CodeMode.M]); err != nil {
45
return errors.Info(err, "lrcEncoder.Encode global failed")
48
// re-check the freshly written global parity before deriving local parity from it
ok, err := e.engine.Verify(shards[:e.CodeMode.N+e.CodeMode.M])
50
return errors.Info(err, "lrcEncoder.Encode global verify failed")
57
tasks := make([]func() error, 0, e.CodeMode.AZCount)
58
// secondly, do local ec encode
59
for i := 0; i < e.CodeMode.AZCount; i++ {
60
// declared inside the loop body, so every closure captures its own slice
localShards := e.GetShardsInIdc(shards, i)
61
tasks = append(tasks, func() error {
62
if err := e.localEngine.Encode(localShards); err != nil {
63
return errors.Info(err, "lrcEncoder.Encode local failed")
66
ok, err := e.localEngine.Verify(localShards)
68
return errors.Info(err, "lrcEncoder.Encode local verify failed")
77
// NOTE(review): task.Run presumably executes the per-AZ tasks concurrently — confirm in util/task
if err := task.Run(context.Background(), tasks...); err != nil {
84
// verifyError lets a verification task, which can only return an error, also
// carry the boolean verification result. Verify builds it with the fields
// error and verified and unpacks both after task.Run returns.
type verifyError struct {
89
// Verify checks that the parity in shards is consistent with the data.
//
// When len(shards) equals (N+M+L)/AZCount the input is treated as the local
// stripe of one AZ and is checked with the local engine only. Otherwise the
// first N+M shards are checked with the global engine, and then one task per
// AZ checks that AZ's local stripe with the local engine. A failing task
// reports its result as a verifyError, which is unpacked into the
// (verified, error) pair returned to the caller.
func (e *lrcEncoder) Verify(shards [][]byte) (bool, error) {
91
// NOTE(review): expects a matching acquire on e.pool before this point — confirm
defer e.pool.Release()
93
// shard count of a single AZ: only the local stripe was passed in
if len(shards) == (e.CodeMode.N+e.CodeMode.M+e.CodeMode.L)/e.CodeMode.AZCount {
94
ok, err := e.localEngine.Verify(shards)
96
err = errors.Info(err, "lrcEncoder.Verify local shards failed")
101
// full stripe: check the global part first
ok, err := e.engine.Verify(shards[:e.CodeMode.N+e.CodeMode.M])
102
if !ok || err != nil {
104
err = errors.Info(err, "lrcEncoder.Verify global shards failed")
109
tasks := make([]func() error, 0, e.CodeMode.AZCount)
110
for i := 0; i < e.CodeMode.AZCount; i++ {
111
// declared inside the loop body, so every closure captures its own slice
localShards := e.GetShardsInIdc(shards, i)
112
tasks = append(tasks, func() error {
113
ok, err := e.localEngine.Verify(localShards)
114
if !ok || err != nil {
116
err = errors.Info(err, "lrcEncoder.Verify local shards failed")
118
// smuggle the boolean result through the error-only task signature
return verifyError{error: err, verified: ok}
123
if err := task.Run(context.Background(), tasks...); err != nil {
124
// relies on task.Run handing back the task's error value unwrapped
if verifyErr, succ := err.(verifyError); succ {
125
return verifyErr.verified, verifyErr.error
133
// Reconstruct rebuilds the shards whose indexes are listed in badIdx.
//
// Bad indexes below N+M are collected and passed to initBadShards. When
// len(shards) equals (N+M+L)/AZCount the input is treated as the local stripe
// of one AZ and is rebuilt with the local engine only. Otherwise the first N+M
// shards are rebuilt with the global engine; afterwards bad indexes are mapped
// to an AZ and a position inside that AZ's local stripe, grouped per AZ, and
// each affected local stripe is rebuilt with the local engine through
// task.Run. Failures are wrapped with errors.Info.
func (e *lrcEncoder) Reconstruct(shards [][]byte, badIdx []int) error {
134
fillFullShards(shards)
136
// only indexes inside the global stripe (data + global parity) are kept here
globalBadIdx := make([]int, 0)
137
for _, i := range badIdx {
138
if i < e.CodeMode.N+e.CodeMode.M {
139
globalBadIdx = append(globalBadIdx, i)
142
initBadShards(shards, globalBadIdx)
144
// NOTE(review): expects a matching acquire on e.pool before this point — confirm
defer e.pool.Release()
146
// use local ec reconstruct, saving network bandwidth
147
if len(shards) == (e.CodeMode.N+e.CodeMode.M+e.CodeMode.L)/e.CodeMode.AZCount {
148
if err := e.localEngine.Reconstruct(shards); err != nil {
149
return errors.Info(err, "lrcEncoder.Reconstruct local ec reconstruct failed")
154
// can't reconstruct from local ec
155
// firstly, use global ec reconstruct
156
if err := e.engine.Reconstruct(shards[:e.CodeMode.N+e.CodeMode.M]); err != nil {
157
return errors.Info(err, "lrcEncoder.Reconstruct global ec reconstruct failed")
160
// secondly, check if need to reconstruct the local shards
161
// AZ index -> bad positions inside that AZ's local stripe
localRestructs := make(map[int][]int)
162
n, m, l, azCount := e.CodeMode.N, e.CodeMode.M, e.CodeMode.L, e.CodeMode.AZCount
163
for _, i := range badIdx {
165
// i-n-m is the offset among the local parity shards; with l/azCount of them
// per AZ this yields the owning AZ.
// NOTE(review): presumably only applied to i >= n+m, and assumes l is a
// multiple of azCount — confirm
idcIdx := (i - n - m) * azCount / l
166
// position inside the AZ's local stripe: offset within the AZ's local parity,
// shifted past the (n+m)/azCount global shards that precede it in the stripe
localBadIdx := i - n - m - l/azCount*idcIdx + (n+m)/azCount
167
if _, ok := localRestructs[idcIdx]; !ok {
168
localRestructs[idcIdx] = make([]int, 0)
170
localRestructs[idcIdx] = append(localRestructs[idcIdx], localBadIdx)
174
tasks := make([]func() error, 0, len(localRestructs))
175
// note: badIdx below shadows the function parameter of the same name
for idx, badIdx := range localRestructs {
176
localShards := e.GetShardsInIdc(shards, idx)
177
initBadShards(localShards, badIdx)
178
tasks = append(tasks, func() error {
179
return e.localEngine.Reconstruct(localShards)
182
if err := task.Run(context.Background(), tasks...); err != nil {
183
return errors.Info(err, "lrcEncoder.Reconstruct local ec reconstruct after global ec failed")
188
// ReconstructData rebuilds missing data shards using the global engine only.
//
// The first N+M shards are prepared with fillFullShards, the bad indexes below
// N+M are passed to initBadShards, and the stripe is then truncated to N+M
// shards and delegated to engine.ReconstructData. Shards at index N+M and
// above are not passed to the engine.
func (e *lrcEncoder) ReconstructData(shards [][]byte, badIdx []int) error {
189
fillFullShards(shards[:e.CodeMode.N+e.CodeMode.M])
190
// bad indexes that point at local parity shards are ignored here
globalBadIdx := make([]int, 0)
191
for _, i := range badIdx {
192
if i < e.CodeMode.N+e.CodeMode.M {
193
globalBadIdx = append(globalBadIdx, i)
196
initBadShards(shards, globalBadIdx)
197
// drop the local parity shards; the global engine works on N+M shards
shards = shards[:e.CodeMode.N+e.CodeMode.M]
199
// NOTE(review): expects a matching acquire on e.pool before this point — confirm
defer e.pool.Release()
200
return e.engine.ReconstructData(shards)
203
// Split cuts data into shards with engine.Split and appends L additional
// shards for the local parity.
//
// If cap(data) can hold all L+len(shards) shards of the resulting shard
// length, data is re-sliced to its capacity and the extra shards are taken
// from its backing array, so no new buffers are allocated. The other loop
// allocates each extra shard with make.
// NOTE(review): the make loop is presumably the fallback for a too-small
// capacity — confirm the branch structure.
func (e *lrcEncoder) Split(data []byte) ([][]byte, error) {
204
shards, err := e.engine.Split(data)
208
// shardN: number of shards produced by the global engine; shardLen: bytes per shard
shardN, shardLen := len(shards), len(shards[0])
209
if cap(data) >= (e.CodeMode.L+shardN)*shardLen {
210
// expose the spare capacity so it can be sliced below
if cap(data) > len(data) {
211
data = data[:cap(data)]
213
for i := 0; i < e.CodeMode.L; i++ {
214
// the i-th extra shard sits right after the shardN shards already cut from data
shards = append(shards, data[(shardN+i)*shardLen:(shardN+i+1)*shardLen])
217
for i := 0; i < e.CodeMode.L; i++ {
218
shards = append(shards, make([]byte, shardLen))
224
// GetDataShards returns the first N entries of shards, i.e. the data shards.
// The result is a sub-slice of shards, not a copy.
func (e *lrcEncoder) GetDataShards(shards [][]byte) [][]byte {
225
return shards[:e.CodeMode.N]
228
// GetParityShards returns entries N through N+M-1 of shards, i.e. the global
// parity shards. The result is a sub-slice of shards, not a copy.
func (e *lrcEncoder) GetParityShards(shards [][]byte) [][]byte {
229
return shards[e.CodeMode.N : e.CodeMode.N+e.CodeMode.M]
232
// GetLocalShards returns every entry of shards from index N+M onward, i.e.
// the local parity shards. The result is a sub-slice of shards, not a copy.
func (e *lrcEncoder) GetLocalShards(shards [][]byte) [][]byte {
233
return shards[e.CodeMode.N+e.CodeMode.M:]
236
// GetShardsInIdc gathers the shards that belong to the AZ with index idx.
//
// The stripe indexes come from CodeMode.LocalStripeInAZ(idx); a new outer
// slice is filled in that order. Only the outer slice is new: each element
// still refers to the same byte slice as the corresponding entry of shards.
func (e *lrcEncoder) GetShardsInIdc(shards [][]byte, idx int) [][]byte {
237
// only the first return value (the stripe indexes) is needed here
locals, _, _ := e.CodeMode.LocalStripeInAZ(idx)
238
localShards := make([][]byte, len(locals))
239
for localIdx, globalIdx := range locals {
240
localShards[localIdx] = shards[globalIdx]
245
// Join writes the original data to dst by delegating to engine.Join with the
// first N+M shards and the requested outSize; shards from index N+M onward
// are not passed on.
func (e *lrcEncoder) Join(dst io.Writer, shards [][]byte, outSize int) error {
246
return e.engine.Join(dst, shards[:(e.CodeMode.N+e.CodeMode.M)], outSize)