service_test.go
// Copyright 2022 The CubeFS Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.

package proxy

import (
	"context"
	"net/http/httptest"
	"sync"
	"testing"

	"github.com/Shopify/sarama"
	"github.com/golang/mock/gomock"
	"github.com/stretchr/testify/require"

	"github.com/cubefs/cubefs/blobstore/api/blobnode"
	"github.com/cubefs/cubefs/blobstore/api/clustermgr"
	"github.com/cubefs/cubefs/blobstore/api/proxy"
	"github.com/cubefs/cubefs/blobstore/common/codemode"
	errcode "github.com/cubefs/cubefs/blobstore/common/errors"
	"github.com/cubefs/cubefs/blobstore/common/kafka"
	"github.com/cubefs/cubefs/blobstore/common/proto"
	"github.com/cubefs/cubefs/blobstore/common/rpc"
	"github.com/cubefs/cubefs/blobstore/proxy/allocator"
	"github.com/cubefs/cubefs/blobstore/proxy/mock"
	_ "github.com/cubefs/cubefs/blobstore/testing/nolog"
	"github.com/cubefs/cubefs/blobstore/util/errors"
)

var (
	A = gomock.Any()

	errCodeMode = errors.New("codeMode not exist")
	errBidCount = errors.New("count too large")
	ctx         = context.Background()

	proxyServer *httptest.Server
	once        sync.Once
)

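// runMockService lazily starts a shared httptest server wrapping the
// service handler; sync.Once keeps a single server across all tests.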
func runMockService(s *Service) string {
	once.Do(func() {
		proxyServer = httptest.NewServer(NewHandler(s))
	})
	return proxyServer.URL
}

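// newMockService builds a Service whose managers are gomock fakes; each
// fake fails for specific inputs (oversized delete batches, Vid 100
// repairs, unknown code modes, odd cache IDs) so that the handlers'
// error paths can be exercised.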
func newMockService(t *testing.T) *Service {
	ctr := gomock.NewController(t)

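	// Blob-delete mock: batches with more than one blob fail to send.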
	blobDeleteMgr := mock.NewMockBlobDeleteHandler(ctr)
	blobDeleteMgr.EXPECT().SendDeleteMsg(gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(
		func(ctx context.Context, info *proxy.DeleteArgs) error {
			if len(info.Blobs) > 1 {
				return errors.New("fake send delete message failed")
			}
			return nil
		},
	)

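	// Shard-repair mock: messages for Vid 100 fail to send.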
	shardRepairMgr := mock.NewMockShardRepairHandler(ctr)
	shardRepairMgr.EXPECT().SendShardRepairMsg(gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(
		func(ctx context.Context, info *proxy.ShardRepairArgs) error {
			if info.Vid == 100 {
				return errors.New("fake send shard repair message failed")
			}
			return nil
		})

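	// Volume manager mock: only EC6P6 and EC15P12 are accepted code modes,
	// and Alloc additionally requires 1 <= BidCount <= 10000.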
	volumeMgr := mock.NewMockVolumeMgr(ctr)
	volumeMgr.EXPECT().Alloc(gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(
		func(ctx context.Context, args *proxy.AllocVolsArgs) (allocVols []proxy.AllocRet, err error) {
			if args.CodeMode != codemode.EC6P6 && args.CodeMode != codemode.EC15P12 {
				return nil, errCodeMode
			}
			if args.BidCount > 10000 || args.BidCount < 1 {
				return nil, errBidCount
			}
			return nil, nil
		})
	volumeMgr.EXPECT().List(gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(
		func(ctx context.Context, codeMode codemode.CodeMode) (vids []proto.Vid, volumes []clustermgr.AllocVolumeInfo, err error) {
			if codeMode != codemode.EC6P6 && codeMode != codemode.EC15P12 {
				return nil, nil, errCodeMode
			}
			return nil, nil, nil
		})
	volumeMgr.EXPECT().Discard(gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(
		func(ctx context.Context, args *proxy.DiscardVolsArgs) (err error) {
			if args.CodeMode != codemode.EC6P6 && args.CodeMode != codemode.EC15P12 {
				return errCodeMode
			}
			return nil
		})

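	// Cacher mock: even Vid/DiskID values are served; odd values fail with
	// a not-found error when Flush is set and an internal error otherwise.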
	cacher := mock.NewMockCacher(ctr)
	cacher.EXPECT().GetVolume(A, A).AnyTimes().DoAndReturn(
		func(_ context.Context, args *proxy.CacheVolumeArgs) (*proxy.VersionVolume, error) {
			volume := new(proxy.VersionVolume)
			if args.Vid%2 == 0 {
				volume.Vid = args.Vid
				return volume, nil
			}
			if args.Flush {
				return nil, errcode.ErrVolumeNotExist
			}
			return nil, errors.New("internal error")
		})
	cacher.EXPECT().GetDisk(A, A).AnyTimes().DoAndReturn(
		func(_ context.Context, args *proxy.CacheDiskArgs) (*blobnode.DiskInfo, error) {
			disk := new(blobnode.DiskInfo)
			if args.DiskID%2 == 0 {
				disk.DiskID = args.DiskID
				return disk, nil
			}
			if args.Flush {
				return nil, errcode.ErrCMDiskNotFound
			}
			return nil, errors.New("internal error")
		})
	cacher.EXPECT().Erase(A, A).AnyTimes().DoAndReturn(
		func(_ context.Context, key string) error {
			if key == "ALL" {
				return errors.New("internal error")
			}
			return nil
		})

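	// The Service is configured for ClusterID 1; the MQ tests below rely
	// on other cluster IDs being rejected by the handlers.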
	return &Service{
		Config: Config{
			VolConfig: allocator.VolConfig{
				ClusterID: 1,
			},
		},
		shardRepairMgr: shardRepairMgr,
		blobDeleteMgr:  blobDeleteMgr,
		volumeMgr:      volumeMgr,
		cacher:         cacher,
	}
}

func newClient() rpc.Client {
	return rpc.NewClient(&rpc.Config{})
}

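// TestService_New constructs the service against sarama mock brokers.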
func TestService_New(t *testing.T) {
	// interface test
	seedBroker, leader := newBrokersWith2Responses(t)
	defer seedBroker.Close()
	defer leader.Close()
	cmcli := mock.ProxyMockClusterMgrCli(t)

	testCases := []struct {
		cfg Config
	}{
		// TODO: wait for cm to change rpc
		{
			cfg: Config{
				MQ: MQConfig{
					BlobDeleteTopic:          "test1",
					ShardRepairTopic:         "test2",
					ShardRepairPriorityTopic: "test3",
					MsgSender: kafka.ProducerCfg{
						BrokerList: []string{seedBroker.Addr()},
						TimeoutMs:  1,
					},
				},
			},
		},
	}
	for _, tc := range testCases {
		New(tc.cfg, cmcli)
	}
}

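// TestService_MQ posts delete and shard-repair messages and checks the
// resulting status codes: 200 for a valid message, 803 for a mismatched
// cluster ID, and 500 when the mocked sender fails.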
func TestService_MQ(t *testing.T) {
	runMockService(newMockService(t))
	cli := newClient()

	deleteCases := []struct {
		args proxy.DeleteArgs
		code int
	}{
		{
			args: proxy.DeleteArgs{
				ClusterID: 1,
				Blobs:     []proxy.BlobDelete{{Bid: 0, Vid: 0}},
			},
			code: 200,
		},
		{
			args: proxy.DeleteArgs{
				ClusterID: 2,
				Blobs:     []proxy.BlobDelete{{Bid: 0, Vid: 0}},
			},
			code: 803,
		},
		{
			args: proxy.DeleteArgs{
				ClusterID: 1,
				Blobs:     []proxy.BlobDelete{{Bid: 0, Vid: 0}, {Bid: 1, Vid: 1}},
			},
			code: 500,
		},
	}
	for _, tc := range deleteCases {
		err := cli.PostWith(ctx, proxyServer.URL+"/deletemsg", nil, tc.args)
		require.Equal(t, tc.code, rpc.DetectStatusCode(err))
	}

	shardRepairCases := []struct {
		args proxy.ShardRepairArgs
		code int
	}{
		{
			args: proxy.ShardRepairArgs{
				ClusterID: 1,
				Bid:       1,
				Vid:       1,
				BadIdxes:  nil,
				Reason:    "",
			},
			code: 200,
		},
		{
			args: proxy.ShardRepairArgs{
				ClusterID: 2,
				Bid:       1,
				Vid:       1,
				BadIdxes:  nil,
				Reason:    "",
			},
			code: 803,
		},
		{
			args: proxy.ShardRepairArgs{
				ClusterID: 1,
				Bid:       1,
				Vid:       100,
				BadIdxes:  nil,
				Reason:    "",
			},
			code: 500,
		},
	}
	for _, tc := range shardRepairCases {
		err := cli.PostWith(ctx, proxyServer.URL+"/repairmsg", nil, tc.args)
		require.Equal(t, tc.code, rpc.DetectStatusCode(err))
	}
}

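// TestService_Allocator drives /volume/alloc, /volume/discard and
// /volume/list with valid and invalid code modes and bid counts.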
func TestService_Allocator(t *testing.T) {
	url := runMockService(newMockService(t))
	cli := newClient()
	allocURL := url + "/volume/alloc"
	{
		args := proxy.AllocVolsArgs{
			Fsize:    100,
			BidCount: 1,
			CodeMode: codemode.CodeMode(2),
		}

		err := cli.PostWith(ctx, allocURL, nil, args)
		require.NoError(t, err)
	}
	{
		args := proxy.AllocVolsArgs{
			Fsize:    100,
			BidCount: 1,
			CodeMode: codemode.CodeMode(3),
		}

		err := cli.PostWith(ctx, allocURL, nil, args)
		require.Error(t, err)
		require.Equal(t, errCodeMode.Error(), err.Error())
	}
	{
		args := proxy.AllocVolsArgs{
			Fsize:    100,
			BidCount: 10001,
			CodeMode: codemode.CodeMode(2),
		}
		err := cli.PostWith(ctx, allocURL, nil, args)
		require.Error(t, err)
		require.Equal(t, errBidCount.Error(), err.Error())
	}
	{
		args := proxy.AllocVolsArgs{
			Fsize:    100,
			BidCount: 0,
			CodeMode: codemode.CodeMode(2),
		}
		err := cli.PostWith(ctx, allocURL, nil, args)
		require.Error(t, err)
		require.Equal(t, errcode.ErrIllegalArguments.Error(), err.Error())
	}
	discardURL := url + "/volume/discard"
	{
		args := proxy.DiscardVolsArgs{
			CodeMode: codemode.CodeMode(2),
			Discards: []proto.Vid{},
		}
		err := cli.PostWith(ctx, discardURL, nil, args)
		require.NoError(t, err)
	}

	{
		err := cli.GetWith(ctx, url+"/volume/list?code_mode=0", nil)
		require.Error(t, err)
		require.Equal(t, errcode.ErrIllegalArguments.Error(), err.Error())
	}
	{
		err := cli.GetWith(ctx, url+"/volume/list?code_mode=3", nil)
		require.Error(t, err)
		require.Equal(t, errCodeMode.Error(), err.Error())
	}
	{
		err := cli.GetWith(ctx, url+"/volume/list?code_mode=2", nil)
		require.NoError(t, err)
	}
	{
		err := cli.GetWith(ctx, url+"/volume/list?code_mode=2", nil)
		require.NoError(t, err)
	}
}

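// TestService_CacherVolume: even vids (1024) come back from the cache;
// odd vids (111) fail, surfacing as volume-not-exist when flush is set.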
func TestService_CacherVolume(t *testing.T) {
	url := runMockService(newMockService(t)) + "/cache/volume/"
	cli := newClient()
	var volume clustermgr.VolumeInfo
	{
		err := cli.GetWith(ctx, url+"1024", &volume)
		require.NoError(t, err)
		require.Equal(t, proto.Vid(1024), volume.Vid)
	}
	{
		err := cli.GetWith(ctx, url+"111", &volume)
		require.Error(t, err)
	}
	{
		err := cli.GetWith(ctx, url+"111?flush=0", &volume)
		require.Error(t, err)
		require.Equal(t, 500, rpc.DetectStatusCode(err))
	}
	{
		err := cli.GetWith(ctx, url+"111?flush=1", &volume)
		require.Error(t, err)
		require.Equal(t, errcode.CodeVolumeNotExist, rpc.DetectStatusCode(err))
	}
	{
		err := cli.GetWith(ctx, url+"111?flush=true", &volume)
		require.Error(t, err)
		require.Equal(t, errcode.CodeVolumeNotExist, rpc.DetectStatusCode(err))
	}
}

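// TestService_CacherDisk mirrors the volume cache cases for disk lookups.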
func TestService_CacherDisk(t *testing.T) {
	url := runMockService(newMockService(t)) + "/cache/disk/"
	cli := newClient()
	var disk blobnode.DiskInfo
	{
		err := cli.GetWith(ctx, url+"1024", &disk)
		require.NoError(t, err)
		require.Equal(t, proto.DiskID(1024), disk.DiskID)
	}
	{
		err := cli.GetWith(ctx, url+"111", nil)
		require.Error(t, err)
	}
	{
		err := cli.GetWith(ctx, url+"111?flush=0", nil)
		require.Error(t, err)
		require.Equal(t, 500, rpc.DetectStatusCode(err))
	}
	{
		err := cli.GetWith(ctx, url+"111?flush=true", nil)
		require.Error(t, err)
		require.Equal(t, errcode.CodeCMDiskNotFound, rpc.DetectStatusCode(err))
	}
}

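// TestService_CacherErase: erasing a single key succeeds; the mock treats
// the special key "ALL" as an internal error.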
func TestService_CacherErase(t *testing.T) {
	url := runMockService(newMockService(t)) + "/cache/erase/"
	cli := newClient()
	{
		resp, err := cli.Delete(ctx, url+"volume-10")
		require.NoError(t, err)
		require.Equal(t, 200, resp.StatusCode)
		resp.Body.Close()
	}
	{
		resp, err := cli.Delete(ctx, url+"ALL")
		require.NoError(t, err)
		require.Equal(t, 500, resp.StatusCode)
		resp.Body.Close()
	}
}

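// TestConfigFix checks that checkAndFix requires three distinct,
// non-empty Kafka topic names.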
func TestConfigFix(t *testing.T) {
	testCases := []struct {
		cfg *Config
		err error
	}{
		{cfg: &Config{}, err: ErrIllegalTopic},
		{cfg: &Config{MQ: MQConfig{BlobDeleteTopic: "test"}}, err: ErrIllegalTopic},
		{cfg: &Config{MQ: MQConfig{BlobDeleteTopic: "test", ShardRepairTopic: "test1"}}, err: ErrIllegalTopic},
		{cfg: &Config{MQ: MQConfig{BlobDeleteTopic: "test", ShardRepairTopic: "test", ShardRepairPriorityTopic: "test3"}}, err: ErrIllegalTopic},
		{cfg: &Config{MQ: MQConfig{BlobDeleteTopic: "test", ShardRepairTopic: "test1", ShardRepairPriorityTopic: "test"}}, err: ErrIllegalTopic},
		{cfg: &Config{MQ: MQConfig{BlobDeleteTopic: "test", ShardRepairTopic: "test1", ShardRepairPriorityTopic: "test3"}}, err: nil},
	}

	for _, tc := range testCases {
		err := tc.cfg.checkAndFix()
		require.True(t, errors.Is(err, tc.err))
		tc.cfg.shardRepairCfg()
		tc.cfg.blobDeleteCfg()
	}
}

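// newBrokersWith2Responses starts a seed and a leader mock broker; the
// seed answers two metadata requests that point "my_topic" at the leader.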
func newBrokersWith2Responses(t *testing.T) (*sarama.MockBroker, *sarama.MockBroker) {
	kafka.DefaultKafkaVersion = sarama.V0_9_0_1

	seedBroker := sarama.NewMockBrokerAddr(t, 1, "127.0.0.1:0")
	leader := sarama.NewMockBrokerAddr(t, 2, "127.0.0.1:0")

	metadataResponse := new(sarama.MetadataResponse)
	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, 0)
	seedBroker.Returns(metadataResponse)
	seedBroker.Returns(metadataResponse)

	return seedBroker, leader
}