kubelatte-ce
Форк от sbertech/kubelatte-ce
98 строк · 2.6 Кб
1package grpc
2
3import (
4"context"
5"errors"
6"gitverse.ru/ktrntrsv/kubelatte-ce/pkg/observability/logger"
7"gitverse.ru/ktrntrsv/kubelatte-ce/pkg/sideeffect"
8"gitverse.ru/ktrntrsv/kubelatte-ce/pkg/sideeffect/grpc/proto"
9"gitverse.ru/ktrntrsv/kubelatte-ce/pkg/sideeffect/sideeffect"
10"google.golang.org/grpc"
11"google.golang.org/grpc/connectivity"
12"google.golang.org/grpc/credentials/insecure"
13)
14
15var (
16serverAddr = "localhost:8687"
17retryPolicy = `{
18"methodConfig": [{
19"name": [{"service": "grpc.operator"}],
20"waitForReady": true,
21"retryPolicy": {
22"MaxAttempts": 5,
23"InitialBackoff": ".01s",
24"MaxBackoff": ".01s",
25"BackoffMultiplier": 1.0,
26"RetryableStatusCodes": [ "UNAVAILABLE" ]
27}
28}]}`
29)
30
31type ProviderClient struct {
32client proto.ExecutorClient
33conn *grpc.ClientConn
34}
35
36func NewProvider() sideeffect.ProviderI {
37return ProviderClient{}
38}
39
40func (c ProviderClient) ApplySideEffect(ctx context.Context, data *base.SE) error {
41_, err := c.DoSideEffect(ctx, &proto.SideEffectInfo{
42Configs: &proto.Configs{
43Kind: data.Kind,
44ApiVersion: data.ApiVersion,
45ParentTag: data.ParentTag,
46Metadata: &proto.Metadata{
47Namespace: data.Metadata.Namespace,
48Name: data.Metadata.Name,
49},
50TriggerRef: &proto.TriggerRef{
51Name: data.TriggerRef.Name,
52Namespace: data.TriggerRef.Namespace,
53SideEffectConfig: data.SideEffectConfig,
54},
55TemplateRefs: data.TemplateRefs,
56},
57SeMode: data.SeMode,
58})
59return err
60}
61
62func retryDial() (*grpc.ClientConn, error) {
63return grpc.Dial(serverAddr, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(retryPolicy))
64}
65
66func (c ProviderClient) DoSideEffect(ctx context.Context, info *proto.SideEffectInfo, _ ...grpc.CallOption) (*proto.EmptyMes, error) {
67client, done := c.GetClient()
68if done {
69return &proto.EmptyMes{}, errors.New("grpc connection to injector is done")
70}
71
72_, err := client.DoSideEffect(ctx, info)
73return &proto.EmptyMes{}, err
74}
75
76func (c ProviderClient) GetClient() (client proto.ExecutorClient, done bool) {
77log := logger.FromContext(context.Background())
78if c.client == nil || c.conn == nil || (c.conn.GetState() == connectivity.TransientFailure || c.conn.GetState() == connectivity.Shutdown) {
79if c.conn != nil {
80err := c.conn.Close()
81if err != nil {
82log.Debugf("[ProviderClient] Clear conn err %s", err.Error())
83}
84}
85
86var err error
87
88c.conn, err = retryDial()
89if err != nil {
90log.Errorf("[ProviderClient] fail to dial: %s", err.Error())
91return nil, true
92}
93
94c.client = proto.NewExecutorClient(c.conn)
95}
96
97return c.client, false
98}
99