kraken

Форк
0
234 строки · 6.6 Кб
1
// Copyright (c) 2016-2019 Uber Technologies, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
package agentserver
15

16
import (
17
	"context"
18
	"encoding/json"
19
	"fmt"
20
	"io"
21
	"net/http"
22
	_ "net/http/pprof" // Registers /debug/pprof endpoints in http.DefaultServeMux.
23
	"os"
24
	"strings"
25

26
	"github.com/uber/kraken/build-index/tagclient"
27
	"github.com/uber/kraken/core"
28
	"github.com/uber/kraken/lib/containerruntime"
29
	"github.com/uber/kraken/lib/middleware"
30
	"github.com/uber/kraken/lib/store"
31
	"github.com/uber/kraken/lib/torrent/scheduler"
32
	"github.com/uber/kraken/utils/handler"
33
	"github.com/uber/kraken/utils/httputil"
34

35
	"github.com/go-chi/chi"
36
	"github.com/uber-go/tally"
37
)
38

39
// Config defines Server configuration.
40
// Config currently declares no fields; a value of it is accepted by New and
// stored on the Server.
type Config struct{}
41

42
// Server defines the agent HTTP server.
43
type Server struct {
44
	config           Config
45
	stats            tally.Scope
46
	cads             *store.CADownloadStore
47
	sched            scheduler.ReloadableScheduler
48
	tags             tagclient.Client
49
	containerRuntime containerruntime.Factory
50
}
51

52
// New creates a new Server.
53
func New(
54
	config Config,
55
	stats tally.Scope,
56
	cads *store.CADownloadStore,
57
	sched scheduler.ReloadableScheduler,
58
	tags tagclient.Client,
59
	containerRuntime containerruntime.Factory) *Server {
60

61
	stats = stats.Tagged(map[string]string{
62
		"module": "agentserver",
63
	})
64

65
	return &Server{config, stats, cads, sched, tags, containerRuntime}
66
}
67

68
// Handler returns the HTTP handler.
69
func (s *Server) Handler() http.Handler {
70
	r := chi.NewRouter()
71

72
	r.Use(middleware.StatusCounter(s.stats))
73
	r.Use(middleware.LatencyTimer(s.stats))
74

75
	r.Get("/health", handler.Wrap(s.healthHandler))
76

77
	r.Get("/tags/{tag}", handler.Wrap(s.getTagHandler))
78

79
	r.Get("/namespace/{namespace}/blobs/{digest}", handler.Wrap(s.downloadBlobHandler))
80

81
	r.Delete("/blobs/{digest}", handler.Wrap(s.deleteBlobHandler))
82

83
	// Preheat/preload endpoints.
84
	r.Get("/preload/tags/{tag}", handler.Wrap(s.preloadTagHandler))
85

86
	// Dangerous endpoint for running experiments.
87
	r.Patch("/x/config/scheduler", handler.Wrap(s.patchSchedulerConfigHandler))
88

89
	r.Get("/x/blacklist", handler.Wrap(s.getBlacklistHandler))
90

91
	// Serves /debug/pprof endpoints.
92
	r.Mount("/", http.DefaultServeMux)
93

94
	return r
95
}
96

97
// getTagHandler proxies get tag requests to the build-index.
98
func (s *Server) getTagHandler(w http.ResponseWriter, r *http.Request) error {
99
	tag, err := httputil.ParseParam(r, "tag")
100
	if err != nil {
101
		return err
102
	}
103
	d, err := s.tags.Get(tag)
104
	if err != nil {
105
		if err == tagclient.ErrTagNotFound {
106
			return handler.ErrorStatus(http.StatusNotFound)
107
		}
108
		return handler.Errorf("get tag: %s", err)
109
	}
110
	io.WriteString(w, d.String())
111
	return nil
112
}
113

114
// downloadBlobHandler downloads a blob through p2p.
115
func (s *Server) downloadBlobHandler(w http.ResponseWriter, r *http.Request) error {
116
	namespace, err := httputil.ParseParam(r, "namespace")
117
	if err != nil {
118
		return err
119
	}
120
	d, err := parseDigest(r)
121
	if err != nil {
122
		return err
123
	}
124
	f, err := s.cads.Cache().GetFileReader(d.Hex())
125
	if err != nil {
126
		if os.IsNotExist(err) || s.cads.InDownloadError(err) {
127
			if err := s.sched.Download(namespace, d); err != nil {
128
				if err == scheduler.ErrTorrentNotFound {
129
					return handler.ErrorStatus(http.StatusNotFound)
130
				}
131
				return handler.Errorf("download torrent: %s", err)
132
			}
133
			f, err = s.cads.Cache().GetFileReader(d.Hex())
134
			if err != nil {
135
				return handler.Errorf("store: %s", err)
136
			}
137
		} else {
138
			return handler.Errorf("store: %s", err)
139
		}
140
	}
141
	if _, err := io.Copy(w, f); err != nil {
142
		return fmt.Errorf("copy file: %s", err)
143
	}
144
	return nil
145
}
146

