cubefs

Форк
0
/
server.go 
665 строк · 17.2 Кб
1
// Copyright 2022 The CubeFS Authors.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
14

15
package access
16

17
import (
18
	"crypto/sha1"
19
	"fmt"
20
	"net/http"
21
	"strconv"
22
	"sync"
23
	"time"
24

25
	"github.com/cubefs/cubefs/blobstore/access/controller"
26
	"github.com/cubefs/cubefs/blobstore/api/access"
27
	"github.com/cubefs/cubefs/blobstore/api/clustermgr"
28
	"github.com/cubefs/cubefs/blobstore/cmd"
29
	"github.com/cubefs/cubefs/blobstore/common/consul"
30
	errcode "github.com/cubefs/cubefs/blobstore/common/errors"
31
	"github.com/cubefs/cubefs/blobstore/common/profile"
32
	"github.com/cubefs/cubefs/blobstore/common/proto"
33
	"github.com/cubefs/cubefs/blobstore/common/resourcepool"
34
	"github.com/cubefs/cubefs/blobstore/common/rpc"
35
	"github.com/cubefs/cubefs/blobstore/common/trace"
36
	"github.com/cubefs/cubefs/blobstore/common/uptoken"
37
	"github.com/cubefs/cubefs/blobstore/util/closer"
38
	"github.com/cubefs/cubefs/blobstore/util/errors"
39
	"github.com/cubefs/cubefs/blobstore/util/log"
40
)
41

42
// Names of the per-API concurrency limiters; used as keys when
// acquiring and releasing quota in Service.Limit.
const (
	limitNameAlloc  = "alloc"
	limitNamePut    = "put"
	limitNamePutAt  = "putat"
	limitNameGet    = "get"
	limitNameDelete = "delete"
	limitNameSign   = "sign"
)
50

51
const (
	// _tokenExpiration is the lifetime of upload tokens minted by Alloc
	// (see genTokens); PutAt/DeleteBlob reject tokens older than this.
	_tokenExpiration = time.Hour * 12
)
54

55
var (
	// tokenSecretKeys alloc token with the first secret key always,
	// so that you can change the secret key.
	//
	// parse-1: insert a new key at the first index,
	// parse-2: delete the old key at the last index after _tokenExpiration duration.
	//
	// Bytes [7:15] of each key are overwritten once at startup with a
	// region-specific digest, see initTokenSecret / initWithRegionMagic.
	tokenSecretKeys = [...][20]byte{
		{0x5f, 0x00, 0x88, 0x96, 0x00, 0xa1, 0xfe, 0x1b},
		{0xff, 0x1f, 0x2f, 0x4f, 0x7f, 0xaf, 0xef, 0xff},
	}
	// _initTokenSecret guards the one-time key customization.
	_initTokenSecret sync.Once
)
67

68
func initTokenSecret(b []byte) {
69
	_initTokenSecret.Do(func() {
70
		for idx := range tokenSecretKeys {
71
			copy(tokenSecretKeys[idx][7:], b)
72
		}
73
	})
74
}
75

76
func initWithRegionMagic(regionMagic string) {
77
	if regionMagic == "" {
78
		log.Warn("no region magic setting, using default secret keys for checksum")
79
		return
80
	}
81
	b := sha1.Sum([]byte(regionMagic))
82
	initTokenSecret(b[:8])
83
	initLocationSecret(b[:8])
84
}
85

86
// accessStatus is the JSON payload served by the /access/status
// admin endpoint (see RegisterAdminHandler).
type accessStatus struct {
	// Limit is the current state of the per-API concurrency limiter.
	Limit Status `json:"limit"`
	// Pool is the state of the stream handler's memory pool.
	Pool resourcepool.Status `json:"pool"`

	// Config is the stream handler configuration in effect.
	Config StreamConfig `json:"config"`
	// Clusters lists all clusters known to the cluster controller.
	Clusters []*clustermgr.ClusterInfo `json:"clusters"`
	// Services maps cluster id -> service name -> host list.
	Services map[proto.ClusterID]map[string][]string `json:"services"`
}
94

