cubefs

Форк
0
484 строки · 10.7 Кб
1
// Copyright 2022 The CubeFS Authors.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
14

15
package example
16

17
import (
18
	"bytes"
19
	"encoding/base64"
20
	"encoding/json"
21
	"errors"
22
	"fmt"
23
	"io"
24
	"net/http"
25
	"strconv"
26
	"sync"
27
	"time"
28

29
	"github.com/cubefs/cubefs/blobstore/common/rpc"
30
	"github.com/cubefs/cubefs/blobstore/common/rpc/auditlog"
31
	"github.com/cubefs/cubefs/blobstore/common/trace"
32
)
33

34
var (
35
	errExceed    = rpc.NewError(http.StatusNotAcceptable, "MaxFiles", errors.New("exceed max files"))
36
	errBadRequst = rpc.NewError(http.StatusBadRequest, "BadRequest", errors.New("bad request"))
37
	errForbidden = rpc.NewError(http.StatusForbidden, "Forbidden", errors.New("forbidden"))
38
	errNotFound  = rpc.NewError(http.StatusNotFound, "NotFound", errors.New("not found"))
39
)
40

41
// Mode is the access mode attached to a file, stored as a small integer.
type Mode uint8

// Named file modes. Numbering starts at 1, so the zero value of Mode is
// not one of the named modes.
const (
	ModeRW Mode = iota + 1
	ModeW
	ModeR
	ModeNone
)
52

53
// FileApp app handlers
54
type FileApp interface {
55
	Upload(*rpc.Context)
56
	Update(*rpc.Context)
57
	Delete(*rpc.Context)
58
	Download(*rpc.Context)
59
	Stream(*rpc.Context)
60
	Exist(*rpc.Context)
61
	Stat(*rpc.Context)
62
	List(*rpc.Context)
63
	OptionalArgs(*rpc.Context)
64
}
65

66
// AppConfig app configure
67
type AppConfig struct {
68
	MaxFiles int `json:"max_files"`
69

70
	SimpleConfig SimpleConfig `json:"simple_config"`
71
	LBConfig     LBConfig     `json:"lb_config"`
72
}
73

74
type fileStat struct {
75
	Name  string
76
	Size  int
77
	Mode  Mode
78
	Desc  []byte
79
	Ctime time.Time
80
	Mtime time.Time
81
	Meta  map[string]string
82
}
83

84
type app struct {
85
	mu    sync.RWMutex
86
	files map[string]*fileStat
87

88
	fileCli Client
89
	metaCli Client
90
	config  AppConfig
91
}
92

93
// NewApp new app
94
func NewApp(cfg AppConfig) FileApp {
95
	return &app{
96
		files:   make(map[string]*fileStat, cfg.MaxFiles),
97
		fileCli: NewFileClient(&cfg.SimpleConfig),
98
		metaCli: NewMetaClient(&cfg.LBConfig, nil),
99
		config:  cfg,
100
	}
101
}
102

103
// ArgsUpload args upload
104
// you should register the args to rpc, cos field name in tag
105
type ArgsUpload struct {
106
	Name    string      `flag:"name"`        // required
107
	Size    int         `flag:"size"`        // required
108
	Mode    Mode        `flag:"mode"`        // required
109
	Desc    []byte      `flag:"desc,base64"` // required, base64 urlencode string
110
	Nothing interface{} `flag:"-"`           // ignored
111
}
112

