kraken
234 строки · 6.6 Кб
1// Copyright (c) 2016-2019 Uber Technologies, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14package agentserver
15
16import (
17"context"
18"encoding/json"
19"fmt"
20"io"
21"net/http"
22_ "net/http/pprof" // Registers /debug/pprof endpoints in http.DefaultServeMux.
23"os"
24"strings"
25
26"github.com/uber/kraken/build-index/tagclient"
27"github.com/uber/kraken/core"
28"github.com/uber/kraken/lib/containerruntime"
29"github.com/uber/kraken/lib/middleware"
30"github.com/uber/kraken/lib/store"
31"github.com/uber/kraken/lib/torrent/scheduler"
32"github.com/uber/kraken/utils/handler"
33"github.com/uber/kraken/utils/httputil"
34
35"github.com/go-chi/chi"
36"github.com/uber-go/tally"
37)
38
39// Config defines Server configuration.
40type Config struct{}
41
42// Server defines the agent HTTP server.
43type Server struct {
44config Config
45stats tally.Scope
46cads *store.CADownloadStore
47sched scheduler.ReloadableScheduler
48tags tagclient.Client
49containerRuntime containerruntime.Factory
50}
51
52// New creates a new Server.
53func New(
54config Config,
55stats tally.Scope,
56cads *store.CADownloadStore,
57sched scheduler.ReloadableScheduler,
58tags tagclient.Client,
59containerRuntime containerruntime.Factory) *Server {
60
61stats = stats.Tagged(map[string]string{
62"module": "agentserver",
63})
64
65return &Server{config, stats, cads, sched, tags, containerRuntime}
66}
67
68// Handler returns the HTTP handler.
69func (s *Server) Handler() http.Handler {
70r := chi.NewRouter()
71
72r.Use(middleware.StatusCounter(s.stats))
73r.Use(middleware.LatencyTimer(s.stats))
74
75r.Get("/health", handler.Wrap(s.healthHandler))
76
77r.Get("/tags/{tag}", handler.Wrap(s.getTagHandler))
78
79r.Get("/namespace/{namespace}/blobs/{digest}", handler.Wrap(s.downloadBlobHandler))
80
81r.Delete("/blobs/{digest}", handler.Wrap(s.deleteBlobHandler))
82
83// Preheat/preload endpoints.
84r.Get("/preload/tags/{tag}", handler.Wrap(s.preloadTagHandler))
85
86// Dangerous endpoint for running experiments.
87r.Patch("/x/config/scheduler", handler.Wrap(s.patchSchedulerConfigHandler))
88
89r.Get("/x/blacklist", handler.Wrap(s.getBlacklistHandler))
90
91// Serves /debug/pprof endpoints.
92r.Mount("/", http.DefaultServeMux)
93
94return r
95}
96
97// getTagHandler proxies get tag requests to the build-index.
98func (s *Server) getTagHandler(w http.ResponseWriter, r *http.Request) error {
99tag, err := httputil.ParseParam(r, "tag")
100if err != nil {
101return err
102}
103d, err := s.tags.Get(tag)
104if err != nil {
105if err == tagclient.ErrTagNotFound {
106return handler.ErrorStatus(http.StatusNotFound)
107}
108return handler.Errorf("get tag: %s", err)
109}
110io.WriteString(w, d.String())
111return nil
112}
113
114// downloadBlobHandler downloads a blob through p2p.
115func (s *Server) downloadBlobHandler(w http.ResponseWriter, r *http.Request) error {
116namespace, err := httputil.ParseParam(r, "namespace")
117if err != nil {
118return err
119}
120d, err := parseDigest(r)
121if err != nil {
122return err
123}
124f, err := s.cads.Cache().GetFileReader(d.Hex())
125if err != nil {
126if os.IsNotExist(err) || s.cads.InDownloadError(err) {
127if err := s.sched.Download(namespace, d); err != nil {
128if err == scheduler.ErrTorrentNotFound {
129return handler.ErrorStatus(http.StatusNotFound)
130}
131return handler.Errorf("download torrent: %s", err)
132}
133f, err = s.cads.Cache().GetFileReader(d.Hex())
134if err != nil {
135return handler.Errorf("store: %s", err)
136}
137} else {
138return handler.Errorf("store: %s", err)
139}
140}
141if _, err := io.Copy(w, f); err != nil {
142return fmt.Errorf("copy file: %s", err)
143}
144return nil
145}
146
147func (s *Server) deleteBlobHandler(w http.ResponseWriter, r *http.Request) error {
148d, err := parseDigest(r)
149if err != nil {
150return err
151}
152if err := s.sched.RemoveTorrent(d); err != nil {
153return handler.Errorf("remove torrent: %s", err)
154}
155return nil
156}
157
158// preloadTagHandler triggers docker daemon to download specified docker image.
159func (s *Server) preloadTagHandler(w http.ResponseWriter, r *http.Request) error {
160tag, err := httputil.ParseParam(r, "tag")
161if err != nil {
162return err
163}
164parts := strings.Split(tag, ":")
165if len(parts) != 2 {
166return handler.Errorf("failed to parse docker image tag")
167}
168repo, tag := parts[0], parts[1]
169
170rt := httputil.GetQueryArg(r, "runtime", "docker")
171ns := httputil.GetQueryArg(r, "namespace", "")
172switch rt {
173case "docker":
174if err := s.containerRuntime.DockerClient().
175PullImage(context.Background(), repo, tag); err != nil {
176return handler.Errorf("docker pull: %s", err)
177}
178case "containerd":
179if err := s.containerRuntime.ContainerdClient().
180PullImage(context.Background(), ns, repo, tag); err != nil {
181return handler.Errorf("containerd pull: %s", err)
182}
183default:
184return handler.Errorf("unsupported container runtime")
185}
186return nil
187}
188
189func (s *Server) healthHandler(w http.ResponseWriter, r *http.Request) error {
190if err := s.sched.Probe(); err != nil {
191return handler.Errorf("probe torrent client: %s", err)
192}
193io.WriteString(w, "OK")
194return nil
195}
196
197// patchSchedulerConfigHandler restarts the agent torrent scheduler with
198// the config in request body.
199func (s *Server) patchSchedulerConfigHandler(w http.ResponseWriter, r *http.Request) error {
200defer r.Body.Close()
201var config scheduler.Config
202if err := json.NewDecoder(r.Body).Decode(&config); err != nil {
203return handler.Errorf("json decode: %s", err).Status(http.StatusBadRequest)
204}
205s.sched.Reload(config)
206return nil
207}
208
209func (s *Server) getBlacklistHandler(w http.ResponseWriter, r *http.Request) error {
210blacklist, err := s.sched.BlacklistSnapshot()
211if err != nil {
212return handler.Errorf("blacklist snapshot: %s", err)
213}
214if err := json.NewEncoder(w).Encode(&blacklist); err != nil {
215return handler.Errorf("json encode: %s", err)
216}
217return nil
218}
219
220func parseDigest(r *http.Request) (core.Digest, error) {
221raw, err := httputil.ParseParam(r, "digest")
222if err != nil {
223return core.Digest{}, err
224}
225// TODO(codyg): Accept only a fully formed digest.
226d, err := core.NewSHA256DigestFromHex(raw)
227if err != nil {
228d, err = core.ParseSHA256Digest(raw)
229if err != nil {
230return core.Digest{}, handler.Errorf("parse digest: %s", err).Status(http.StatusBadRequest)
231}
232}
233return d, nil
234}
235