cubefs

Форк
0
/
stream_alloc.go 
206 строк · 6.2 Кб
1
// Copyright 2022 The CubeFS Authors.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
14

15
package access
16

17
import (
18
	"context"
19
	"sync/atomic"
20

21
	"github.com/afex/hystrix-go/hystrix"
22
	"github.com/cubefs/cubefs/blobstore/api/access"
23
	"github.com/cubefs/cubefs/blobstore/api/proxy"
24
	"github.com/cubefs/cubefs/blobstore/common/codemode"
25
	errcode "github.com/cubefs/cubefs/blobstore/common/errors"
26
	"github.com/cubefs/cubefs/blobstore/common/proto"
27
	"github.com/cubefs/cubefs/blobstore/common/trace"
28
	"github.com/cubefs/cubefs/blobstore/util/errors"
29
	"github.com/cubefs/cubefs/blobstore/util/retry"
30
)
31

32
// errAllocatePunishedVolume is the sentinel error returned by one allocation
// attempt in allocFromAllocator when a volume handed out by the proxy is marked
// as punished. It is compared by identity after the retries finish: if it is the
// final error, the last allocation result is still used instead of failing.
var errAllocatePunishedVolume = errors.New("allocate punished volume")
33

34
// Alloc access interface /alloc
35
//     required: size, file size
36
//     optional: blobSize > 0, alloc with blobSize
37
//               assignClusterID > 0, assign to alloc in this cluster certainly
38
//               codeMode > 0, alloc in this codemode
39
//     return: a location of file
40
func (h *Handler) Alloc(ctx context.Context, size uint64, blobSize uint32,
41
	assignClusterID proto.ClusterID, codeMode codemode.CodeMode) (*access.Location, error) {
42
	span := trace.SpanFromContextSafe(ctx)
43
	span.Debugf("alloc request with size:%d blobsize:%d cluster:%d codemode:%d",
44
		size, blobSize, assignClusterID, codeMode)
45

46
	if int64(size) > h.maxObjectSize {
47
		span.Info("exceed max object size", h.maxObjectSize)
48
		return nil, errcode.ErrAccessExceedSize
49
	}
50

51
	if blobSize == 0 {
52
		blobSize = atomic.LoadUint32(&h.MaxBlobSize)
53
		span.Debugf("fill blobsize:%d", blobSize)
54
	}
55

56
	if codeMode == 0 {
57
		codeMode = h.allCodeModes.SelectCodeMode(int64(size))
58
		span.Debugf("select codemode:%d", codeMode)
59
	}
60
	if !codeMode.IsValid() {
61
		span.Infof("invalid codemode:%d", codeMode)
62
		return nil, errcode.ErrIllegalArguments
63
	}
64

65
	clusterID, blobs, err := h.allocFromAllocatorWithHystrix(ctx, codeMode, size, blobSize, assignClusterID)
66
	if err != nil {
67
		span.Error("alloc from proxy", errors.Detail(err))
68
		return nil, err
69
	}
70
	span.Debugf("allocated from %d %+v", clusterID, blobs)
71

72
	location := &access.Location{
73
		ClusterID: clusterID,
74
		CodeMode:  codeMode,
75
		Size:      size,
76
		BlobSize:  blobSize,
77
		Blobs:     blobs,
78
	}
79
	span.Debugf("alloc ok %+v", location)
80
	return location, nil
81
}
82

83
func (h *Handler) allocFromAllocatorWithHystrix(ctx context.Context, codeMode codemode.CodeMode, size uint64, blobSize uint32,
84
	clusterID proto.ClusterID) (cid proto.ClusterID, bidRets []access.SliceInfo, err error) {
85
	err = hystrix.Do(allocCommand, func() error {
86
		cid, bidRets, err = h.allocFromAllocator(ctx, codeMode, size, blobSize, clusterID)
87
		return err
88
	}, nil)
89
	return
90
}
91