113
func (a *app) Upload(c *rpc.Context) {
114
	span := trace.SpanFromContextSafe(c.Request.Context())
115

116
	args := new(ArgsUpload)
117
	if err := c.ParseArgs(args); err != nil {
118
		c.RespondError(err)
119
		return
120
	}
121
	span.Debugf("receive upload request, args: %#v", args)
122

123
	if args.Name == "" || args.Mode > ModeNone {
124
		c.RespondError(errBadRequst)
125
		return
126
	}
127

128
	a.mu.RLock()
129
	if len(a.files) >= a.config.MaxFiles {
130
		a.mu.RUnlock()
131
		c.RespondError(errExceed)
132
		return
133
	}
134
	a.mu.RUnlock()
135

136
	// append module cost, u should add track log before c.Respond*
137
	startBody := time.Now().Add(-100 * time.Millisecond)
138
	if err := a.fileCli.Write(c.Request.Context(), args.Size, c.Request.Body); err != nil {
139
		span.AppendTrackLog("body", startBody, err)
140
		c.RespondError(rpc.NewError(http.StatusGone, "ReadBody", err))
141
		return
142
	}
143
	span.AppendTrackLog("body", startBody, nil)
144

145
	var metaData []byte = []byte("meta")
146
	// construct meta data, write by rpc to server
147
	if err := a.metaCli.Write(c.Request.Context(), len(metaData), bytes.NewReader(metaData)); err != nil {
148
		c.RespondError(rpc.NewError(http.StatusGone, "ReadMeta", err))
149
		return
150
	}
151

152
	a.mu.Lock()
153
	now := time.Now()
154
	a.files[args.Name] = &fileStat{
155
		Name:  args.Name,
156
		Size:  args.Size,
157
		Mode:  args.Mode,
158
		Desc:  args.Desc[:],
159
		Ctime: now,
160
		Mtime: now,
161
		Meta:  make(map[string]string),
162
	}
163
	a.mu.Unlock()
164

165
	c.Respond()
166
}
167

168
// ArgsUpdate is the argument set of the Update handler.
// It is decoded from the request body; a type may implement rpc.Unmarshaler
// to control the decoding itself.
type ArgsUpdate struct {
	// Name selects the file to update.
	Name string
	// Desc replaces the stored description.
	Desc []byte
	// Meta entries are merged into the stored metadata.
	Meta map[string]string
}
175

176
// Update handles a change to an existing file record.
//
// ArgsUpdate is read from the request body. An empty name is answered with
// errBadRequst and an unknown name with a bare 404 status. Otherwise the
// entries of args.Meta are merged into the record, its description is
// replaced, its Mtime is refreshed and an empty success response is sent.
// The write lock is held until the handler returns.
func (a *app) Update(c *rpc.Context) {
	span := trace.SpanFromContextSafe(c.Request.Context())

	args := &ArgsUpdate{}
	err := c.ArgsBody(args)
	if err != nil {
		c.RespondError(err)
		return
	}
	span.Infof("update: %+v", args)

	if len(args.Name) == 0 {
		c.RespondError(errBadRequst)
		return
	}

	a.mu.Lock()
	defer a.mu.Unlock()

	target, found := a.files[args.Name]
	if !found {
		c.RespondStatus(http.StatusNotFound)
		return
	}

	for key, val := range args.Meta {
		target.Meta[key] = val
	}
	target.Desc = args.Desc
	target.Mtime = time.Now()

	c.Respond()
}
207

208
// ArgsDelete args delete
209
// args in query string, the key in getter is lowercase of field name
210
type ArgsDelete struct {
211
	Name string // key == name
212
	Mode Mode   // key == mode
213
}
214

215
// Delete handles removal of a file record.
//
// An empty name is answered with errBadRequst. An unknown name is answered
// with a bare 204 status. If the requested mode is greater than the stored
// mode the request is refused with errForbidden; otherwise the record is
// removed and an empty success response is sent. The write lock is held
// until the handler returns.
func (a *app) Delete(c *rpc.Context) {
	span := trace.SpanFromContextSafe(c.Request.Context())

	args := &ArgsDelete{}
	err := c.ParseArgs(args)
	if err != nil {
		c.RespondError(err)
		return
	}
	span.Infof("delete: %+v", args)

	if len(args.Name) == 0 {
		c.RespondError(errBadRequst)
		return
	}

	a.mu.Lock()
	defer a.mu.Unlock()

	entry, found := a.files[args.Name]
	switch {
	case !found:
		c.RespondStatus(http.StatusNoContent)
	case args.Mode > entry.Mode:
		c.RespondError(errForbidden)
	default:
		delete(a.files, args.Name)
		c.Respond()
	}
}
247

248
// ArgsDownload args download
249
// you can define mulits tag on fields, but need to specify
250
// tag name's order in RegisterArgsParser
251
type ArgsDownload struct {
252
	Name   string `form:"name"`
253
	Mode   Mode   `form:"mode"`
254
	Offset int    `formx:"offset,omitempty" flag:"xxx"`
255
	Read   int    `formx:"read,omitempty" flag:"vvv"`
256
}
257