95
// Config service configs
type Config struct {
	cmd.Config

	// ServiceRegister holds consul registration settings; an empty
	// ConsulAddr disables registration (see RegisterService).
	ServiceRegister consul.Config `json:"service_register"`
	// Stream configures the data-path stream handler.
	Stream StreamConfig `json:"stream"`
	// Limit configures the per-API concurrency limiter.
	Limit LimitConfig `json:"limit"`
}
103

104
// Service rpc service
type Service struct {
	config        Config        // immutable service configuration
	streamHandler StreamHandler // data path: put/get/delete streams
	limiter       Limiter       // per-API concurrency limiter
	closer        closer.Closer // stops background goroutines on Close
}
111

112
// New returns an access service
113
func New(cfg Config) *Service {
114
	// add region magic checksum to the secret keys
115
	initWithRegionMagic(cfg.Stream.ClusterConfig.RegionMagic)
116

117
	cl := closer.New()
118
	return &Service{
119
		config:        cfg,
120
		streamHandler: NewStreamHandler(&cfg.Stream, cl.Done()),
121
		limiter:       NewLimiter(cfg.Limit),
122
		closer:        cl,
123
	}
124
}
125

126
// Close close server: signals the stream handler's background
// goroutines (via the closer's done channel) to stop.
func (s *Service) Close() {
	s.closer.Close()
}
130

131
// RegisterService register service to rpc
132
func (s *Service) RegisterService() {
133
	if s.config.ServiceRegister.ConsulAddr == "" {
134
		return
135
	}
136
	_, err := consul.ServiceRegister(s.config.BindAddr, &s.config.ServiceRegister)
137
	if err != nil {
138
		log.Fatalf("service register failed, err: %v", err)
139
	}
140
}
141

142
// RegisterAdminHandler register admin handler to profile
143
func (s *Service) RegisterAdminHandler() {
144
	profile.HandleFunc(http.MethodGet, "/access/status", func(c *rpc.Context) {
145
		var admin *streamAdmin
146
		if sa := s.streamHandler.Admin(); sa != nil {
147
			if ad, ok := sa.(*streamAdmin); ok {
148
				admin = ad
149
			}
150
		}
151
		if admin == nil {
152
			c.RespondStatus(http.StatusServiceUnavailable)
153
			return
154
		}
155

156
		ctx := c.Request.Context()
157
		span := trace.SpanFromContextSafe(ctx)
158

159
		status := new(accessStatus)
160
		status.Limit = s.limiter.Status()
161
		status.Pool = admin.memPool.Status()
162
		status.Config = admin.config
163
		status.Clusters = admin.controller.All()
164
		status.Services = make(map[proto.ClusterID]map[string][]string, len(status.Clusters))
165

166
		for _, cluster := range status.Clusters {
167
			service, err := admin.controller.GetServiceController(cluster.ClusterID)
168
			if err != nil {
169
				span.Warn(err.Error())
170
				continue
171
			}
172

173
			svrs := make(map[string][]string, 1)
174
			svrName := proto.ServiceNameProxy
175
			if hosts, err := service.GetServiceHosts(ctx, svrName); err == nil {
176
				svrs[svrName] = hosts
177
			} else {
178
				span.Warn(err.Error())
179
			}
180
			status.Services[cluster.ClusterID] = svrs
181
		}
182
		c.RespondJSON(status)
183
	})
184

185
	profile.HandleFunc(http.MethodPost, "/access/stream/controller/alg/:alg", func(c *rpc.Context) {
186
		algInt, err := strconv.ParseUint(c.Param.ByName("alg"), 10, 32)
187
		if err != nil {
188
			c.RespondWith(http.StatusBadRequest, "", []byte(err.Error()))
189
			return
190
		}
191

192
		alg := controller.AlgChoose(algInt)
193
		if sa := s.streamHandler.Admin(); sa != nil {
194
			if admin, ok := sa.(*streamAdmin); ok {
195
				if err := admin.controller.ChangeChooseAlg(alg); err != nil {
196
					c.RespondWith(http.StatusForbidden, "", []byte(err.Error()))
197
					return
198
				}
199

200
				span := trace.SpanFromContextSafe(c.Request.Context())
201
				span.Warnf("change cluster choose algorithm to (%d %s)", alg, alg.String())
202
				c.Respond()
203
				return
204
			}
205
		}
206

207
		c.RespondStatus(http.StatusServiceUnavailable)
208
	}, rpc.OptArgsURI())
209
}
210

