kuma

Форк
0
165 строк · 4.3 Кб
1
package stream
2

3
import (
4
	"context"
5
	"crypto/tls"
6
	"encoding/json"
7
	"fmt"
8
	"net/url"
9

10
	envoy_core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
11
	envoy_discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
12
	"github.com/pkg/errors"
13
	"google.golang.org/genproto/googleapis/rpc/status"
14
	"google.golang.org/grpc"
15
	"google.golang.org/grpc/credentials"
16
	"google.golang.org/grpc/credentials/insecure"
17
	"google.golang.org/grpc/metadata"
18
	"google.golang.org/protobuf/types/known/structpb"
19

20
	mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1"
21
	"github.com/kumahq/kuma/pkg/core/resources/model/rest"
22
	util_proto "github.com/kumahq/kuma/pkg/util/proto"
23
)
24

25
type Client struct {
26
	conn   *grpc.ClientConn
27
	client envoy_discovery.AggregatedDiscoveryServiceClient
28
}
29

30
type Stream struct {
31
	stream         envoy_discovery.AggregatedDiscoveryService_StreamAggregatedResourcesClient
32
	latestACKed    map[string]*envoy_discovery.DiscoveryResponse
33
	latestReceived map[string]*envoy_discovery.DiscoveryResponse
34
}
35

36
func New(serverURL string) (*Client, error) {
37
	url, err := url.Parse(serverURL)
38
	if err != nil {
39
		return nil, err
40
	}
41
	var dialOpts []grpc.DialOption
42
	switch url.Scheme {
43
	case "grpc":
44
		dialOpts = append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))
45
	case "grpcs":
46
		// #nosec G402 -- it's acceptable as this is only to be used in testing
47
		dialOpts = append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{
48
			InsecureSkipVerify: true,
49
		})))
50
	default:
51
		return nil, errors.Errorf("unsupported scheme %q. Use one of %s", url.Scheme, []string{"grpc", "grpcs"})
52
	}
53
	conn, err := grpc.Dial(url.Host, dialOpts...)
54
	if err != nil {
55
		return nil, err
56
	}
57
	client := envoy_discovery.NewAggregatedDiscoveryServiceClient(conn)
58
	return &Client{
59
		conn:   conn,
60
		client: client,
61
	}, nil
62
}
63

64
func (c *Client) StartStream() (*Stream, error) {
65
	ctx := metadata.NewOutgoingContext(context.Background(), metadata.MD{})
66
	stream, err := c.client.StreamAggregatedResources(ctx)
67
	if err != nil {
68
		return nil, err
69
	}
70
	return &Stream{
71
		stream:         stream,
72
		latestACKed:    make(map[string]*envoy_discovery.DiscoveryResponse),
73
		latestReceived: make(map[string]*envoy_discovery.DiscoveryResponse),
74
	}, nil
75
}
76

77
func (c *Client) Close() error {
78
	return c.conn.Close()
79
}
80

81
func (s *Stream) Request(clientId string, typ string, dp rest.Resource) error {
82
	dpJSON, err := json.Marshal(dp)
83
	if err != nil {
84
		return err
85
	}
86
	version := &mesh_proto.Version{
87
		KumaDp: &mesh_proto.KumaDpVersion{
88
			Version:   "0.0.1",
89
			GitTag:    "v0.0.1",
90
			GitCommit: "91ce236824a9d875601679aa80c63783fb0e8725",
91
			BuildDate: "2019-08-07T11:26:06Z",
92
		},
93
		Envoy: &mesh_proto.EnvoyVersion{
94
			Version: "1.15.0",
95
			Build:   "hash/1.15.0/RELEASE",
96
		},
97
	}
98
	md := &structpb.Struct{
99
		Fields: map[string]*structpb.Value{
100
			"dataplane.resource": {Kind: &structpb.Value_StringValue{StringValue: string(dpJSON)}},
101
			"version": {
102
				Kind: &structpb.Value_StructValue{
103
					StructValue: util_proto.MustToStruct(version),
104
				},
105
			},
106
		},
107
	}
108
	return s.stream.Send(&envoy_discovery.DiscoveryRequest{
109
		VersionInfo:   "",
110
		ResponseNonce: "",
111
		Node: &envoy_core.Node{
112
			Id:       clientId,
113
			Metadata: md,
114
		},
115
		ResourceNames: []string{},
116
		TypeUrl:       typ,
117
	})
118
}
119

120
func (s *Stream) ACK(typ string) error {
121
	latestReceived := s.latestReceived[typ]
122
	if latestReceived == nil {
123
		return nil
124
	}
125
	err := s.stream.Send(&envoy_discovery.DiscoveryRequest{
126
		VersionInfo:   latestReceived.VersionInfo,
127
		ResponseNonce: latestReceived.Nonce,
128
		ResourceNames: []string{},
129
		TypeUrl:       typ,
130
	})
131
	if err == nil {
132
		s.latestACKed = s.latestReceived
133
	}
134
	return err
135
}
136

137
func (s *Stream) NACK(typ string, err error) error {
138
	latestReceived := s.latestReceived[typ]
139
	if latestReceived == nil {
140
		return nil
141
	}
142
	latestACKed := s.latestACKed[typ]
143
	return s.stream.Send(&envoy_discovery.DiscoveryRequest{
144
		VersionInfo:   latestACKed.GetVersionInfo(),
145
		ResponseNonce: latestReceived.Nonce,
146
		ResourceNames: []string{},
147
		TypeUrl:       typ,
148
		ErrorDetail: &status.Status{
149
			Message: fmt.Sprintf("%s", err),
150
		},
151
	})
152
}
153

154
func (s *Stream) WaitForResources() (*envoy_discovery.DiscoveryResponse, error) {
155
	resp, err := s.stream.Recv()
156
	if err != nil {
157
		return nil, err
158
	}
159
	s.latestReceived[resp.TypeUrl] = resp
160
	return resp, nil
161
}
162

163
func (s *Stream) Close() error {
164
	return s.stream.CloseSend()
165
}
166

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.