258
func (a *app) Download(c *rpc.Context) {
259
	span := trace.SpanFromContextSafe(c.Request.Context())
260

261
	args := new(ArgsDownload)
262
	if err := c.ParseArgs(args); err != nil {
263
		c.RespondError(err)
264
		return
265
	}
266
	span.Debugf("receive download request, args: %#v", args)
267

268
	a.mu.RLock()
269
	file, ok := a.files[args.Name]
270
	a.mu.RUnlock()
271

272
	if args.Name == "" || args.Offset < 0 || args.Read < 0 ||
273
		args.Offset+args.Read > file.Size {
274
		c.RespondError(errBadRequst)
275
		return
276
	}
277

278
	if !ok {
279
		c.RespondError(errNotFound)
280
		return
281
	}
282

283
	if args.Mode > file.Mode {
284
		c.RespondError(errForbidden)
285
		return
286
	}
287

288
	if args.Read == 0 {
289
		c.Respond()
290
		return
291
	}
292

293
	span.AppendTrackLog("body", time.Now().Add(-1*time.Second), nil)
294

295
	reader, err := a.fileCli.Read(c.Request.Context(), args.Read)
296
	if err != nil {
297
		span.Error(err)
298
		c.RespondError(err)
299
		return
300
	}
301
	defer reader.Close()
302

303
	// Note: rpc server do not handler response write-error
304
	if args.Offset+args.Read < file.Size {
305
		c.RespondWithReader(http.StatusPartialContent, args.Read, rpc.MIMEStream,
306
			reader, map[string]string{
307
				rpc.HeaderContentRange: fmt.Sprintf("bytes %d-%d/%d",
308
					args.Offset, args.Offset+args.Read-1, file.Size),
309
			})
310
	} else {
311
		c.RespondWithReader(http.StatusOK, args.Read, rpc.MIMEStream, reader, nil)
312
	}
313
}
314

315
// Stream answers with 207 Multi-Status and then hands a step function to
// c.Stream. Each step opens a reader of 1<<30 bytes from fileCli and copies
// exactly that many bytes to the response writer; the step returns false
// after logging a warning if opening or copying fails, and true otherwise.
// When c.Stream returns true a "client gone" warning is logged.
func (a *app) Stream(c *rpc.Context) {
	const chunk = 1 << 30

	span := trace.SpanFromContextSafe(c.Request.Context())
	c.RespondStatus(http.StatusMultiStatus)

	step := func(w io.Writer) bool {
		reader, err := a.fileCli.Read(c.Request.Context(), chunk)
		if err != nil {
			span.Warn(err)
			return false
		}
		defer reader.Close()

		if _, err = io.CopyN(w, reader, chunk); err != nil {
			span.Warn(err)
			return false
		}
		return true
	}

	if c.Stream(step) {
		span.Warn("client gone")
	}
}
336

337
// ArgsExist is the argument set shared by the Exist and Stat handlers.
type ArgsExist struct {
	// Name is the file name to look up.
	Name string
}
341

342
// Parse implements rpc.Parser, parse args by yourself
343
func (args *ArgsExist) Parse(getter rpc.ValueGetter) error {
344
	name := getter("queryname")
345
	if name == "" {
346
		name = getter("name")
347
	}
348
	if name == "" {
349
		return errors.New("empty name")
350
	}
351
	args.Name = name
352
	return nil
353
}
354

355
// Compile-time check that *ArgsExist satisfies rpc.Parser.
var _ rpc.Parser = (*ArgsExist)(nil)
356

357
// Exist reports a file's attributes through response headers.
//
// An unknown name is answered with errNotFound. Otherwise the headers
// x-file-name, x-file-size, x-file-mode, x-file-desc (base64 url encoding),
// x-file-ctime and x-file-mtime (RFC 1123) are set, plus one
// "x-file-meta-<key>" header per metadata entry, and an empty success
// response is sent.
func (a *app) Exist(c *rpc.Context) {
	span := trace.SpanFromContextSafe(c.Request.Context())
	args := new(ArgsExist)
	if err := c.ParseArgs(args); err != nil {
		c.RespondError(err)
		return
	}
	span.Infof("exist: %+v", args)

	// Update mutates Desc, Mtime and Meta of a stored record while holding
	// the write lock, so everything read from the record is copied while the
	// read lock is still held. Iterating file.Meta after unlocking would
	// race with those writes.
	var snap fileStat
	a.mu.RLock()
	file, ok := a.files[args.Name]
	if ok {
		snap = *file
		snap.Meta = make(map[string]string, len(file.Meta))
		for k, v := range file.Meta {
			snap.Meta[k] = v
		}
	}
	a.mu.RUnlock()
	if !ok {
		c.RespondError(errNotFound)
		return
	}

	header := c.Writer.Header()
	header.Set("x-file-name", snap.Name)
	header.Set("x-file-size", strconv.Itoa(snap.Size))
	header.Set("x-file-mode", strconv.Itoa(int(snap.Mode)))
	header.Set("x-file-desc", base64.URLEncoding.EncodeToString(snap.Desc))
	header.Set("x-file-ctime", snap.Ctime.Format(time.RFC1123))
	header.Set("x-file-mtime", snap.Mtime.Format(time.RFC1123))

	for k, v := range snap.Meta {
		header.Set("x-file-meta-"+k, v)
	}

	c.Respond()
}
387