147
// deleteBlobHandler asks the scheduler to remove the torrent for the digest
// given in the "digest" path parameter. A digest parse error is returned
// unchanged; a removal failure is wrapped as a handler error.
func (s *Server) deleteBlobHandler(w http.ResponseWriter, r *http.Request) error {
	digest, parseErr := parseDigest(r)
	if parseErr != nil {
		return parseErr
	}

	removeErr := s.sched.RemoveTorrent(digest)
	if removeErr != nil {
		return handler.Errorf("remove torrent: %s", removeErr)
	}
	return nil
}
157

158
// preloadTagHandler triggers docker daemon to download specified docker image.
159
func (s *Server) preloadTagHandler(w http.ResponseWriter, r *http.Request) error {
160
	tag, err := httputil.ParseParam(r, "tag")
161
	if err != nil {
162
		return err
163
	}
164
	parts := strings.Split(tag, ":")
165
	if len(parts) != 2 {
166
		return handler.Errorf("failed to parse docker image tag")
167
	}
168
	repo, tag := parts[0], parts[1]
169

170
	rt := httputil.GetQueryArg(r, "runtime", "docker")
171
	ns := httputil.GetQueryArg(r, "namespace", "")
172
	switch rt {
173
	case "docker":
174
		if err := s.containerRuntime.DockerClient().
175
			PullImage(context.Background(), repo, tag); err != nil {
176
			return handler.Errorf("docker pull: %s", err)
177
		}
178
	case "containerd":
179
		if err := s.containerRuntime.ContainerdClient().
180
			PullImage(context.Background(), ns, repo, tag); err != nil {
181
			return handler.Errorf("containerd pull: %s", err)
182
		}
183
	default:
184
		return handler.Errorf("unsupported container runtime")
185
	}
186
	return nil
187
}
188

189
// healthHandler reports agent health. It probes the scheduler and writes "OK"
// to the response when the probe succeeds. A probe failure, or a failure to
// write the response body, is returned as a handler error.
func (s *Server) healthHandler(w http.ResponseWriter, r *http.Request) error {
	if err := s.sched.Probe(); err != nil {
		return handler.Errorf("probe torrent client: %s", err)
	}
	// Do not silently drop a failed write of the health response.
	if _, err := io.WriteString(w, "OK"); err != nil {
		return handler.Errorf("write health response: %s", err)
	}
	return nil
}
196

197
// patchSchedulerConfigHandler restarts the agent torrent scheduler with
198
// the config in request body.
199
func (s *Server) patchSchedulerConfigHandler(w http.ResponseWriter, r *http.Request) error {
200
	defer r.Body.Close()
201
	var config scheduler.Config
202
	if err := json.NewDecoder(r.Body).Decode(&config); err != nil {
203
		return handler.Errorf("json decode: %s", err).Status(http.StatusBadRequest)
204
	}
205
	s.sched.Reload(config)
206
	return nil
207
}
208

209
// getBlacklistHandler writes the scheduler's current blacklist snapshot to the
// response as JSON. Snapshot and encoding failures are returned as handler
// errors.
func (s *Server) getBlacklistHandler(w http.ResponseWriter, r *http.Request) error {
	snapshot, snapErr := s.sched.BlacklistSnapshot()
	if snapErr != nil {
		return handler.Errorf("blacklist snapshot: %s", snapErr)
	}

	encoder := json.NewEncoder(w)
	if encodeErr := encoder.Encode(&snapshot); encodeErr != nil {
		return handler.Errorf("json encode: %s", encodeErr)
	}
	return nil
}
219

220
// parseDigest extracts the "digest" path parameter from r and converts it to a
// core.Digest. The raw value is first tried as a bare SHA256 hex string and,
// if that fails, as a full digest string. When neither form parses, a 400 Bad
// Request error carrying the second parse error is returned.
func parseDigest(r *http.Request) (core.Digest, error) {
	raw, paramErr := httputil.ParseParam(r, "digest")
	if paramErr != nil {
		return core.Digest{}, paramErr
	}

	// TODO(codyg): Accept only a fully formed digest.
	if fromHex, hexErr := core.NewSHA256DigestFromHex(raw); hexErr == nil {
		return fromHex, nil
	}

	parsed, parseErr := core.ParseSHA256Digest(raw)
	if parseErr != nil {
		return core.Digest{}, handler.Errorf("parse digest: %s", parseErr).Status(http.StatusBadRequest)
	}
	return parsed, nil
}
235

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.