211
// Limit rps controller
212
func (s *Service) Limit(c *rpc.Context) {
213
	name := ""
214
	switch c.Request.URL.Path {
215
	case "/alloc":
216
		name = limitNameAlloc
217
	case "/put":
218
		name = limitNamePut
219
	case "/putat":
220
		name = limitNamePutAt
221
	case "/get":
222
		name = limitNameGet
223
	case "/delete":
224
		name = limitNameDelete
225
	case "/sign":
226
		name = limitNameSign
227
	default:
228
	}
229
	if name == "" {
230
		return
231
	}
232

233
	if err := s.limiter.Acquire(name); err != nil {
234
		span := trace.SpanFromContextSafe(c.Request.Context())
235
		span.Info("access concurrent limited", name, err)
236
		c.AbortWithError(errcode.ErrAccessLimited)
237
		return
238
	}
239
	defer s.limiter.Release(name)
240
	c.Next()
241
}
242

243
// Put one object
244
func (s *Service) Put(c *rpc.Context) {
245
	args := new(access.PutArgs)
246
	if err := c.ParseArgs(args); err != nil {
247
		c.RespondError(err)
248
		return
249
	}
250

251
	ctx := c.Request.Context()
252
	span := trace.SpanFromContextSafe(ctx)
253

254
	span.Debugf("accept /put request args:%+v", args)
255
	if !args.IsValid() {
256
		c.RespondError(errcode.ErrIllegalArguments)
257
		return
258
	}
259

260
	hashSumMap := args.Hashes.ToHashSumMap()
261
	hasherMap := make(access.HasherMap, len(hashSumMap))
262
	// make hashser
263
	for alg := range hashSumMap {
264
		hasherMap[alg] = alg.ToHasher()
265
	}
266

267
	rc := s.limiter.Reader(ctx, c.Request.Body)
268
	loc, err := s.streamHandler.Put(ctx, rc, args.Size, hasherMap)
269
	if err != nil {
270
		span.Error("stream put failed", errors.Detail(err))
271
		c.RespondError(httpError(err))
272
		return
273
	}
274

275
	// hasher sum
276
	for alg, hasher := range hasherMap {
277
		hashSumMap[alg] = hasher.Sum(nil)
278
	}
279

280
	if err := fillCrc(loc); err != nil {
281
		span.Error("stream put fill location crc", err)
282
		c.RespondError(httpError(err))
283
		return
284
	}
285

286
	c.RespondJSON(access.PutResp{
287
		Location:   *loc,
288
		HashSumMap: hashSumMap,
289
	})
290
	span.Infof("done /put request location:%+v hash:%+v", loc, hashSumMap.All())
291
}
292

293
// PutAt put one blob
294
func (s *Service) PutAt(c *rpc.Context) {
295
	args := new(access.PutAtArgs)
296
	if err := c.ParseArgs(args); err != nil {
297
		c.RespondError(err)
298
		return
299
	}
300

301
	ctx := c.Request.Context()
302
	span := trace.SpanFromContextSafe(ctx)
303

304
	span.Debugf("accept /putat request args:%+v", args)
305
	if !args.IsValid() {
306
		c.RespondError(errcode.ErrIllegalArguments)
307
		return
308
	}
309

310
	valid := false
311
	for _, secretKey := range tokenSecretKeys {
312
		token := uptoken.DecodeToken(args.Token)
313
		if token.IsValid(args.ClusterID, args.Vid, args.BlobID, uint32(args.Size), secretKey[:]) {
314
			valid = true
315
			break
316
		}
317
	}
318
	if !valid {
319
		span.Debugf("invalid token:%s", args.Token)
320
		c.RespondError(errcode.ErrIllegalArguments)
321
		return
322
	}
323

324
	hashSumMap := args.Hashes.ToHashSumMap()
325
	hasherMap := make(access.HasherMap, len(hashSumMap))
326
	// make hashser
327
	for alg := range hashSumMap {
328
		hasherMap[alg] = alg.ToHasher()
329
	}
330

331
	rc := s.limiter.Reader(ctx, c.Request.Body)
332
	err := s.streamHandler.PutAt(ctx, rc, args.ClusterID, args.Vid, args.BlobID, args.Size, hasherMap)
333
	if err != nil {
334
		span.Error("stream putat failed", errors.Detail(err))
335
		c.RespondError(httpError(err))
336
		return
337
	}
338

339
	// hasher sum
340
	for alg, hasher := range hasherMap {
341
		hashSumMap[alg] = hasher.Sum(nil)
342
	}
343

344
	c.RespondJSON(access.PutAtResp{HashSumMap: hashSumMap})
345
	span.Infof("done /putat request hash:%+v", hashSumMap.All())
346
}
347

