kubelatte-ce

Форк
2
Форк от sbertech/kubelatte-ce
98 строк · 2.6 Кб
1
package grpc
2

3
import (
4
	"context"
5
	"errors"
6
	"gitverse.ru/synapse/kubelatte/pkg/observability/logger"
7
	"gitverse.ru/synapse/kubelatte/pkg/sideeffect"
8
	"gitverse.ru/synapse/kubelatte/pkg/sideeffect/grpc/proto"
9
	"gitverse.ru/synapse/kubelatte/pkg/sideeffect/sideeffect"
10
	"google.golang.org/grpc"
11
	"google.golang.org/grpc/connectivity"
12
	"google.golang.org/grpc/credentials/insecure"
13
)
14

15
var (
16
	serverAddr  = "localhost:8687"
17
	retryPolicy = `{
18
		"methodConfig": [{
19
		  "name": [{"service": "grpc.operator"}],
20
		  "waitForReady": true,
21
		  "retryPolicy": {
22
			  "MaxAttempts": 5,
23
			  "InitialBackoff": ".01s",
24
			  "MaxBackoff": ".01s",
25
			  "BackoffMultiplier": 1.0,
26
			  "RetryableStatusCodes": [ "UNAVAILABLE" ]
27
		  }
28
		}]}`
29
)
30

31
type ProviderClient struct {
32
	client proto.ExecutorClient
33
	conn   *grpc.ClientConn
34
}
35

36
func NewProvider() sideeffect.ProviderI {
37
	return ProviderClient{}
38
}
39

40
func (c ProviderClient) ApplySideEffect(ctx context.Context, data *base.SE) error {
41
	_, err := c.DoSideEffect(ctx, &proto.SideEffectInfo{
42
		Configs: &proto.Configs{
43
			Kind:       data.Kind,
44
			ApiVersion: data.ApiVersion,
45
			ParentTag:  data.ParentTag,
46
			Metadata: &proto.Metadata{
47
				Namespace: data.Metadata.Namespace,
48
				Name:      data.Metadata.Name,
49
			},
50
			TriggerRef: &proto.TriggerRef{
51
				Name:             data.TriggerRef.Name,
52
				Namespace:        data.TriggerRef.Namespace,
53
				SideEffectConfig: data.SideEffectConfig,
54
			},
55
			TemplateRefs: data.TemplateRefs,
56
		},
57
		SeMode: data.SeMode,
58
	})
59
	return err
60
}
61

62
func retryDial() (*grpc.ClientConn, error) {
63
	return grpc.Dial(serverAddr, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(retryPolicy))
64
}
65

66
func (c ProviderClient) DoSideEffect(ctx context.Context, info *proto.SideEffectInfo, _ ...grpc.CallOption) (*proto.EmptyMes, error) {
67
	client, done := c.GetClient()
68
	if done {
69
		return &proto.EmptyMes{}, errors.New("grpc connection to injector is done")
70
	}
71

72
	_, err := client.DoSideEffect(ctx, info)
73
	return &proto.EmptyMes{}, err
74
}
75

76
func (c ProviderClient) GetClient() (client proto.ExecutorClient, done bool) {
77
	log := logger.FromContext(context.Background())
78
	if c.client == nil || c.conn == nil || (c.conn.GetState() == connectivity.TransientFailure || c.conn.GetState() == connectivity.Shutdown) {
79
		if c.conn != nil {
80
			err := c.conn.Close()
81
			if err != nil {
82
				log.Debugf("[ProviderClient] Clear conn err %s", err.Error())
83
			}
84
		}
85

86
		var err error
87

88
		c.conn, err = retryDial()
89
		if err != nil {
90
			log.Errorf("[ProviderClient] fail to dial: %s", err.Error())
91
			return nil, true
92
		}
93

94
		c.client = proto.NewExecutorClient(c.conn)
95
	}
96

97
	return c.client, false
98
}
99

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.