1
// Copyright 2022 The CubeFS Authors.
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
7
// http://www.apache.org/licenses/LICENSE-2.0
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
21
"github.com/klauspost/reedsolomon"
23
"github.com/cubefs/cubefs/blobstore/common/codemode"
24
"github.com/cubefs/cubefs/blobstore/util/limit"
25
"github.com/cubefs/cubefs/blobstore/util/limit/count"
29
// defaultConcurrency is the Concurrency value NewEncoder applies when the configured one is not positive.
defaultConcurrency = 100
34
// ErrShortData reports that the supplied data is too short.
ErrShortData = errors.New("short data")
35
// ErrInvalidCodeMode is returned by NewEncoder when the configured code mode is not valid.
ErrInvalidCodeMode = errors.New("invalid code mode")
36
// ErrVerify reports that the shards did not pass verification.
ErrVerify = errors.New("shards verify failed")
37
// ErrInvalidShards reports that the supplied shards are invalid.
ErrInvalidShards = errors.New("invalid shards")
40
// Encoder is the erasure-coding encoder interface; an implementation provides
// every method listed below.
41
type Encoder interface {
42
// Encode encodes the source data into shards, for both normal EC and LRC.
43
Encode(shards [][]byte) error
44
// Reconstruct rebuilds all missing shards. badIdx must list the indexes in
// shards that are missing or bad.
45
Reconstruct(shards [][]byte, badIdx []int) error
46
// ReconstructData rebuilds only the data shards. badIdx must list the indexes
// in shards that are missing or bad.
47
ReconstructData(shards [][]byte, badIdx []int) error
48
// Split divides the source data into shards of the adapted shard size.
49
Split(data []byte) ([][]byte, error)
50
// GetDataShards returns the data shards without copying them.
51
GetDataShards(shards [][]byte) [][]byte
52
// GetParityShards returns the parity shards without copying them.
53
GetParityShards(shards [][]byte) [][]byte
54
// GetLocalShards returns the local shards (LRC mode) without copying them.
55
GetLocalShards(shards [][]byte) [][]byte
56
// GetShardsInIdc returns the shards that belong to the idc with index idx.
57
GetShardsInIdc(shards [][]byte, idx int) [][]byte
58
// Join writes the source data held in shards to dst.
59
Join(dst io.Writer, shards [][]byte, outSize int) error
60
// Verify checks the parity shards against the data shards.
61
Verify(shards [][]byte) (bool, error)
64
// Config is the configuration of an EC encoder.
66
CodeMode codemode.Tactic
73
pool limit.Limiter // concurrency pool
74
engine reedsolomon.Encoder
77
// NewEncoder returns an encoder that supports normal EC or LRC.
//
// It returns ErrInvalidCodeMode when cfg.CodeMode is not valid, and replaces a
// non-positive cfg.Concurrency with defaultConcurrency.
78
func NewEncoder(cfg Config) (Encoder, error) {
79
if !cfg.CodeMode.IsValid() {
80
return nil, ErrInvalidCodeMode
82
if cfg.Concurrency <= 0 {
83
cfg.Concurrency = defaultConcurrency
86
// engine over the whole stripe: N data shards and M parity shards
engine, err := reedsolomon.New(cfg.CodeMode.N, cfg.CodeMode.M)
90
// concurrency pool sized by cfg.Concurrency
pool := count.NewBlockingCount(cfg.Concurrency)
92
// a code mode with local shards (L != 0) additionally needs a local engine
if cfg.CodeMode.L != 0 {
93
// the local engine treats one AZ's share of the N+M shards as its data ...
localN := (cfg.CodeMode.N + cfg.CodeMode.M) / cfg.CodeMode.AZCount
94
// ... and one AZ's share of the L local shards as its parity
localM := cfg.CodeMode.L / cfg.CodeMode.AZCount
95
localEngine, err := reedsolomon.New(localN, localM)
103
localEngine: localEngine,
114
// Encode implements Encoder.Encode on top of the encoder's engine.
func (e *encoder) Encode(shards [][]byte) error {
116
// hand the pool slot back when the call returns
defer e.pool.Release()
118
if err := e.engine.Encode(shards); err != nil {
122
// check the encoded shards with the engine
ok, err := e.engine.Verify(shards)
133
// Verify implements Encoder.Verify by delegating to the encoder's engine and
// returning its result unchanged.
func (e *encoder) Verify(shards [][]byte) (bool, error) {
135
// hand the pool slot back once verification finishes
defer e.pool.Release()
136
return e.engine.Verify(shards)
139
// Reconstruct implements Encoder.Reconstruct with the encoder's engine.
// The shards listed in badIdx are first truncated to zero length by
// initBadShards, so their current contents are discarded before the engine runs.
func (e *encoder) Reconstruct(shards [][]byte, badIdx []int) error {
140
initBadShards(shards, badIdx)
142
// hand the pool slot back when the call returns
defer e.pool.Release()
143
return e.engine.Reconstruct(shards)
146
// ReconstructData implements Encoder.ReconstructData with the encoder's engine.
// The shards listed in badIdx are first truncated to zero length by
// initBadShards, so their current contents are discarded before the engine runs.
func (e *encoder) ReconstructData(shards [][]byte, badIdx []int) error {
147
initBadShards(shards, badIdx)
149
// hand the pool slot back when the call returns
defer e.pool.Release()
150
return e.engine.ReconstructData(shards)
153
func (e *encoder) Split(data []byte) ([][]byte, error) {
154
return e.engine.Split(data)
157
func (e *encoder) GetDataShards(shards [][]byte) [][]byte {
158
return shards[:e.CodeMode.N]
161
func (e *encoder) GetParityShards(shards [][]byte) [][]byte {
162
return shards[e.CodeMode.N:]
165
func (e *encoder) GetLocalShards(shards [][]byte) [][]byte {
169
func (e *encoder) GetShardsInIdc(shards [][]byte, idx int) [][]byte {
170
n, m := e.CodeMode.N, e.CodeMode.M
171
idcCnt := e.CodeMode.AZCount
173
localN, localM := n/idcCnt, m/idcCnt
175
return append(shards[idx*localN:(idx+1)*localN], shards[n+localM*idx:n+localM*(idx+1)]...)
178
func (e *encoder) Join(dst io.Writer, shards [][]byte, outSize int) error {
179
return e.engine.Join(dst, shards, outSize)
182
// initBadShards truncates every shard listed in badIdx to zero length so that
// its current contents are discarded. The backing array is kept, so the
// shard's capacity is unchanged. Shards that are already nil or empty are left
// untouched.
//
// Every index in badIdx must be a valid index into shards.
func initBadShards(shards [][]byte, badIdx []int) {
	for _, i := range badIdx {
		// a non-zero length already implies a non-nil slice with capacity
		if len(shards[i]) > 0 {
			shards[i] = shards[i][:0]
		}
	}
}
190
func shardSize(shards [][]byte) int {
191
for _, shard := range shards {
199
func fillFullShards(shards [][]byte) {
200
shardSize := shardSize(shards)
201
for iShard := 0; iShard < len(shards); iShard++ {
202
if len(shards[iShard]) == 0 {
203
if cap(shards[iShard]) >= shardSize {
204
shards[iShard] = shards[iShard][0:shardSize]
206
shards[iShard] = make([]byte, shardSize)