348
// Alloc alloc one location
349
func (s *Service) Alloc(c *rpc.Context) {
350
	args := new(access.AllocArgs)
351
	if err := c.ParseArgs(args); err != nil {
352
		c.RespondError(err)
353
		return
354
	}
355

356
	ctx := c.Request.Context()
357
	span := trace.SpanFromContextSafe(ctx)
358

359
	span.Debugf("accept /alloc request args:%+v", args)
360
	if !args.IsValid() {
361
		c.RespondError(errcode.ErrIllegalArguments)
362
		return
363
	}
364

365
	location, err := s.streamHandler.Alloc(ctx, args.Size, args.BlobSize, args.AssignClusterID, args.CodeMode)
366
	if err != nil {
367
		span.Error("stream alloc failed", errors.Detail(err))
368
		c.RespondError(httpError(err))
369
		return
370
	}
371

372
	if err := fillCrc(location); err != nil {
373
		span.Error("stream alloc fill location crc", err)
374
		c.RespondError(httpError(err))
375
		return
376
	}
377

378
	resp := access.AllocResp{
379
		Location: *location,
380
		Tokens:   genTokens(location),
381
	}
382
	c.RespondJSON(resp)
383
	span.Infof("done /alloc request resp:%+v", resp)
384
}
385

386
// Get read file: streams args.ReadSize bytes starting at args.Offset of
// the located object to the client. Replies 206 with a Content-Range
// header for a partial read, 200 for a full-object read. Once headers
// are flushed, a transfer failure can only be logged — the status line
// has already been sent.
func (s *Service) Get(c *rpc.Context) {
	args := new(access.GetArgs)
	if err := c.ParseArgs(args); err != nil {
		c.RespondError(err)
		return
	}

	ctx := c.Request.Context()
	span := trace.SpanFromContextSafe(ctx)

	span.Debugf("accept /get request args:%+v", args)
	// the crc check rejects locations not signed by this service
	if !args.IsValid() || !verifyCrc(&args.Location) {
		c.RespondError(errcode.ErrIllegalArguments)
		return
	}

	// prepare the transfer first; errors here can still become a
	// proper error response since nothing has been written yet
	w := c.Writer
	writer := s.limiter.Writer(ctx, w)
	transfer, err := s.streamHandler.Get(ctx, writer, args.Location, args.ReadSize, args.Offset)
	if err != nil {
		span.Error("stream get prepare failed", errors.Detail(err))
		c.RespondError(httpError(err))
		return
	}

	w.Header().Set(rpc.HeaderContentType, rpc.MIMEStream)
	w.Header().Set(rpc.HeaderContentLength, strconv.FormatInt(int64(args.ReadSize), 10))
	// a non-empty read smaller than the object is a range response
	if args.ReadSize > 0 && args.ReadSize != args.Location.Size {
		w.Header().Set(rpc.HeaderContentRange, fmt.Sprintf("bytes %d-%d/%d",
			args.Offset, args.Offset+args.ReadSize-1, args.Location.Size))
		c.RespondStatus(http.StatusPartialContent)
	} else {
		c.RespondStatus(http.StatusOK)
	}

	// flush headers to client firstly
	c.Flush()

	err = transfer()
	if err != nil {
		// status already sent as 2xx; record the failure for metrics/logs
		reportDownload(args.Location.ClusterID, "StatusOKError", "-")
		span.Error("stream get transfer failed", errors.Detail(err))
		return
	}
	span.Info("done /get request")
}
433