92
func (h *Handler) allocFromAllocator(ctx context.Context, codeMode codemode.CodeMode, size uint64, blobSize uint32,
93
	clusterID proto.ClusterID) (proto.ClusterID, []access.SliceInfo, error) {
94
	span := trace.SpanFromContextSafe(ctx)
95

96
	if blobSize == 0 {
97
		blobSize = atomic.LoadUint32(&h.MaxBlobSize)
98
	}
99
	if clusterID == 0 {
100
		clusterChosen, err := h.clusterController.ChooseOne()
101
		if err != nil {
102
			return 0, nil, err
103
		}
104
		clusterID = clusterChosen.ClusterID
105
	}
106

107
	args := proxy.AllocVolsArgs{
108
		Fsize:    size,
109
		CodeMode: codeMode,
110
		BidCount: blobCount(size, blobSize),
111
	}
112

113
	var allocRets []proxy.AllocRet
114
	var allocHost string
115
	hostsSet := make(map[string]struct{}, 1)
116
	if err := retry.ExponentialBackoff(h.AllocRetryTimes, uint32(h.AllocRetryIntervalMS)).On(func() error {
117
		serviceController, err := h.clusterController.GetServiceController(clusterID)
118
		if err != nil {
119
			span.Warn(err)
120
			return errors.Info(err, "get service controller", clusterID)
121
		}
122

123
		var host string
124
		for range [10]struct{}{} {
125
			host, err = serviceController.GetServiceHost(ctx, serviceProxy)
126
			if err != nil {
127
				span.Warn(err)
128
				return errors.Info(err, "get proxy host", clusterID)
129
			}
130
			if _, ok := hostsSet[host]; ok {
131
				continue
132
			}
133
			hostsSet[host] = struct{}{}
134
			break
135
		}
136
		allocHost = host
137

138
		allocRets, err = h.proxyClient.VolumeAlloc(ctx, host, &args)
139
		if err != nil {
140
			if errorTimeout(err) || errorConnectionRefused(err) {
141
				span.Warn("punish unreachable proxy host:", host)
142
				reportUnhealth(clusterID, "punish", serviceProxy, host, "Timeout")
143
				serviceController.PunishServiceWithThreshold(ctx, serviceProxy, host, h.ServicePunishIntervalS)
144
			}
145
			span.Warn(host, err)
146
			return errors.Base(err, "alloc from proxy", host)
147
		}
148

149
		// filter punished volume in allocating progress
150
		for _, ret := range allocRets {
151
			vInfo, err := h.getVolume(ctx, clusterID, ret.Vid, true)
152
			if err != nil {
153
				span.Warn(err)
154
				return err
155
			}
156
			if vInfo.IsPunish {
157
				// return err and retry allocate
158
				err = errAllocatePunishedVolume
159
				args.Excludes = append(args.Excludes, vInfo.Vid)
160
				span.Warn("next retry exclude vid:", vInfo.Vid, err)
161
				return err
162
			}
163
		}
164

165
		return nil
166
	}); err != nil {
167
		if err != errAllocatePunishedVolume {
168
			reportUnhealth(clusterID, "allocate", "-", "-", "failed")
169
			return 0, nil, err
170
		}
171
		// still write to storage if allocating punished volume
172
		reportUnhealth(clusterID, "allocate", "-", "-", "punished")
173
	}
174

175
	// cache vid in which allocator
176
	for _, ret := range allocRets {
177
		setCacheVidHost(clusterID, ret.Vid, allocHost)
178
	}
179

180
	blobN := blobCount(size, blobSize)
181
	blobs := make([]access.SliceInfo, 0, blobN)
182
	for _, bidRet := range allocRets {
183
		if blobN <= 0 {
184
			break
185
		}
186

187
		count := minU64(blobN, uint64(bidRet.BidEnd)-uint64(bidRet.BidStart)+1)
188
		blobN -= count
189

190
		blobs = append(blobs, access.SliceInfo{
191
			MinBid: bidRet.BidStart,
192
			Vid:    bidRet.Vid,
193
			Count:  uint32(count),
194
		})
195
	}
196
	if blobN > 0 {
197
		return 0, nil, errors.New("no enough blob ids from allocator")
198
	}
199

200
	if len(blobs) > access.MaxLocationBlobs {
201
		span.Errorf("alloc exceed max blobs %d>%d", len(blobs), access.MaxLocationBlobs)
202
		return 0, nil, errors.New("alloc exceed max blobs of location")
203
	}
204

205
	return clusterID, blobs, nil
206
}
207

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.