cubefs

Форк
0
/
encoder.go 
210 строк · 5.3 Кб
1
// Copyright 2022 The CubeFS Authors.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
14

15
package ec
16

17
import (
18
	"errors"
19
	"io"
20

21
	"github.com/klauspost/reedsolomon"
22

23
	"github.com/cubefs/cubefs/blobstore/common/codemode"
24
	"github.com/cubefs/cubefs/blobstore/util/limit"
25
	"github.com/cubefs/cubefs/blobstore/util/limit/count"
26
)
27

28
const (
29
	defaultConcurrency = 100
30
)
31

32
// errors
33
var (
34
	ErrShortData       = errors.New("short data")
35
	ErrInvalidCodeMode = errors.New("invalid code mode")
36
	ErrVerify          = errors.New("shards verify failed")
37
	ErrInvalidShards   = errors.New("invalid shards")
38
)
39

40
// Encoder normal ec encoder, implements all these functions
41
type Encoder interface {
42
	// encode source data into shards, whatever normal ec or LRC
43
	Encode(shards [][]byte) error
44
	// reconstruct all missing shards, you should assign the missing or bad idx in shards
45
	Reconstruct(shards [][]byte, badIdx []int) error
46
	// only reconstruct data shards, you should assign the missing or bad idx in shards
47
	ReconstructData(shards [][]byte, badIdx []int) error
48
	// split source data into adapted shards size
49
	Split(data []byte) ([][]byte, error)
50
	// get data shards(No-Copy)
51
	GetDataShards(shards [][]byte) [][]byte
52
	// get parity shards(No-Copy)
53
	GetParityShards(shards [][]byte) [][]byte
54
	// get local shards(LRC model, No-Copy)
55
	GetLocalShards(shards [][]byte) [][]byte
56
	// get shards in an idc
57
	GetShardsInIdc(shards [][]byte, idx int) [][]byte
58
	// output source data into dst(io.Writer)
59
	Join(dst io.Writer, shards [][]byte, outSize int) error
60
	// verify parity shards with data shards
61
	Verify(shards [][]byte) (bool, error)
62
}
63

64
// Config ec encoder config
65
type Config struct {
66
	CodeMode     codemode.Tactic
67
	EnableVerify bool
68
	Concurrency  int
69
}
70

71
type encoder struct {
72
	Config
73
	pool   limit.Limiter // concurrency pool
74
	engine reedsolomon.Encoder
75
}
76

77
// NewEncoder return an encoder which support normal EC or LRC
78
func NewEncoder(cfg Config) (Encoder, error) {
79
	if !cfg.CodeMode.IsValid() {
80
		return nil, ErrInvalidCodeMode
81
	}
82
	if cfg.Concurrency <= 0 {
83
		cfg.Concurrency = defaultConcurrency
84
	}
85

86
	engine, err := reedsolomon.New(cfg.CodeMode.N, cfg.CodeMode.M)
87
	if err != nil {
88
		return nil, err
89
	}
90
	pool := count.NewBlockingCount(cfg.Concurrency)
91

92
	if cfg.CodeMode.L != 0 {
93
		localN := (cfg.CodeMode.N + cfg.CodeMode.M) / cfg.CodeMode.AZCount
94
		localM := cfg.CodeMode.L / cfg.CodeMode.AZCount
95
		localEngine, err := reedsolomon.New(localN, localM)
96
		if err != nil {
97
			return nil, err
98
		}
99
		return &lrcEncoder{
100
			Config:      cfg,
101
			pool:        pool,
102
			engine:      engine,
103
			localEngine: localEngine,
104
		}, nil
105
	}
106

107
	return &encoder{
108
		Config: cfg,
109
		pool:   pool,
110
		engine: engine,
111
	}, nil
112
}
113

114
func (e *encoder) Encode(shards [][]byte) error {
115
	e.pool.Acquire()
116
	defer e.pool.Release()
117

118
	if err := e.engine.Encode(shards); err != nil {
119
		return err
120
	}
121
	if e.EnableVerify {
122
		ok, err := e.engine.Verify(shards)
123
		if err != nil {
124
			return err
125
		}
126
		if !ok {
127
			return ErrVerify
128
		}
129
	}
130
	return nil
131
}
132

133
func (e *encoder) Verify(shards [][]byte) (bool, error) {
134
	e.pool.Acquire()
135
	defer e.pool.Release()
136
	return e.engine.Verify(shards)
137
}
138

139
func (e *encoder) Reconstruct(shards [][]byte, badIdx []int) error {
140
	initBadShards(shards, badIdx)
141
	e.pool.Acquire()
142
	defer e.pool.Release()
143
	return e.engine.Reconstruct(shards)
144
}
145

146
func (e *encoder) ReconstructData(shards [][]byte, badIdx []int) error {
147
	initBadShards(shards, badIdx)
148
	e.pool.Acquire()
149
	defer e.pool.Release()
150
	return e.engine.ReconstructData(shards)
151
}
152

153
func (e *encoder) Split(data []byte) ([][]byte, error) {
154
	return e.engine.Split(data)
155
}
156

157
func (e *encoder) GetDataShards(shards [][]byte) [][]byte {
158
	return shards[:e.CodeMode.N]
159
}
160

161
func (e *encoder) GetParityShards(shards [][]byte) [][]byte {
162
	return shards[e.CodeMode.N:]
163
}
164

165
func (e *encoder) GetLocalShards(shards [][]byte) [][]byte {
166
	return nil
167
}
168

169
func (e *encoder) GetShardsInIdc(shards [][]byte, idx int) [][]byte {
170
	n, m := e.CodeMode.N, e.CodeMode.M
171
	idcCnt := e.CodeMode.AZCount
172

173
	localN, localM := n/idcCnt, m/idcCnt
174

175
	return append(shards[idx*localN:(idx+1)*localN], shards[n+localM*idx:n+localM*(idx+1)]...)
176
}
177

178
func (e *encoder) Join(dst io.Writer, shards [][]byte, outSize int) error {
179
	return e.engine.Join(dst, shards, outSize)
180
}
181

182
func initBadShards(shards [][]byte, badIdx []int) {
183
	for _, i := range badIdx {
184
		if shards[i] != nil && len(shards[i]) != 0 && cap(shards[i]) > 0 {
185
			shards[i] = shards[i][:0]
186
		}
187
	}
188
}
189

190
func shardSize(shards [][]byte) int {
191
	for _, shard := range shards {
192
		if len(shard) != 0 {
193
			return len(shard)
194
		}
195
	}
196
	return 0
197
}
198

199
func fillFullShards(shards [][]byte) {
200
	shardSize := shardSize(shards)
201
	for iShard := 0; iShard < len(shards); iShard++ {
202
		if len(shards[iShard]) == 0 {
203
			if cap(shards[iShard]) >= shardSize {
204
				shards[iShard] = shards[iShard][0:shardSize]
205
			} else {
206
				shards[iShard] = make([]byte, shardSize)
207
			}
208
		}
209
	}
210
}
211

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.