434
// Delete  all blobs in this location. A single location is deleted
// inline; multiple locations are merged per cluster and deleted
// concurrently, one goroutine per cluster. If any cluster's delete
// fails, every location on that cluster is reported back in
// FailedLocations. The response is produced by the deferred closure so
// every early return below still answers the client.
func (s *Service) Delete(c *rpc.Context) {
	args := new(access.DeleteArgs)
	if err := c.ParseArgs(args); err != nil {
		c.RespondError(err)
		return
	}

	ctx := c.Request.Context()
	span := trace.SpanFromContextSafe(ctx)

	// err and resp are filled by the body; the deferred responder
	// turns them into exactly one HTTP reply
	var err error
	var resp access.DeleteResp
	defer func() {
		if err != nil {
			c.RespondError(httpError(err))
			return
		}

		if len(resp.FailedLocations) > 0 {
			span.Errorf("failed locations N %d of %d", len(resp.FailedLocations), len(args.Locations))
			// must return 2xx even if has failed locations,
			// cos rpc read body only on 2xx.
			// TODO: return other http status code
			c.RespondStatusData(http.StatusIMUsed, resp)
			return
		}

		c.RespondJSON(resp)
	}()

	if !args.IsValid() {
		err = errcode.ErrIllegalArguments
		return
	}
	span.Debugf("accept /delete request args: locations %d", len(args.Locations))
	defer span.Info("done /delete request")

	// verify every location's crc up front and count blobs per cluster
	// (the counts pre-size the merged slices below)
	clusterBlobsN := make(map[proto.ClusterID]int, 4)
	for _, loc := range args.Locations {
		if !verifyCrc(&loc) {
			span.Infof("invalid crc %+v", loc)
			err = errcode.ErrIllegalArguments
			return
		}
		clusterBlobsN[loc.ClusterID] += len(loc.Blobs)
	}

	// fast path: one location needs no merging or goroutines
	// (the shadowed err keeps the deferred responder on the 2xx path)
	if len(args.Locations) == 1 {
		loc := args.Locations[0]
		if err := s.streamHandler.Delete(ctx, &loc); err != nil {
			span.Error("stream delete failed", errors.Detail(err))
			resp.FailedLocations = []access.Location{loc}
		}
		return
	}

	// merge the same cluster locations to one delete message,
	// anyone of this cluster failed, all locations mark failure,
	//
	// a min delete message about 10-20 bytes,
	// max delete locations is 1024, one location is max to 5G,
	// merged message max size about 40MB.

	merged := make(map[proto.ClusterID][]access.SliceInfo, len(clusterBlobsN))
	for id, n := range clusterBlobsN {
		merged[id] = make([]access.SliceInfo, 0, n)
	}
	for _, loc := range args.Locations {
		merged[loc.ClusterID] = append(merged[loc.ClusterID], loc.Blobs...)
	}

	// single collector goroutine owns resp.FailedLocations, so the
	// per-cluster workers never write it concurrently
	var wg sync.WaitGroup
	failedCh := make(chan proto.ClusterID, 1)
	done := make(chan struct{})
	go func() {
		for id := range failedCh {
			if resp.FailedLocations == nil {
				resp.FailedLocations = make([]access.Location, 0, len(args.Locations))
			}
			// a cluster failure marks all of its locations failed
			for _, loc := range args.Locations {
				if loc.ClusterID == id {
					resp.FailedLocations = append(resp.FailedLocations, loc)
				}
			}
		}
		close(done)
	}()

	// one delete per cluster, in parallel
	wg.Add(len(merged))
	for id := range merged {
		go func(id proto.ClusterID) {
			if err := s.streamHandler.Delete(ctx, &access.Location{
				ClusterID: id,
				BlobSize:  1,
				Blobs:     merged[id],
			}); err != nil {
				span.Error("stream delete failed", id, errors.Detail(err))
				failedCh <- id
			}
			wg.Done()
		}(id)
	}

	// wait for workers, then drain the collector before the deferred
	// responder reads resp
	wg.Wait()
	close(failedCh)
	<-done
}
542