388
// RespStat args stat, use json tag
389
type RespStat struct {
390
	Name  string `json:"name"`
391
	Size  int    `json:"size"`
392
	Mode  Mode
393
	Desc  []byte `json:"desc"`
394
	Ctime int64
395
	Mtime int64
396
	Meta  map[string]string
397
}
398

399
// Compile-time check that *RespStat satisfies rpc.Marshaler.
var _ rpc.Marshaler = (*RespStat)(nil)
400

401
// Marshal implements rpc.Marshaler, define you own marshaler
402
func (stat *RespStat) Marshal() ([]byte, string, error) {
403
	b, err := json.Marshal(stat)
404
	return b, "application/x-json", err
405
}
406

407
func (a *app) Stat(c *rpc.Context) {
408
	span := trace.SpanFromContextSafe(c.Request.Context())
409
	args := new(ArgsExist)
410
	if err := c.ParseArgs(args); err != nil {
411
		c.RespondError(err)
412
		return
413
	}
414
	span.Infof("stat: %+v", args)
415

416
	a.mu.RLock()
417
	defer a.mu.RUnlock()
418
	file, ok := a.files[args.Name]
419
	if !ok {
420
		c.RespondError(errNotFound)
421
		return
422
	}
423

424
	resp := RespStat{
425
		Name:  file.Name,
426
		Size:  file.Size,
427
		Mode:  file.Mode,
428
		Desc:  make([]byte, len(file.Desc)),
429
		Ctime: file.Ctime.Unix(),
430
		Mtime: file.Mtime.Unix(),
431
		Meta:  make(map[string]string, len(file.Meta)),
432
	}
433
	copy(resp.Desc, file.Desc)
434
	for k, v := range file.Meta {
435
		resp.Meta[k] = v
436
	}
437

438
	c.RespondJSON(resp)
439
}
440

441
func (a *app) List(c *rpc.Context) {
442
	a.mu.RLock()
443
	defer a.mu.RUnlock()
444

445
	files := make([]RespStat, 0, len(a.files))
446
	for _, file := range a.files {
447
		resp := RespStat{
448
			Name:  file.Name,
449
			Size:  file.Size,
450
			Mode:  file.Mode,
451
			Desc:  make([]byte, len(file.Desc)),
452
			Ctime: file.Ctime.Unix(),
453
			Mtime: file.Mtime.Unix(),
454
			Meta:  make(map[string]string, len(file.Meta)),
455
		}
456
		copy(resp.Desc, file.Desc)
457
		for k, v := range file.Meta {
458
			resp.Meta[k] = v
459
		}
460

461
		files = append(files, resp)
462
	}
463

464
	extraHeader := auditlog.ExtraHeader(c.Writer)
465
	extraHeader.Add("http-header-key", "http-header-value")
466
	extraHeader["oWN-defined-KEY"] = []string{"ODvalue", "0x10"}
467

468
	c.RespondStatusData(http.StatusOK, files)
469
}
470

471
// ArgsURIOptional is an argument set taken from the URI in which one field
// is marked omitempty.
type ArgsURIOptional struct {
	// Require carries no omitempty option.
	Require string `json:"require"`
	// Option is marked omitempty.
	Option string `json:"option,omitempty"`
}
476

477
// OptionalArgs parses ArgsURIOptional from the request and echoes the parsed
// value back as JSON; a parse failure is reported with RespondError.
func (a *app) OptionalArgs(c *rpc.Context) {
	var parsed ArgsURIOptional

	err := c.ParseArgs(&parsed)
	if err != nil {
		c.RespondError(err)
		return
	}
	c.RespondJSON(&parsed)
}
485

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.