kraken
85 строк · 2.4 Кб
1// Copyright (c) 2016-2019 Uber Technologies, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14package blobserver
15
16import (
17"io"
18"net/http"
19"os"
20
21"github.com/docker/distribution/uuid"
22"github.com/uber/kraken/core"
23"github.com/uber/kraken/lib/store"
24"github.com/uber/kraken/utils/handler"
25)
26
27// uploader executes a chunked upload.
28type uploader struct {
29cas *store.CAStore
30}
31
32func newUploader(cas *store.CAStore) *uploader {
33return &uploader{cas}
34}
35
36func (u *uploader) start(d core.Digest) (uid string, err error) {
37if ok, err := blobExists(u.cas, d); err != nil {
38return "", err
39} else if ok {
40return "", handler.ErrorStatus(http.StatusConflict)
41}
42uid = uuid.Generate().String()
43if err := u.cas.CreateUploadFile(uid, 0); err != nil {
44return "", handler.Errorf("create upload file: %s", err)
45}
46return uid, nil
47}
48
49func (u *uploader) patch(
50d core.Digest, uid string, chunk io.Reader, start, end int64) error {
51
52if ok, err := blobExists(u.cas, d); err != nil {
53return err
54} else if ok {
55return handler.ErrorStatus(http.StatusConflict)
56}
57f, err := u.cas.GetUploadFileReadWriter(uid)
58if err != nil {
59if os.IsNotExist(err) {
60return handler.ErrorStatus(http.StatusNotFound)
61}
62return handler.Errorf("get upload file: %s", err)
63}
64defer f.Close()
65if _, err := f.Seek(start, 0); err != nil {
66return handler.Errorf("seek offset %d: %s", start, err).Status(http.StatusBadRequest)
67}
68if _, err := io.CopyN(f, chunk, end-start); err != nil {
69return handler.Errorf("copy: %s", err)
70}
71return nil
72}
73
74func (u *uploader) commit(d core.Digest, uid string) error {
75if err := u.cas.MoveUploadFileToCache(uid, d.Hex()); err != nil {
76if os.IsNotExist(err) {
77return handler.ErrorStatus(http.StatusNotFound)
78}
79if os.IsExist(err) {
80return handler.ErrorStatus(http.StatusConflict)
81}
82return handler.Errorf("move upload file to cache: %s", err)
83}
84return nil
85}
86