543
// DeleteBlob delete one blob
544
func (s *Service) DeleteBlob(c *rpc.Context) {
545
	args := new(access.DeleteBlobArgs)
546
	if err := c.ParseArgs(args); err != nil {
547
		c.RespondError(err)
548
		return
549
	}
550

551
	ctx := c.Request.Context()
552
	span := trace.SpanFromContextSafe(ctx)
553

554
	span.Debugf("accept /deleteblob request args:%+v", args)
555
	if !args.IsValid() {
556
		c.RespondError(errcode.ErrIllegalArguments)
557
		return
558
	}
559

560
	valid := false
561
	for _, secretKey := range tokenSecretKeys {
562
		token := uptoken.DecodeToken(args.Token)
563
		if token.IsValid(args.ClusterID, args.Vid, args.BlobID, uint32(args.Size), secretKey[:]) {
564
			valid = true
565
			break
566
		}
567
	}
568
	if !valid {
569
		span.Debugf("invalid token:%s", args.Token)
570
		c.RespondError(errcode.ErrIllegalArguments)
571
		return
572
	}
573

574
	if err := s.streamHandler.Delete(ctx, &access.Location{
575
		ClusterID: args.ClusterID,
576
		BlobSize:  1,
577
		Blobs: []access.SliceInfo{{
578
			MinBid: args.BlobID,
579
			Vid:    args.Vid,
580
			Count:  1,
581
		}},
582
	}); err != nil {
583
		span.Error("stream delete blob failed", errors.Detail(err))
584
		c.RespondError(httpError(err))
585
		return
586
	}
587

588
	c.Respond()
589
	span.Info("done /deleteblob request")
590
}
591

592
// Sign generate crc with locations
593
func (s *Service) Sign(c *rpc.Context) {
594
	args := new(access.SignArgs)
595
	if err := c.ParseArgs(args); err != nil {
596
		c.RespondError(err)
597
		return
598
	}
599

600
	ctx := c.Request.Context()
601
	span := trace.SpanFromContextSafe(ctx)
602

603
	if !args.IsValid() {
604
		c.RespondError(errcode.ErrIllegalArguments)
605
		return
606
	}
607
	span.Debugf("accept /sign request args: %+v", args)
608

609
	loc := args.Location
610
	crcOld := loc.Crc
611
	if err := signCrc(&loc, args.Locations); err != nil {
612
		span.Error("stream sign failed", errors.Detail(err))
613
		c.RespondError(errcode.ErrIllegalArguments)
614
		return
615
	}
616

617
	c.RespondJSON(access.SignResp{Location: loc})
618
	span.Infof("done /sign request crc %d -> %d, resp:%+v", crcOld, loc.Crc, loc)
619
}
620

621
func httpError(err error) error {
622
	if e, ok := err.(rpc.HTTPError); ok {
623
		return e
624
	}
625
	if e, ok := err.(*errors.Error); ok {
626
		return rpc.NewError(http.StatusInternalServerError, "ServerError", e.Cause())
627
	}
628
	return errcode.ErrUnexpected
629
}
630

631
// genTokens generate tokens
632
//  1. Returns 0 token if has no blobs.
633
//  2. Returns 1 token if file size less than blobsize.
634
//  3. Returns len(blobs) tokens if size divided by blobsize.
635
//  4. Otherwise returns len(blobs)+1 tokens, the last token
636
//     will be used by the last blob, even if the last slice blobs' size
637
//     less than blobsize.
638
//  5. Each segment blob has its specified token include the last blob.
639
func genTokens(location *access.Location) []string {
640
	tokens := make([]string, 0, len(location.Blobs)+1)
641

642
	hasMultiBlobs := location.Size >= uint64(location.BlobSize)
643
	lastSize := uint32(location.Size % uint64(location.BlobSize))
644
	for idx, blob := range location.Blobs {
645
		// returns one token if size < blobsize
646
		if hasMultiBlobs {
647
			count := blob.Count
648
			if idx == len(location.Blobs)-1 && lastSize > 0 {
649
				count--
650
			}
651
			tokens = append(tokens, uptoken.EncodeToken(uptoken.NewUploadToken(location.ClusterID,
652
				blob.Vid, blob.MinBid, count,
653
				location.BlobSize, _tokenExpiration, tokenSecretKeys[0][:])))
654
		}
655

656
		// token of the last blob
657
		if idx == len(location.Blobs)-1 && lastSize > 0 {
658
			tokens = append(tokens, uptoken.EncodeToken(uptoken.NewUploadToken(location.ClusterID,
659
				blob.Vid, blob.MinBid+proto.BlobID(blob.Count)-1, 1,
660
				lastSize, _tokenExpiration, tokenSecretKeys[0][:])))
661
		}
662
	}
663

664
	return tokens
665
}
666

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.