kraken
326 строк · 8.3 Кб
1// Copyright (c) 2016-2019 Uber Technologies, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14package agentserver
15
16import (
17"bytes"
18"context"
19"encoding/json"
20"errors"
21"fmt"
22"io/ioutil"
23"net/url"
24"testing"
25"time"
26
27"github.com/uber/kraken/agent/agentclient"
28"github.com/uber/kraken/build-index/tagclient"
29"github.com/uber/kraken/core"
30"github.com/uber/kraken/lib/store"
31"github.com/uber/kraken/lib/torrent/scheduler"
32"github.com/uber/kraken/lib/torrent/scheduler/connstate"
33mocktagclient "github.com/uber/kraken/mocks/build-index/tagclient"
34mockcontainerruntime "github.com/uber/kraken/mocks/lib/containerruntime"
35mockcontainerd "github.com/uber/kraken/mocks/lib/containerruntime/containerd"
36mockdockerdaemon "github.com/uber/kraken/mocks/lib/containerruntime/dockerdaemon"
37mockscheduler "github.com/uber/kraken/mocks/lib/torrent/scheduler"
38"github.com/uber/kraken/utils/httputil"
39"github.com/uber/kraken/utils/testutil"
40
41"github.com/golang/mock/gomock"
42"github.com/stretchr/testify/require"
43"github.com/uber-go/tally"
44)
45
46type serverMocks struct {
47cads *store.CADownloadStore
48sched *mockscheduler.MockReloadableScheduler
49tags *mocktagclient.MockClient
50dockerCli *mockdockerdaemon.MockDockerClient
51containerdCli *mockcontainerd.MockClient
52containerRuntime *mockcontainerruntime.MockFactory
53cleanup *testutil.Cleanup
54}
55
56func newServerMocks(t *testing.T) (*serverMocks, func()) {
57var cleanup testutil.Cleanup
58
59cads, c := store.CADownloadStoreFixture()
60cleanup.Add(c)
61
62ctrl := gomock.NewController(t)
63cleanup.Add(ctrl.Finish)
64
65sched := mockscheduler.NewMockReloadableScheduler(ctrl)
66
67tags := mocktagclient.NewMockClient(ctrl)
68
69dockerCli := mockdockerdaemon.NewMockDockerClient(ctrl)
70containerdCli := mockcontainerd.NewMockClient(ctrl)
71containerruntime := mockcontainerruntime.NewMockFactory(ctrl)
72return &serverMocks{
73cads, sched, tags, dockerCli, containerdCli,
74containerruntime, &cleanup}, cleanup.Run
75}
76
77func (m *serverMocks) startServer() string {
78s := New(Config{}, tally.NoopScope, m.cads, m.sched, m.tags, m.containerRuntime)
79addr, stop := testutil.StartServer(s.Handler())
80m.cleanup.Add(stop)
81return addr
82}
83
84func TestGetTag(t *testing.T) {
85require := require.New(t)
86
87mocks, cleanup := newServerMocks(t)
88defer cleanup()
89
90tag := core.TagFixture()
91d := core.DigestFixture()
92
93mocks.tags.EXPECT().Get(tag).Return(d, nil)
94
95c := agentclient.New(mocks.startServer())
96
97result, err := c.GetTag(tag)
98require.NoError(err)
99require.Equal(d, result)
100}
101
102func TestGetTagNotFound(t *testing.T) {
103require := require.New(t)
104
105mocks, cleanup := newServerMocks(t)
106defer cleanup()
107
108tag := core.TagFixture()
109
110mocks.tags.EXPECT().Get(tag).Return(core.Digest{}, tagclient.ErrTagNotFound)
111
112c := agentclient.New(mocks.startServer())
113
114_, err := c.GetTag(tag)
115require.Error(err)
116require.Equal(agentclient.ErrTagNotFound, err)
117}
118
119func TestDownload(t *testing.T) {
120require := require.New(t)
121
122mocks, cleanup := newServerMocks(t)
123defer cleanup()
124
125namespace := core.TagFixture()
126blob := core.NewBlobFixture()
127
128mocks.sched.EXPECT().Download(namespace, blob.Digest).DoAndReturn(
129func(namespace string, d core.Digest) error {
130return store.RunDownload(mocks.cads, d, blob.Content)
131})
132
133addr := mocks.startServer()
134c := agentclient.New(addr)
135
136r, err := c.Download(namespace, blob.Digest)
137require.NoError(err)
138result, err := ioutil.ReadAll(r)
139require.NoError(err)
140require.Equal(string(blob.Content), string(result))
141}
142
143func TestDownloadNotFound(t *testing.T) {
144require := require.New(t)
145
146mocks, cleanup := newServerMocks(t)
147defer cleanup()
148
149namespace := core.TagFixture()
150blob := core.NewBlobFixture()
151
152mocks.sched.EXPECT().Download(namespace, blob.Digest).Return(scheduler.ErrTorrentNotFound)
153
154addr := mocks.startServer()
155c := agentclient.New(addr)
156
157_, err := c.Download(namespace, blob.Digest)
158require.Error(err)
159require.True(httputil.IsNotFound(err))
160}
161
162func TestDownloadUnknownError(t *testing.T) {
163require := require.New(t)
164
165mocks, cleanup := newServerMocks(t)
166defer cleanup()
167
168namespace := core.TagFixture()
169blob := core.NewBlobFixture()
170
171mocks.sched.EXPECT().Download(namespace, blob.Digest).Return(fmt.Errorf("test error"))
172
173addr := mocks.startServer()
174c := agentclient.New(addr)
175
176_, err := c.Download(namespace, blob.Digest)
177require.Error(err)
178require.True(httputil.IsStatus(err, 500))
179}
180
181func TestHealthHandler(t *testing.T) {
182tests := []struct {
183desc string
184probeErr error
185}{
186{"probe error", errors.New("some probe error")},
187{"healthy", nil},
188}
189for _, test := range tests {
190t.Run(test.desc, func(t *testing.T) {
191require := require.New(t)
192
193mocks, cleanup := newServerMocks(t)
194defer cleanup()
195
196mocks.sched.EXPECT().Probe().Return(test.probeErr)
197
198addr := mocks.startServer()
199
200_, err := httputil.Get(fmt.Sprintf("http://%s/health", addr))
201if test.probeErr != nil {
202require.Error(err)
203} else {
204require.NoError(err)
205}
206})
207}
208}
209
210func TestPatchSchedulerConfigHandler(t *testing.T) {
211require := require.New(t)
212
213mocks, cleanup := newServerMocks(t)
214defer cleanup()
215
216addr := mocks.startServer()
217
218config := scheduler.Config{
219ConnTTI: time.Minute,
220}
221b, err := json.Marshal(config)
222require.NoError(err)
223
224mocks.sched.EXPECT().Reload(config)
225
226_, err = httputil.Patch(
227fmt.Sprintf("http://%s/x/config/scheduler", addr),
228httputil.SendBody(bytes.NewReader(b)))
229require.NoError(err)
230}
231
232func TestGetBlacklistHandler(t *testing.T) {
233require := require.New(t)
234
235mocks, cleanup := newServerMocks(t)
236defer cleanup()
237
238blacklist := []connstate.BlacklistedConn{{
239PeerID: core.PeerIDFixture(),
240InfoHash: core.InfoHashFixture(),
241Remaining: time.Second,
242}}
243mocks.sched.EXPECT().BlacklistSnapshot().Return(blacklist, nil)
244
245addr := mocks.startServer()
246
247resp, err := httputil.Get(fmt.Sprintf("http://%s/x/blacklist", addr))
248require.NoError(err)
249
250var result []connstate.BlacklistedConn
251require.NoError(json.NewDecoder(resp.Body).Decode(&result))
252require.Equal(blacklist, result)
253}
254
255func TestDeleteBlobHandler(t *testing.T) {
256require := require.New(t)
257
258mocks, cleanup := newServerMocks(t)
259defer cleanup()
260
261d := core.DigestFixture()
262
263addr := mocks.startServer()
264
265mocks.sched.EXPECT().RemoveTorrent(d).Return(nil)
266
267_, err := httputil.Delete(fmt.Sprintf("http://%s/blobs/%s", addr, d))
268require.NoError(err)
269}
270
271func TestPreloadHandler(t *testing.T) {
272tag := url.PathEscape("repo1:tag1")
273tests := []struct {
274name string
275url string
276setup func(*serverMocks)
277expectedError string
278}{
279{
280name: "success docker",
281url: fmt.Sprintf("/preload/tags/%s", tag),
282setup: func(mocks *serverMocks) {
283mocks.dockerCli.EXPECT().
284PullImage(context.Background(), "repo1", "tag1").Return(nil)
285mocks.containerRuntime.EXPECT().
286DockerClient().Return(mocks.dockerCli)
287},
288},
289{
290name: "success containerd",
291url: fmt.Sprintf("/preload/tags/%s?runtime=containerd&namespace=name.space1", tag),
292setup: func(mocks *serverMocks) {
293mocks.containerdCli.EXPECT().
294PullImage(context.Background(), "name.space1", "repo1", "tag1").Return(nil)
295mocks.containerRuntime.EXPECT().
296ContainerdClient().Return(mocks.containerdCli)
297},
298},
299{
300name: "unsupported runtime",
301url: fmt.Sprintf("/preload/tags/%s?runtime=crio", tag),
302setup: func(_ *serverMocks) {},
303expectedError: "/preload/tags/repo1:tag1?runtime=crio 500: unsupported container runtime",
304},
305}
306
307for _, tt := range tests {
308t.Run(tt.name, func(t *testing.T) {
309require := require.New(t)
310
311mocks, cleanup := newServerMocks(t)
312defer cleanup()
313
314tt.setup(mocks)
315addr := mocks.startServer()
316
317_, err := httputil.Get(fmt.Sprintf("http://%s%s", addr, tt.url))
318if tt.expectedError != "" {
319require.EqualError(err,
320fmt.Sprintf("GET http://%s%s", addr, tt.expectedError))
321} else {
322require.NoError(err)
323}
324})
325}
326}
327