25
"github.com/cubefs/cubefs/blobstore/access/controller"
26
"github.com/cubefs/cubefs/blobstore/api/access"
27
"github.com/cubefs/cubefs/blobstore/api/clustermgr"
28
"github.com/cubefs/cubefs/blobstore/cmd"
29
"github.com/cubefs/cubefs/blobstore/common/consul"
30
errcode "github.com/cubefs/cubefs/blobstore/common/errors"
31
"github.com/cubefs/cubefs/blobstore/common/profile"
32
"github.com/cubefs/cubefs/blobstore/common/proto"
33
"github.com/cubefs/cubefs/blobstore/common/resourcepool"
34
"github.com/cubefs/cubefs/blobstore/common/rpc"
35
"github.com/cubefs/cubefs/blobstore/common/trace"
36
"github.com/cubefs/cubefs/blobstore/common/uptoken"
37
"github.com/cubefs/cubefs/blobstore/util/closer"
38
"github.com/cubefs/cubefs/blobstore/util/errors"
39
"github.com/cubefs/cubefs/blobstore/util/log"
43
// Operation names used as limiter keys. Limit assigns one of these to the
// name it hands to s.limiter.Acquire and s.limiter.Release.
limitNameAlloc = "alloc"
45
limitNamePutAt = "putat"
47
limitNameDelete = "delete"
48
// NOTE(review): no use of limitNameSign is visible in this listing — confirm
// where it is applied.
limitNameSign = "sign"
52
// _tokenExpiration is the 12-hour duration genTokens passes to
// uptoken.NewUploadToken for every token it creates.
_tokenExpiration = time.Hour * 12
61
// tokenSecretKeys holds the 20-byte secrets used for upload tokens. Each
// literal sets only the first 8 bytes; initTokenSecret later copies extra
// bytes into every key starting at offset 7. genTokens passes
// tokenSecretKeys[0] to uptoken.NewUploadToken, while PutAt and DeleteBlob
// loop over every key when validating a token.
tokenSecretKeys = [...][20]byte{
62
{0x5f, 0x00, 0x88, 0x96, 0x00, 0xa1, 0xfe, 0x1b},
63
{0xff, 0x1f, 0x2f, 0x4f, 0x7f, 0xaf, 0xef, 0xff},
65
// _initTokenSecret guards the key mutation in initTokenSecret so that it
// runs at most once.
_initTokenSecret sync.Once
68
// initTokenSecret copies b into every entry of tokenSecretKeys, starting at
// byte offset 7. The work runs inside _initTokenSecret.Do, so only the first
// call has any effect.
//
// NOTE(review): offset 7 overwrites the eighth byte of each key literal when b
// is non-empty — confirm this is intended rather than offset 8. Changing it
// would alter every token derived from these keys.
func initTokenSecret(b []byte) {
69
_initTokenSecret.Do(func() {
70
for idx := range tokenSecretKeys {
71
copy(tokenSecretKeys[idx][7:], b)
76
// initWithRegionMagic derives secret material from regionMagic: it computes
// the SHA-1 digest of the string and hands the first 8 bytes of that digest
// to both initTokenSecret and initLocationSecret.
//
// For an empty regionMagic a warning is logged that the default secret keys
// are in use.
// NOTE(review): the lines right after the warning are not visible here;
// presumably the function returns early in that case — confirm.
func initWithRegionMagic(regionMagic string) {
77
if regionMagic == "" {
78
log.Warn("no region magic setting, using default secret keys for checksum")
81
b := sha1.Sum([]byte(regionMagic))
82
initTokenSecret(b[:8])
83
initLocationSecret(b[:8])
86
// accessStatus is the JSON document returned by the GET /access/status admin
// handler registered in RegisterAdminHandler.
type accessStatus struct {
87
// Limit is filled from s.limiter.Status().
Limit Status `json:"limit"`
88
// Pool is filled from the stream admin's memPool.Status().
Pool resourcepool.Status `json:"pool"`
90
// Config is a copy of the stream admin's config.
Config StreamConfig `json:"config"`
91
// Clusters is the result of the stream admin's controller.All().
Clusters []*clustermgr.ClusterInfo `json:"clusters"`
92
// Services maps cluster ID -> service name -> hosts. The status handler
// only looks up proto.ServiceNameProxy for each cluster.
Services map[proto.ClusterID]map[string][]string `json:"services"`
99
// NOTE(review): the enclosing struct header is not visible in this listing;
// these are presumably fields of Config, given how New and RegisterService
// read them — confirm.

// ServiceRegister is the consul configuration. RegisterService checks its
// ConsulAddr and passes a pointer to it to consul.ServiceRegister.
ServiceRegister consul.Config `json:"service_register"`
100
// Stream configures the stream handler; New passes &cfg.Stream to
// NewStreamHandler and reads ClusterConfig.RegionMagic from it.
Stream StreamConfig `json:"stream"`
101
// Limit configures the limiter; New passes it to NewLimiter.
Limit LimitConfig `json:"limit"`
107
// streamHandler performs the Put, PutAt, Alloc, Get and Delete stream
// operations on behalf of the HTTP handlers, and exposes Admin() for the
// admin endpoints.
streamHandler StreamHandler
113
// New builds a Service from cfg.
//
// It first calls initWithRegionMagic with cfg.Stream.ClusterConfig.RegionMagic
// so the token and location secrets are seeded. The visible field
// initializers create the stream handler from &cfg.Stream and cl.Done(), and
// the limiter from cfg.Limit.
//
// NOTE(review): cl is declared on a line not shown in this listing; presumably
// it comes from the util/closer package — confirm.
func New(cfg Config) *Service {
115
initWithRegionMagic(cfg.Stream.ClusterConfig.RegionMagic)
120
streamHandler: NewStreamHandler(&cfg.Stream, cl.Done()),
121
limiter: NewLimiter(cfg.Limit),
127
// Close shuts the service down.
// NOTE(review): the body is not visible in this listing; presumably it closes
// the closer whose Done channel New hands to the stream handler — confirm.
func (s *Service) Close() {
132
// RegisterService registers this service with consul by calling
// consul.ServiceRegister with the configured bind address and the
// ServiceRegister section of the config.
//
// The call is guarded by a check for an empty ConsulAddr; the branch body is
// not visible here (presumably registration is skipped — confirm). A
// registration error is reported through log.Fatalf.
func (s *Service) RegisterService() {
133
if s.config.ServiceRegister.ConsulAddr == "" {
136
_, err := consul.ServiceRegister(s.config.BindAddr, &s.config.ServiceRegister)
138
log.Fatalf("service register failed, err: %v", err)
143
// RegisterAdminHandler installs two admin endpoints through profile.HandleFunc:
//
//	GET  /access/status
//	    Responds with an accessStatus JSON document: limiter status, memory
//	    pool status, stream config, all known clusters and, per cluster, the
//	    hosts of the proxy service.
//	POST /access/stream/controller/alg/:alg
//	    Changes the cluster-choosing algorithm of the stream controller.
//
// Both handlers type-assert s.streamHandler.Admin() to *streamAdmin and both
// contain a 503 (http.StatusServiceUnavailable) response; the exact condition
// guarding it is on lines not visible here (presumably "no *streamAdmin
// available" — confirm).
func (s *Service) RegisterAdminHandler() {
144
profile.HandleFunc(http.MethodGet, "/access/status", func(c *rpc.Context) {
145
var admin *streamAdmin
146
if sa := s.streamHandler.Admin(); sa != nil {
147
if ad, ok := sa.(*streamAdmin); ok {
152
c.RespondStatus(http.StatusServiceUnavailable)
156
ctx := c.Request.Context()
157
span := trace.SpanFromContextSafe(ctx)
159
// Assemble the status snapshot from the limiter and the stream admin.
status := new(accessStatus)
160
status.Limit = s.limiter.Status()
161
status.Pool = admin.memPool.Status()
162
status.Config = admin.config
163
status.Clusters = admin.controller.All()
164
status.Services = make(map[proto.ClusterID]map[string][]string, len(status.Clusters))
166
// For every cluster, look up its service controller and record the proxy
// hosts. Lookup errors are logged as warnings on the span.
for _, cluster := range status.Clusters {
167
service, err := admin.controller.GetServiceController(cluster.ClusterID)
169
span.Warn(err.Error())
173
svrs := make(map[string][]string, 1)
174
svrName := proto.ServiceNameProxy
175
if hosts, err := service.GetServiceHosts(ctx, svrName); err == nil {
176
svrs[svrName] = hosts
178
span.Warn(err.Error())
180
status.Services[cluster.ClusterID] = svrs
182
c.RespondJSON(status)
185
profile.HandleFunc(http.MethodPost, "/access/stream/controller/alg/:alg", func(c *rpc.Context) {
186
// :alg is parsed as a base-10 unsigned integer limited to 32 bits; the
// parse error text is sent back with 400 Bad Request.
algInt, err := strconv.ParseUint(c.Param.ByName("alg"), 10, 32)
188
c.RespondWith(http.StatusBadRequest, "", []byte(err.Error()))
192
alg := controller.AlgChoose(algInt)
193
if sa := s.streamHandler.Admin(); sa != nil {
194
if admin, ok := sa.(*streamAdmin); ok {
195
// A rejected algorithm change is answered with 403 Forbidden and the
// error text as the body.
if err := admin.controller.ChangeChooseAlg(alg); err != nil {
196
c.RespondWith(http.StatusForbidden, "", []byte(err.Error()))
200
span := trace.SpanFromContextSafe(c.Request.Context())
201
// Logged at warn level so the algorithm change stands out in the logs.
span.Warnf("change cluster choose algorithm to (%d %s)", alg, alg.String())
207
c.RespondStatus(http.StatusServiceUnavailable)
212
// Limit applies the per-operation concurrency limit to a request.
//
// It switches on the request URL path to choose a limiter name (the visible
// assignments are limitNameAlloc, limitNamePutAt and limitNameDelete; the
// case labels are not visible here), then acquires that name from s.limiter.
// If Acquire fails, the request is aborted with errcode.ErrAccessLimited.
// Otherwise the name is released via defer when Limit returns.
func (s *Service) Limit(c *rpc.Context) {
214
switch c.Request.URL.Path {
216
name = limitNameAlloc
220
name = limitNamePutAt
224
name = limitNameDelete
233
if err := s.limiter.Acquire(name); err != nil {
234
span := trace.SpanFromContextSafe(c.Request.Context())
235
span.Info("access concurrent limited", name, err)
236
c.AbortWithError(errcode.ErrAccessLimited)
239
defer s.limiter.Release(name)
244
// Put handles the /put request: it streams the request body into storage and
// responds with an access.PutResp that carries the computed hash sums.
//
// Steps visible in this listing:
//  1. Parse access.PutArgs; an ErrIllegalArguments response follows the debug
//     log (the validation condition itself is not visible here).
//  2. Build one hasher per algorithm requested in args.Hashes.
//  3. Wrap the request body with the limiter's Reader and call
//     streamHandler.Put; a failure is logged and answered with httpError(err).
//  4. Store each hasher's final sum in hashSumMap.
//  5. Call fillCrc on the returned location; a failure is logged and
//     answered with httpError(err).
//  6. Respond with the JSON result and log completion.
func (s *Service) Put(c *rpc.Context) {
245
args := new(access.PutArgs)
246
if err := c.ParseArgs(args); err != nil {
251
ctx := c.Request.Context()
252
span := trace.SpanFromContextSafe(ctx)
254
span.Debugf("accept /put request args:%+v", args)
256
c.RespondError(errcode.ErrIllegalArguments)
260
hashSumMap := args.Hashes.ToHashSumMap()
261
hasherMap := make(access.HasherMap, len(hashSumMap))
263
// One hasher per requested algorithm; they are passed to streamHandler.Put.
for alg := range hashSumMap {
264
hasherMap[alg] = alg.ToHasher()
267
rc := s.limiter.Reader(ctx, c.Request.Body)
268
loc, err := s.streamHandler.Put(ctx, rc, args.Size, hasherMap)
270
span.Error("stream put failed", errors.Detail(err))
271
c.RespondError(httpError(err))
276
// Collect the final digest of every hasher for the response.
for alg, hasher := range hasherMap {
277
hashSumMap[alg] = hasher.Sum(nil)
280
if err := fillCrc(loc); err != nil {
281
span.Error("stream put fill location crc", err)
282
c.RespondError(httpError(err))
286
c.RespondJSON(access.PutResp{
288
HashSumMap: hashSumMap,
290
span.Infof("done /put request location:%+v hash:%+v", loc, hashSumMap.All())
294
// PutAt handles the /putat request: it writes the request body to one
// specific blob (cluster ID, vid, blob ID) after checking the caller's token.
//
// Steps visible in this listing:
//  1. Parse access.PutAtArgs; an ErrIllegalArguments response follows the
//     debug log (the validation condition itself is not visible here).
//  2. Decode args.Token and test it with IsValid against every entry of
//     tokenSecretKeys. An "invalid token" debug log and an
//     ErrIllegalArguments response follow the loop.
//  3. Build one hasher per algorithm requested in args.Hashes, wrap the body
//     with the limiter's Reader and call streamHandler.PutAt; a failure is
//     logged and answered with httpError(err).
//  4. Respond with access.PutAtResp carrying the final hash sums.
func (s *Service) PutAt(c *rpc.Context) {
295
args := new(access.PutAtArgs)
296
if err := c.ParseArgs(args); err != nil {
301
ctx := c.Request.Context()
302
span := trace.SpanFromContextSafe(ctx)
304
span.Debugf("accept /putat request args:%+v", args)
306
c.RespondError(errcode.ErrIllegalArguments)
311
// Every secret key is tried, not just the first one.
// NOTE(review): args.Size is converted to uint32 for the check — confirm the
// argument validation bounds it accordingly.
for _, secretKey := range tokenSecretKeys {
312
token := uptoken.DecodeToken(args.Token)
313
if token.IsValid(args.ClusterID, args.Vid, args.BlobID, uint32(args.Size), secretKey[:]) {
319
span.Debugf("invalid token:%s", args.Token)
320
c.RespondError(errcode.ErrIllegalArguments)
324
hashSumMap := args.Hashes.ToHashSumMap()
325
hasherMap := make(access.HasherMap, len(hashSumMap))
327
for alg := range hashSumMap {
328
hasherMap[alg] = alg.ToHasher()
331
rc := s.limiter.Reader(ctx, c.Request.Body)
332
err := s.streamHandler.PutAt(ctx, rc, args.ClusterID, args.Vid, args.BlobID, args.Size, hasherMap)
334
span.Error("stream putat failed", errors.Detail(err))
335
c.RespondError(httpError(err))
340
// Collect the final digest of every hasher for the response.
for alg, hasher := range hasherMap {
341
hashSumMap[alg] = hasher.Sum(nil)
344
c.RespondJSON(access.PutAtResp{HashSumMap: hashSumMap})
345
span.Infof("done /putat request hash:%+v", hashSumMap.All())
349
// Alloc handles the /alloc request: it asks the stream handler for a location
// and returns it together with upload tokens.
//
// Steps visible in this listing:
//  1. Parse access.AllocArgs; an ErrIllegalArguments response follows the
//     debug log (the validation condition itself is not visible here).
//  2. Call streamHandler.Alloc with size, blob size, assigned cluster ID and
//     code mode; a failure is logged and answered with httpError(err).
//  3. Call fillCrc on the location; a failure is logged and answered with
//     httpError(err).
//  4. Build access.AllocResp with Tokens from genTokens(location) and log it.
//     The line that sends the response is not visible here.
func (s *Service) Alloc(c *rpc.Context) {
350
args := new(access.AllocArgs)
351
if err := c.ParseArgs(args); err != nil {
356
ctx := c.Request.Context()
357
span := trace.SpanFromContextSafe(ctx)
359
span.Debugf("accept /alloc request args:%+v", args)
361
c.RespondError(errcode.ErrIllegalArguments)
365
location, err := s.streamHandler.Alloc(ctx, args.Size, args.BlobSize, args.AssignClusterID, args.CodeMode)
367
span.Error("stream alloc failed", errors.Detail(err))
368
c.RespondError(httpError(err))
372
if err := fillCrc(location); err != nil {
373
span.Error("stream alloc fill location crc", err)
374
c.RespondError(httpError(err))
378
resp := access.AllocResp{
380
Tokens: genTokens(location),
383
span.Infof("done /alloc request resp:%+v", resp)
387
// Get handles the /get request: it streams part or all of a stored object
// back to the client.
//
// Steps visible in this listing:
//  1. Parse access.GetArgs. The request is rejected with
//     ErrIllegalArguments when args.IsValid() is false or verifyCrc fails for
//     the location.
//  2. Wrap the response writer with the limiter's Writer and call
//     streamHandler.Get, which returns a transfer value; a failure at this
//     stage is logged as "prepare failed" and answered with httpError(err).
//  3. Set Content-Type (stream) and Content-Length (args.ReadSize).
//  4. For a non-empty read that is not the whole object, set Content-Range
//     and respond 206 Partial Content. A 200 OK response is also present; the
//     branching between the two is on a line not visible here.
//  5. A transfer failure is reported through reportDownload and logged. The
//     status line has been sent by then (the code label is "StatusOKError").
//
// NOTE(review): w is declared on a line not visible in this listing;
// presumably it is the request's response writer — confirm.
func (s *Service) Get(c *rpc.Context) {
388
args := new(access.GetArgs)
389
if err := c.ParseArgs(args); err != nil {
394
ctx := c.Request.Context()
395
span := trace.SpanFromContextSafe(ctx)
397
span.Debugf("accept /get request args:%+v", args)
398
if !args.IsValid() || !verifyCrc(&args.Location) {
399
c.RespondError(errcode.ErrIllegalArguments)
404
writer := s.limiter.Writer(ctx, w)
405
transfer, err := s.streamHandler.Get(ctx, writer, args.Location, args.ReadSize, args.Offset)
407
span.Error("stream get prepare failed", errors.Detail(err))
408
c.RespondError(httpError(err))
412
w.Header().Set(rpc.HeaderContentType, rpc.MIMEStream)
413
w.Header().Set(rpc.HeaderContentLength, strconv.FormatInt(int64(args.ReadSize), 10))
414
// Content-Range uses an inclusive end offset, hence the -1.
if args.ReadSize > 0 && args.ReadSize != args.Location.Size {
415
w.Header().Set(rpc.HeaderContentRange, fmt.Sprintf("bytes %d-%d/%d",
416
args.Offset, args.Offset+args.ReadSize-1, args.Location.Size))
417
c.RespondStatus(http.StatusPartialContent)
419
c.RespondStatus(http.StatusOK)
427
reportDownload(args.Location.ClusterID, "StatusOKError", "-")
428
span.Error("stream get transfer failed", errors.Detail(err))
431
span.Info("done /get request")
435
// Delete handles the /delete request for one or more locations.
//
// What the visible lines show:
//   - resp accumulates FailedLocations. The response code contains an error
//     reply via httpError(err) and, when any failed locations exist, a reply
//     with http.StatusIMUsed (226) carrying resp. The surrounding control
//     flow is not visible here (presumably a deferred responder — confirm).
//   - err is set to ErrIllegalArguments for rejected input: once under a
//     condition that is not visible, and once for any location whose
//     verifyCrc check fails.
//   - While validating, the number of blob slices per cluster is counted in
//     clusterBlobsN.
//   - A single location is deleted directly through streamHandler.Delete; on
//     failure that location becomes the only entry of FailedLocations.
//   - Otherwise the blob slices of all locations are merged per cluster into
//     slices pre-sized from clusterBlobsN, and one goroutine per cluster
//     calls streamHandler.Delete.
//   - For every cluster ID received from failedCh, all request locations of
//     that cluster are appended to FailedLocations.
//
// NOTE(review): the sends on failedCh and the use of wg and done are on lines
// not visible in this listing — confirm the channel is closed only after all
// goroutines have finished.
func (s *Service) Delete(c *rpc.Context) {
436
args := new(access.DeleteArgs)
437
if err := c.ParseArgs(args); err != nil {
442
ctx := c.Request.Context()
443
span := trace.SpanFromContextSafe(ctx)
446
var resp access.DeleteResp
449
c.RespondError(httpError(err))
453
if len(resp.FailedLocations) > 0 {
454
span.Errorf("failed locations N %d of %d", len(resp.FailedLocations), len(args.Locations))
458
c.RespondStatusData(http.StatusIMUsed, resp)
466
err = errcode.ErrIllegalArguments
469
span.Debugf("accept /delete request args: locations %d", len(args.Locations))
470
// Deferred, so the completion log is written when Delete returns.
defer span.Info("done /delete request")
472
// Validate every location and count blob slices per cluster.
clusterBlobsN := make(map[proto.ClusterID]int, 4)
473
for _, loc := range args.Locations {
474
if !verifyCrc(&loc) {
475
span.Infof("invalid crc %+v", loc)
476
err = errcode.ErrIllegalArguments
479
clusterBlobsN[loc.ClusterID] += len(loc.Blobs)
482
// Single location: delete it directly instead of merging per cluster.
if len(args.Locations) == 1 {
483
loc := args.Locations[0]
484
if err := s.streamHandler.Delete(ctx, &loc); err != nil {
485
span.Error("stream delete failed", errors.Detail(err))
486
resp.FailedLocations = []access.Location{loc}
498
// Merge the blob slices of all locations by cluster ID.
merged := make(map[proto.ClusterID][]access.SliceInfo, len(clusterBlobsN))
499
for id, n := range clusterBlobsN {
500
merged[id] = make([]access.SliceInfo, 0, n)
502
for _, loc := range args.Locations {
503
merged[loc.ClusterID] = append(merged[loc.ClusterID], loc.Blobs...)
506
var wg sync.WaitGroup
507
failedCh := make(chan proto.ClusterID, 1)
508
done := make(chan struct{})
510
// Collector: a failed cluster marks every request location of that cluster
// as failed, because the merged delete covers all of them.
for id := range failedCh {
511
if resp.FailedLocations == nil {
512
resp.FailedLocations = make([]access.Location, 0, len(args.Locations))
514
for _, loc := range args.Locations {
515
if loc.ClusterID == id {
516
resp.FailedLocations = append(resp.FailedLocations, loc)
524
// One goroutine per cluster; id is passed as an argument to the closure.
for id := range merged {
525
go func(id proto.ClusterID) {
526
if err := s.streamHandler.Delete(ctx, &access.Location{
531
span.Error("stream delete failed", id, errors.Detail(err))
544
// DeleteBlob handles the /deleteblob request: it deletes one blob identified
// by cluster ID, vid and blob ID after checking the caller's token.
//
// Steps visible in this listing:
//  1. Parse access.DeleteBlobArgs; an ErrIllegalArguments response follows
//     the debug log (the validation condition itself is not visible here).
//  2. Decode args.Token and test it with IsValid against every entry of
//     tokenSecretKeys. An "invalid token" debug log and an
//     ErrIllegalArguments response follow the loop.
//  3. Call streamHandler.Delete with a location built for args.ClusterID
//     that holds one blob slice; a failure is logged and answered with
//     httpError(err).
//  4. Log completion.
func (s *Service) DeleteBlob(c *rpc.Context) {
545
args := new(access.DeleteBlobArgs)
546
if err := c.ParseArgs(args); err != nil {
551
ctx := c.Request.Context()
552
span := trace.SpanFromContextSafe(ctx)
554
span.Debugf("accept /deleteblob request args:%+v", args)
556
c.RespondError(errcode.ErrIllegalArguments)
561
// Every secret key is tried, not just the first one.
for _, secretKey := range tokenSecretKeys {
562
token := uptoken.DecodeToken(args.Token)
563
if token.IsValid(args.ClusterID, args.Vid, args.BlobID, uint32(args.Size), secretKey[:]) {
569
span.Debugf("invalid token:%s", args.Token)
570
c.RespondError(errcode.ErrIllegalArguments)
574
if err := s.streamHandler.Delete(ctx, &access.Location{
575
ClusterID: args.ClusterID,
577
Blobs: []access.SliceInfo{{
583
span.Error("stream delete blob failed", errors.Detail(err))
584
c.RespondError(httpError(err))
589
span.Info("done /deleteblob request")
593
// Sign handles the /sign request: it signs a location with signCrc and
// returns it in an access.SignResp.
//
// Steps visible in this listing:
//  1. Parse access.SignArgs; an ErrIllegalArguments response precedes the
//     debug log (the validation condition itself is not visible here).
//  2. Call signCrc(&loc, args.Locations); a failure is logged and answered
//     with ErrIllegalArguments rather than a server error.
//  3. Respond with the signed location and log the old and new CRC.
//
// NOTE(review): loc and crcOld are declared on lines not visible in this
// listing — confirm how they are derived from args.
func (s *Service) Sign(c *rpc.Context) {
594
args := new(access.SignArgs)
595
if err := c.ParseArgs(args); err != nil {
600
ctx := c.Request.Context()
601
span := trace.SpanFromContextSafe(ctx)
604
c.RespondError(errcode.ErrIllegalArguments)
607
span.Debugf("accept /sign request args: %+v", args)
611
if err := signCrc(&loc, args.Locations); err != nil {
612
span.Error("stream sign failed", errors.Detail(err))
613
c.RespondError(errcode.ErrIllegalArguments)
617
c.RespondJSON(access.SignResp{Location: loc})
618
span.Infof("done /sign request crc %d -> %d, resp:%+v", crcOld, loc.Crc, loc)
621
// httpError maps err to the error value the handlers pass to c.RespondError.
//
// The type checks run in this order:
//   - rpc.HTTPError: the branch body is not visible here (presumably the
//     error is returned unchanged — confirm).
//   - *errors.Error (util/errors): wrapped as a 500 "ServerError" built from
//     the error's Cause().
//   - anything else: errcode.ErrUnexpected.
func httpError(err error) error {
622
if e, ok := err.(rpc.HTTPError); ok {
625
if e, ok := err.(*errors.Error); ok {
626
return rpc.NewError(http.StatusInternalServerError, "ServerError", e.Cause())
628
return errcode.ErrUnexpected
639
// genTokens builds the encoded upload tokens for location.
//
// Two values are derived up front:
//   - hasMultiBlobs: location.Size >= location.BlobSize
//   - lastSize:      location.Size % location.BlobSize
//
// For each blob slice, a token is appended for (blob.MinBid, count) with size
// location.BlobSize. For the last slice, when lastSize > 0, one more token is
// appended for the single blob ID blob.MinBid+blob.Count-1 with size
// lastSize. The result is pre-sized to len(location.Blobs)+1 for that extra
// token. Every token is created with _tokenExpiration and tokenSecretKeys[0].
//
// NOTE(review): the declaration of count, the use of hasMultiBlobs and the
// body of the first "last slice" check are on lines not visible here;
// presumably count is blob.Count, reduced by one for the short tail —
// confirm.
// NOTE(review): a zero location.BlobSize makes the modulo panic (integer
// division by zero) — confirm callers guarantee it is non-zero.
func genTokens(location *access.Location) []string {
640
tokens := make([]string, 0, len(location.Blobs)+1)
642
hasMultiBlobs := location.Size >= uint64(location.BlobSize)
643
lastSize := uint32(location.Size % uint64(location.BlobSize))
644
for idx, blob := range location.Blobs {
648
if idx == len(location.Blobs)-1 && lastSize > 0 {
651
tokens = append(tokens, uptoken.EncodeToken(uptoken.NewUploadToken(location.ClusterID,
652
blob.Vid, blob.MinBid, count,
653
location.BlobSize, _tokenExpiration, tokenSecretKeys[0][:])))
657
// Token for the short tail blob of the last slice.
if idx == len(location.Blobs)-1 && lastSize > 0 {
658
tokens = append(tokens, uptoken.EncodeToken(uptoken.NewUploadToken(location.ClusterID,
659
blob.Vid, blob.MinBid+proto.BlobID(blob.Count)-1, 1,
660
lastSize, _tokenExpiration, tokenSecretKeys[0][:])))