1
// Copyright 2022 The CubeFS Authors.
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
7
// http://www.apache.org/licenses/LICENSE-2.0
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
23
"github.com/cubefs/cubefs/blobstore/common/rpc"
26
// Client is the read/write client interface. NewFileClient and NewMetaClient
// in this file both return implementations of it.
27
type Client interface {
28
// Write sends the contents of the io.Reader; the int is the size that the
// implementations in this file format into a "/write/<size>" request path.
Write(context.Context, int, io.Reader) error
29
// Read takes the size that the implementations in this file format into a
// "/read/<size>" request path, and returns an io.ReadCloser or an error.
Read(context.Context, int) (io.ReadCloser, error)
32
// SimpleConfig is the configuration consumed by NewFileClient.
33
type SimpleConfig struct {
34
// Host is the first value NewFileClient stores in the client it builds;
// presumably it becomes the host prefix of the request URLs — confirm
// against the fileClient field order. JSON key: "host".
Host string `json:"host"`
35
// Config is passed by address to rpc.NewClient. JSON key: "rpc_config".
Config rpc.Config `json:"rpc_config"`
38
// NewFileClient returns a Client backed by a fileClient that is built from
// conf.Host and the rpc client created by rpc.NewClient(&conf.Config).
// conf is dereferenced unconditionally, so it must not be nil.
39
func NewFileClient(conf *SimpleConfig) Client {
40
return &fileClient{conf.Host, rpc.NewClient(&conf.Config)}
43
// fileClient is the Client implementation returned by NewFileClient. Its
// methods prefix request URLs with fc.host and send them through the DoWith
// and Get methods available on the type (presumably promoted from an embedded
// rpc client — confirm against the field list).
type fileClient struct {
48
// Write builds an HTTP PUT request for "<fc.host>/write/<size>" with body as
// the request payload and sends it with DoWith, returning DoWith's error.
func (fc *fileClient) Write(ctx context.Context, size int, body io.Reader) (err error) {
49
request, err := http.NewRequest(http.MethodPut, fmt.Sprintf("%s/write/%d", fc.host, size), body)
53
// nil is passed as the third argument and the request is sent with the
// rpc.WithCrcEncode() option (presumably CRC-encodes the request body —
// confirm in the rpc package).
return fc.DoWith(ctx, request, nil, rpc.WithCrcEncode())
56
// Read issues a GET for "<fc.host>/read/<size>". Any status other than
// 200 OK is reported as an "invalid httpcode" error.
func (fc *fileClient) Read(ctx context.Context, size int) (io.ReadCloser, error) {
57
resp, err := fc.Get(ctx, fmt.Sprintf("%s/read/%d", fc.host, size))
61
// NOTE(review): this error path returns without closing resp.Body, which
// looks like a leaked response body on non-200 replies — confirm and close
// it before returning.
if resp.StatusCode != http.StatusOK {
62
return nil, fmt.Errorf("invalid httpcode %d", resp.StatusCode)
67
// LBConfig is the configuration consumed by NewMetaClient for the
// load-balanced client.
69
// Config is passed by address to rpc.NewLbClient. JSON key: "rpc_lb_config".
Config rpc.LbConfig `json:"rpc_lb_config"`
72
// NewMetaClient returns a Client backed by a metaClient that wraps the
// client created by rpc.NewLbClient(&conf.Config, selector).
// conf is dereferenced unconditionally, so it must not be nil.
73
func NewMetaClient(conf *LBConfig, selector rpc.Selector) Client {
74
return &metaClient{rpc.NewLbClient(&conf.Config, selector)}
77
// metaClient is the Client implementation returned by NewMetaClient. Its
// methods use host-less paths ("/write/<size>", "/read/<size>") and send them
// through the Do and Get methods available on the type (presumably promoted
// from the wrapped rpc.NewLbClient client, which would choose the target
// host — confirm).
type metaClient struct {
81
// Write builds an HTTP PUT request for the host-less path "/write/<size>"
// with body as the request payload and sends it with Do.
func (fc *metaClient) Write(ctx context.Context, size int, body io.Reader) (err error) {
82
request, err := http.NewRequest(http.MethodPut, fmt.Sprintf("/write/%d", size), body)
86
// Sent with plain Do; no rpc request options are passed here.
resp, err := fc.Do(ctx, request)
94
// Read issues a GET for the host-less path "/read/<size>". Any status other
// than 200 OK is reported as an "invalid httpcode" error; on 200 the response
// body is returned and the caller is responsible for closing it.
func (fc *metaClient) Read(ctx context.Context, size int) (io.ReadCloser, error) {
95
resp, err := fc.Get(ctx, fmt.Sprintf("/read/%d", size))
99
// NOTE(review): this error path returns without closing resp.Body, which
// looks like a leaked response body on non-200 replies — confirm and close
// it before returning.
if resp.StatusCode != http.StatusOK {
100
return nil, fmt.Errorf("invalid httpcode %d", resp.StatusCode)
102
return resp.Body, nil