kubelatte-ce
Форк от sbertech/kubelatte-ce
64 строки · 1.9 Кб
1package grpc
2
3import (
4"context"
5"fmt"
6"gitverse.ru/synapse/kubelatte/pkg/observability/logger"
7"gitverse.ru/synapse/kubelatte/pkg/sideeffect"
8"gitverse.ru/synapse/kubelatte/pkg/sideeffect/grpc/proto"
9"google.golang.org/grpc"
10"net"
11)
12
13type ServerI interface {
14DoSideEffect(ctx context.Context, info *proto.SideEffectInfo) (*proto.EmptyMes, error)
15}
16
17type ExecutorServer struct {
18base.ExecutorI
19proto.UnimplementedExecutorServer
20}
21
22func (s *ExecutorServer) DoSideEffect(ctx context.Context, info *proto.SideEffectInfo) (*proto.EmptyMes, error) {
23s.ExecutorI.ApplySideEffect(ctx, &base.SE{
24Configs: base.Configs{
25Kind: info.GetConfigs().GetKind(),
26ApiVersion: info.GetConfigs().GetApiVersion(),
27ParentTag: info.GetConfigs().GetParentTag(),
28TriggerRef: base.TriggerRef{
29SideEffectConfig: info.GetConfigs().GetTriggerRef().GetSideEffectConfig(),
30Name: info.GetConfigs().GetTriggerRef().GetName(),
31Namespace: info.GetConfigs().GetTriggerRef().GetNamespace(),
32},
33Metadata: base.Metadata{
34Namespace: info.GetConfigs().GetMetadata().GetNamespace(),
35Name: info.GetConfigs().GetMetadata().GetName(),
36},
37TemplateRefs: info.GetConfigs().GetTemplateRefs(),
38},
39SeMode: info.GetSeMode(),
40})
41return &proto.EmptyMes{}, nil
42}
43
44func StartExecuteServer(port int, executor base.ExecutorI) (servShutDown func()) {
45log := logger.FromContext(context.Background())
46log.Debugf("[ExecuteServer] Start grpc server on addr :%d", port)
47lis, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", port))
48if err != nil {
49log.Errorf("[ExecuteServer] failed to listen: %s", err.Error())
50}
51
52var opts []grpc.ServerOption
53grpcServer := grpc.NewServer(opts...)
54
55go func() {
56proto.RegisterExecutorServer(grpcServer, &ExecutorServer{ExecutorI: executor})
57err = grpcServer.Serve(lis)
58if err != nil {
59return
60}
61}()
62
63return grpcServer.GracefulStop
64}
65