kraken
191 строка · 5.5 Кб
1// Copyright (c) 2016-2019 Uber Technologies, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14package blobserver
15
16import (
17"bytes"
18"testing"
19"time"
20
21"go.uber.org/zap"
22
23"github.com/andres-erbsen/clock"
24"github.com/golang/mock/gomock"
25"github.com/stretchr/testify/require"
26"github.com/uber-go/tally"
27
28"github.com/uber/kraken/core"
29"github.com/uber/kraken/lib/backend"
30"github.com/uber/kraken/lib/blobrefresh"
31"github.com/uber/kraken/lib/hashring"
32"github.com/uber/kraken/lib/healthcheck"
33"github.com/uber/kraken/lib/hostlist"
34"github.com/uber/kraken/lib/metainfogen"
35"github.com/uber/kraken/lib/store"
36"github.com/uber/kraken/mocks/lib/backend"
37"github.com/uber/kraken/mocks/lib/persistedretry"
38"github.com/uber/kraken/mocks/origin/blobclient"
39"github.com/uber/kraken/origin/blobclient"
40"github.com/uber/kraken/utils/log"
41"github.com/uber/kraken/utils/stringset"
42"github.com/uber/kraken/utils/testutil"
43)
44
// Addresses of the three dummy origin hosts that newHashRing places on the
// hash ring used throughout these tests.
const (
	master1 = "dummy-origin-master01-zone2:80"
	master2 = "dummy-origin-master02-zone2:80"
	master3 = "dummy-origin-master03-zone2:80"
)
50
51func init() {
52zapConfig := zap.NewProductionConfig()
53zapConfig.OutputPaths = []string{}
54log.ConfigureLogger(zapConfig)
55}
56
57func newHashRing(maxReplica int) hashring.Ring {
58return hashring.New(
59hashring.Config{MaxReplica: maxReplica},
60hostlist.Fixture(master1, master2, master3),
61healthcheck.IdentityFilter{})
62}
63
64func hashRingNoReplica() hashring.Ring { return newHashRing(1) }
65func hashRingSomeReplica() hashring.Ring { return newHashRing(2) }
66func hashRingMaxReplica() hashring.Ring { return newHashRing(3) }
67
68// testClientProvider implements blobclient.ClientProvider. It maps origin hostnames to
69// the local addresses they are running on, such that Provide("dummy-origin")
70// can resolve a real address.
71type testClientProvider struct {
72clients map[string]blobclient.Client
73}
74
75func newTestClientProvider() *testClientProvider {
76return &testClientProvider{make(map[string]blobclient.Client)}
77}
78
79func (p *testClientProvider) register(host string, client blobclient.Client) {
80p.clients[host] = client
81}
82
83func (p *testClientProvider) Provide(host string) blobclient.Client {
84c, ok := p.clients[host]
85if !ok {
86log.Panicf("host %q not found", host)
87}
88return c
89}
90
91// testServer is a convenience wrapper around the underlying components of a
92// Server and faciliates restarting Servers with new configuration.
93type testServer struct {
94ctrl *gomock.Controller
95host string
96addr string
97cas *store.CAStore
98cp *testClientProvider
99clusterProvider *mockblobclient.MockClusterProvider
100pctx core.PeerContext
101backendManager *backend.Manager
102writeBackManager *mockpersistedretry.MockManager
103clk *clock.Mock
104cleanup func()
105}
106
107func newTestServer(
108t *testing.T, host string, ring hashring.Ring, cp *testClientProvider) *testServer {
109
110var cleanup testutil.Cleanup
111defer cleanup.Recover()
112
113ctrl := gomock.NewController(t)
114cleanup.Add(ctrl.Finish)
115
116clusterProvider := mockblobclient.NewMockClusterProvider(ctrl)
117
118pctx := core.PeerContextFixture()
119
120cas, c := store.CAStoreFixture()
121cleanup.Add(c)
122
123bm := backend.ManagerFixture()
124
125writeBackManager := mockpersistedretry.NewMockManager(ctrl)
126
127mg := metainfogen.Fixture(cas, 4)
128
129br := blobrefresh.New(blobrefresh.Config{}, tally.NoopScope, cas, bm, mg)
130
131clk := clock.NewMock()
132clk.Set(time.Now())
133
134s, err := New(
135Config{}, tally.NoopScope, clk, host, ring, cas, cp, clusterProvider, pctx,
136bm, br, mg, writeBackManager)
137if err != nil {
138panic(err)
139}
140
141addr, stop := testutil.StartServer(s.Handler())
142cleanup.Add(stop)
143
144cp.register(host, blobclient.New(addr, blobclient.WithChunkSize(16)))
145
146return &testServer{
147ctrl: ctrl,
148host: host,
149addr: addr,
150cas: cas,
151cp: cp,
152clusterProvider: clusterProvider,
153pctx: pctx,
154backendManager: bm,
155writeBackManager: writeBackManager,
156clk: clk,
157cleanup: cleanup.Run,
158}
159}
160
161func (s *testServer) backendClient(namespace string) *mockbackend.MockClient {
162client := mockbackend.NewMockClient(s.ctrl)
163if err := s.backendManager.Register(namespace, client); err != nil {
164panic(err)
165}
166return client
167}
168
169func (s *testServer) expectRemoteCluster(dns string) *mockblobclient.MockClusterClient {
170cc := mockblobclient.NewMockClusterClient(s.ctrl)
171s.clusterProvider.EXPECT().Provide(dns).Return(cc, nil).MinTimes(1)
172return cc
173}
174
175// computeBlobForHosts generates a random digest / content which shards to hosts.
176func computeBlobForHosts(ring hashring.Ring, hosts ...string) *core.BlobFixture {
177want := stringset.New(hosts...)
178for {
179blob := core.SizedBlobFixture(32, 4)
180got := stringset.New(ring.Locations(blob.Digest)...)
181if stringset.Equal(want, got) {
182return blob
183}
184}
185}
186
187func ensureHasBlob(t *testing.T, c blobclient.Client, namespace string, blob *core.BlobFixture) {
188var buf bytes.Buffer
189require.NoError(t, c.DownloadBlob(namespace, blob.Digest, &buf))
190require.Equal(t, string(blob.Content), buf.String())
191}
192