kuma
165 строк · 4.3 Кб
1package stream2
3import (4"context"5"crypto/tls"6"encoding/json"7"fmt"8"net/url"9
10envoy_core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"11envoy_discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"12"github.com/pkg/errors"13"google.golang.org/genproto/googleapis/rpc/status"14"google.golang.org/grpc"15"google.golang.org/grpc/credentials"16"google.golang.org/grpc/credentials/insecure"17"google.golang.org/grpc/metadata"18"google.golang.org/protobuf/types/known/structpb"19
20mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1"21"github.com/kumahq/kuma/pkg/core/resources/model/rest"22util_proto "github.com/kumahq/kuma/pkg/util/proto"23)
24
25type Client struct {26conn *grpc.ClientConn27client envoy_discovery.AggregatedDiscoveryServiceClient28}
29
30type Stream struct {31stream envoy_discovery.AggregatedDiscoveryService_StreamAggregatedResourcesClient32latestACKed map[string]*envoy_discovery.DiscoveryResponse33latestReceived map[string]*envoy_discovery.DiscoveryResponse34}
35
36func New(serverURL string) (*Client, error) {37url, err := url.Parse(serverURL)38if err != nil {39return nil, err40}41var dialOpts []grpc.DialOption42switch url.Scheme {43case "grpc":44dialOpts = append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))45case "grpcs":46// #nosec G402 -- it's acceptable as this is only to be used in testing47dialOpts = append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{48InsecureSkipVerify: true,49})))50default:51return nil, errors.Errorf("unsupported scheme %q. Use one of %s", url.Scheme, []string{"grpc", "grpcs"})52}53conn, err := grpc.Dial(url.Host, dialOpts...)54if err != nil {55return nil, err56}57client := envoy_discovery.NewAggregatedDiscoveryServiceClient(conn)58return &Client{59conn: conn,60client: client,61}, nil62}
63
64func (c *Client) StartStream() (*Stream, error) {65ctx := metadata.NewOutgoingContext(context.Background(), metadata.MD{})66stream, err := c.client.StreamAggregatedResources(ctx)67if err != nil {68return nil, err69}70return &Stream{71stream: stream,72latestACKed: make(map[string]*envoy_discovery.DiscoveryResponse),73latestReceived: make(map[string]*envoy_discovery.DiscoveryResponse),74}, nil75}
76
77func (c *Client) Close() error {78return c.conn.Close()79}
80
81func (s *Stream) Request(clientId string, typ string, dp rest.Resource) error {82dpJSON, err := json.Marshal(dp)83if err != nil {84return err85}86version := &mesh_proto.Version{87KumaDp: &mesh_proto.KumaDpVersion{88Version: "0.0.1",89GitTag: "v0.0.1",90GitCommit: "91ce236824a9d875601679aa80c63783fb0e8725",91BuildDate: "2019-08-07T11:26:06Z",92},93Envoy: &mesh_proto.EnvoyVersion{94Version: "1.15.0",95Build: "hash/1.15.0/RELEASE",96},97}98md := &structpb.Struct{99Fields: map[string]*structpb.Value{100"dataplane.resource": {Kind: &structpb.Value_StringValue{StringValue: string(dpJSON)}},101"version": {102Kind: &structpb.Value_StructValue{103StructValue: util_proto.MustToStruct(version),104},105},106},107}108return s.stream.Send(&envoy_discovery.DiscoveryRequest{109VersionInfo: "",110ResponseNonce: "",111Node: &envoy_core.Node{112Id: clientId,113Metadata: md,114},115ResourceNames: []string{},116TypeUrl: typ,117})118}
119
120func (s *Stream) ACK(typ string) error {121latestReceived := s.latestReceived[typ]122if latestReceived == nil {123return nil124}125err := s.stream.Send(&envoy_discovery.DiscoveryRequest{126VersionInfo: latestReceived.VersionInfo,127ResponseNonce: latestReceived.Nonce,128ResourceNames: []string{},129TypeUrl: typ,130})131if err == nil {132s.latestACKed = s.latestReceived133}134return err135}
136
137func (s *Stream) NACK(typ string, err error) error {138latestReceived := s.latestReceived[typ]139if latestReceived == nil {140return nil141}142latestACKed := s.latestACKed[typ]143return s.stream.Send(&envoy_discovery.DiscoveryRequest{144VersionInfo: latestACKed.GetVersionInfo(),145ResponseNonce: latestReceived.Nonce,146ResourceNames: []string{},147TypeUrl: typ,148ErrorDetail: &status.Status{149Message: fmt.Sprintf("%s", err),150},151})152}
153
154func (s *Stream) WaitForResources() (*envoy_discovery.DiscoveryResponse, error) {155resp, err := s.stream.Recv()156if err != nil {157return nil, err158}159s.latestReceived[resp.TypeUrl] = resp160return resp, nil161}
162
163func (s *Stream) Close() error {164return s.stream.CloseSend()165}
166