1
// Copyright 2022 The CubeFS Authors.
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
7
// http://www.apache.org/licenses/LICENSE-2.0
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
21
"github.com/afex/hystrix-go/hystrix"
22
"github.com/cubefs/cubefs/blobstore/api/access"
23
"github.com/cubefs/cubefs/blobstore/api/proxy"
24
"github.com/cubefs/cubefs/blobstore/common/codemode"
25
errcode "github.com/cubefs/cubefs/blobstore/common/errors"
26
"github.com/cubefs/cubefs/blobstore/common/proto"
27
"github.com/cubefs/cubefs/blobstore/common/trace"
28
"github.com/cubefs/cubefs/blobstore/util/errors"
29
"github.com/cubefs/cubefs/blobstore/util/retry"
32
// errAllocatePunishedVolume is the sentinel error assigned in
// allocFromAllocator while filtering punished volumes out of an allocation
// result, and compared against afterwards when the outcome is reported.
var errAllocatePunishedVolume = errors.New("allocate punished volume")
34
// Alloc handles the access interface /alloc.
35
// required: size, the file size
36
// optional: blobSize > 0, alloc with this blobSize
37
// assignClusterID > 0, always alloc in this cluster
38
// codeMode > 0, alloc in this codemode
39
// return: a location of the file
//
// Errors visible in this body: errcode.ErrAccessExceedSize when size is larger
// than h.maxObjectSize, and errcode.ErrIllegalArguments when the code mode is
// not valid.
40
func (h *Handler) Alloc(ctx context.Context, size uint64, blobSize uint32,
41
assignClusterID proto.ClusterID, codeMode codemode.CodeMode) (*access.Location, error) {
42
span := trace.SpanFromContextSafe(ctx)
43
span.Debugf("alloc request with size:%d blobsize:%d cluster:%d codemode:%d",
44
size, blobSize, assignClusterID, codeMode)
46
// Reject requests larger than the handler's configured maximum object size.
if int64(size) > h.maxObjectSize {
47
span.Info("exceed max object size", h.maxObjectSize)
48
return nil, errcode.ErrAccessExceedSize
52
// Take the blob size from the handler's MaxBlobSize, read atomically.
// NOTE(review): the condition guarding this default is not visible here;
// per the header it presumably applies only when no blobSize was given — confirm.
blobSize = atomic.LoadUint32(&h.MaxBlobSize)
53
span.Debugf("fill blobsize:%d", blobSize)
57
// Let the configured code modes select one based on the file size.
// NOTE(review): presumably only when the caller passed no codeMode — the
// guard is not visible here; confirm.
codeMode = h.allCodeModes.SelectCodeMode(int64(size))
58
span.Debugf("select codemode:%d", codeMode)
60
// Whichever way the code mode was obtained, it must be a valid one.
if !codeMode.IsValid() {
61
span.Infof("invalid codemode:%d", codeMode)
62
return nil, errcode.ErrIllegalArguments
65
// Allocate the blob slices through the hystrix-wrapped allocator call.
clusterID, blobs, err := h.allocFromAllocatorWithHystrix(ctx, codeMode, size, blobSize, assignClusterID)
67
// NOTE(review): this log presumably sits inside an error check on err that is
// not visible here — confirm.
span.Error("alloc from proxy", errors.Detail(err))
70
span.Debugf("allocated from %d %+v", clusterID, blobs)
72
// Build the location handed back to the caller (its fields are not visible here).
location := &access.Location{
79
span.Debugf("alloc ok %+v", location)
83
// allocFromAllocatorWithHystrix runs allocFromAllocator through hystrix.Do
// under the command named by allocCommand. The closure writes its results
// into the named return values cid, bidRets and err.
// NOTE(review): the end of the closure, the fallback argument passed to
// hystrix.Do and the return statement are not visible here.
func (h *Handler) allocFromAllocatorWithHystrix(ctx context.Context, codeMode codemode.CodeMode, size uint64, blobSize uint32,
84
clusterID proto.ClusterID) (cid proto.ClusterID, bidRets []access.SliceInfo, err error) {
85
err = hystrix.Do(allocCommand, func() error {
86
cid, bidRets, err = h.allocFromAllocator(ctx, codeMode, size, blobSize, clusterID)
92
// allocFromAllocator asks a proxy host of a cluster to allocate volumes and
// blob IDs for a file of the given size, then converts the allocation results
// into access.SliceInfo entries.
//
// It returns the cluster ID used, the blob slices, and an error. The proxy
// call is retried with exponential backoff; unreachable hosts are punished and
// punished volumes are excluded from the next retry.
// NOTE(review): several guard conditions and closing braces of this function
// are not visible here; comments below that depend on them are marked.
func (h *Handler) allocFromAllocator(ctx context.Context, codeMode codemode.CodeMode, size uint64, blobSize uint32,
93
clusterID proto.ClusterID) (proto.ClusterID, []access.SliceInfo, error) {
94
span := trace.SpanFromContextSafe(ctx)
97
// Take the blob size from the handler's MaxBlobSize, read atomically.
// NOTE(review): presumably only when blobSize was not supplied — guard not visible; confirm.
blobSize = atomic.LoadUint32(&h.MaxBlobSize)
100
// Let the cluster controller choose a cluster and use its ID.
// NOTE(review): presumably only when the caller did not assign a cluster —
// guard not visible; confirm.
clusterChosen, err := h.clusterController.ChooseOne()
104
clusterID = clusterChosen.ClusterID
107
// Request as many blob IDs as blobCount computes from size and blobSize.
args := proxy.AllocVolsArgs{
110
BidCount: blobCount(size, blobSize),
113
var allocRets []proxy.AllocRet
115
// Hosts already picked in earlier attempts; checked and filled in the loop below.
hostsSet := make(map[string]struct{}, 1)
116
// Retry the whole allocation attempt with exponential backoff, configured by
// AllocRetryTimes and AllocRetryIntervalMS.
if err := retry.ExponentialBackoff(h.AllocRetryTimes, uint32(h.AllocRetryIntervalMS)).On(func() error {
117
serviceController, err := h.clusterController.GetServiceController(clusterID)
120
return errors.Info(err, "get service controller", clusterID)
124
// Ask for a proxy host up to 10 times.
// NOTE(review): presumably an already-seen host is skipped so another one can
// be tried — the branch body is not visible; confirm.
for range [10]struct{}{} {
125
host, err = serviceController.GetServiceHost(ctx, serviceProxy)
128
return errors.Info(err, "get proxy host", clusterID)
130
if _, ok := hostsSet[host]; ok {
133
hostsSet[host] = struct{}{}
138
allocRets, err = h.proxyClient.VolumeAlloc(ctx, host, &args)
140
// A timeout or refused connection marks the proxy host as unreachable:
// log it, report it, and punish it for ServicePunishIntervalS.
if errorTimeout(err) || errorConnectionRefused(err) {
141
span.Warn("punish unreachable proxy host:", host)
142
reportUnhealth(clusterID, "punish", serviceProxy, host, "Timeout")
143
serviceController.PunishServiceWithThreshold(ctx, serviceProxy, host, h.ServicePunishIntervalS)
146
return errors.Base(err, "alloc from proxy", host)
149
// Filter out punished volumes while allocating.
150
for _, ret := range allocRets {
151
vInfo, err := h.getVolume(ctx, clusterID, ret.Vid, true)
157
// Return the sentinel error so the allocation is retried; the volume's vid is
// added to args.Excludes so the next attempt excludes it.
// NOTE(review): the check that decides a volume is punished is not visible here.
158
err = errAllocatePunishedVolume
159
args.Excludes = append(args.Excludes, vInfo.Vid)
160
span.Warn("next retry exclude vid:", vInfo.Vid, err)
167
// Any error other than the punished-volume sentinel is reported as a failed allocation.
if err != errAllocatePunishedVolume {
168
reportUnhealth(clusterID, "allocate", "-", "-", "failed")
171
// Still write to storage if the allocated volume is punished.
172
reportUnhealth(clusterID, "allocate", "-", "-", "punished")
175
// Cache which allocator host each vid was allocated from.
// NOTE(review): allocHost is declared and assigned on lines not visible here.
176
for _, ret := range allocRets {
177
setCacheVidHost(clusterID, ret.Vid, allocHost)
180
// Convert the allocated bid ranges into slices, sized for blobN entries.
blobN := blobCount(size, blobSize)
181
blobs := make([]access.SliceInfo, 0, blobN)
182
for _, bidRet := range allocRets {
187
// BidStart..BidEnd is treated as an inclusive range (hence the +1); take at
// most blobN IDs from it.
count := minU64(blobN, uint64(bidRet.BidEnd)-uint64(bidRet.BidStart)+1)
190
blobs = append(blobs, access.SliceInfo{
191
MinBid: bidRet.BidStart,
193
Count: uint32(count),
197
// NOTE(review): presumably reached when the ranges above did not cover all
// needed blob IDs — the condition is not visible here; confirm.
return 0, nil, errors.New("no enough blob ids from allocator")
200
// A location may hold at most access.MaxLocationBlobs slices.
if len(blobs) > access.MaxLocationBlobs {
201
span.Errorf("alloc exceed max blobs %d>%d", len(blobs), access.MaxLocationBlobs)
202
return 0, nil, errors.New("alloc exceed max blobs of location")
205
return clusterID, blobs, nil