talm

Форк
0
141 строка · 4.5 Кб
1
// This Source Code Form is subject to the terms of the Mozilla Public
2
// License, v. 2.0. If a copy of the MPL was not distributed with this
3
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
4

5
// Package director provides proxy call routing facility
6
package director
7

8
import (
9
	"context"
10
	"regexp"
11
	"slices"
12
	"strings"
13

14
	"github.com/siderolabs/grpc-proxy/proxy"
15
	"google.golang.org/grpc"
16
	"google.golang.org/grpc/codes"
17
	"google.golang.org/grpc/metadata"
18
	"google.golang.org/grpc/status"
19
)
20

21
// Router wraps grpc-proxy StreamDirector.
22
type Router struct {
23
	localBackend         proxy.Backend
24
	remoteBackendFactory RemoteBackendFactory
25
	localAddressProvider LocalAddressProvider
26
	streamedMatchers     []*regexp.Regexp
27
}
28

29
// RemoteBackendFactory provides backend generation by address (target).
30
type RemoteBackendFactory func(target string) (proxy.Backend, error)
31

32
// NewRouter builds new Router.
33
func NewRouter(backendFactory RemoteBackendFactory, localBackend proxy.Backend, localAddressProvider LocalAddressProvider) *Router {
34
	return &Router{
35
		localBackend:         localBackend,
36
		remoteBackendFactory: backendFactory,
37
		localAddressProvider: localAddressProvider,
38
	}
39
}
40

41
// Register is no-op to implement factory.Registrator interface.
42
//
43
// Actual proxy handler is installed via grpc.UnknownServiceHandler option.
44
func (r *Router) Register(srv *grpc.Server) {
45
}
46

47
// Director implements proxy.StreamDirector function.
48
//
49
//nolint:gocyclo
50
func (r *Router) Director(ctx context.Context, fullMethodName string) (proxy.Mode, []proxy.Backend, error) {
51
	md, ok := metadata.FromIncomingContext(ctx)
52
	if !ok {
53
		return proxy.One2One, []proxy.Backend{r.localBackend}, nil
54
	}
55

56
	if _, exists := md["proxyfrom"]; exists {
57
		return proxy.One2One, []proxy.Backend{r.localBackend}, nil
58
	}
59

60
	nodes, okNodes := md["nodes"]
61
	node, okNode := md["node"]
62

63
	if okNode && len(node) != 1 {
64
		return proxy.One2One, nil, status.Error(codes.InvalidArgument, "node metadata must be single-valued")
65
	}
66

67
	// special handling for cases when a single node is requested, but forwarding is disabled
68
	//
69
	// if there's a single destination, and that destination is local node, skip forwarding and send a request to the same node
70
	if r.remoteBackendFactory == nil {
71
		if okNode && r.localAddressProvider.IsLocalTarget(node[0]) {
72
			okNode = false
73
		}
74

75
		if okNodes && len(nodes) == 1 && r.localAddressProvider.IsLocalTarget(nodes[0]) {
76
			okNodes = false
77
		}
78
	}
79

80
	switch {
81
	case okNodes:
82
		// COSI methods do not support one-2-many proxying.
83
		if strings.HasPrefix(fullMethodName, "/cosi.") {
84
			return proxy.One2One, nil, status.Error(codes.InvalidArgument, "one-2-many proxying is not supported for COSI methods")
85
		}
86

87
		return r.aggregateDirector(nodes)
88
	case okNode:
89
		return r.singleDirector(node[0])
90
	default:
91
		// send directly to local node, skips another layer of proxying
92
		return proxy.One2One, []proxy.Backend{r.localBackend}, nil
93
	}
94
}
95

96
// singleDirector sends request to a single instance in one-2-one mode.
97
func (r *Router) singleDirector(target string) (proxy.Mode, []proxy.Backend, error) {
98
	if r.remoteBackendFactory == nil {
99
		return proxy.One2One, nil, status.Error(codes.PermissionDenied, "no request forwarding")
100
	}
101

102
	backend, err := r.remoteBackendFactory(target)
103
	if err != nil {
104
		return proxy.One2One, nil, status.Error(codes.Internal, err.Error())
105
	}
106

107
	return proxy.One2One, []proxy.Backend{backend}, nil
108
}
109

110
// aggregateDirector sends request across set of remote instances and aggregates results.
111
func (r *Router) aggregateDirector(targets []string) (proxy.Mode, []proxy.Backend, error) {
112
	if r.remoteBackendFactory == nil {
113
		return proxy.One2One, nil, status.Error(codes.PermissionDenied, "no request forwarding")
114
	}
115

116
	var err error
117

118
	backends := make([]proxy.Backend, len(targets))
119

120
	for i, target := range targets {
121
		backends[i], err = r.remoteBackendFactory(target)
122
		if err != nil {
123
			return proxy.One2Many, nil, status.Error(codes.Internal, err.Error())
124
		}
125
	}
126

127
	return proxy.One2Many, backends, nil
128
}
129

130
// StreamedDetector implements proxy.StreamedDetector.
131
func (r *Router) StreamedDetector(fullMethodName string) bool {
132
	return slices.ContainsFunc(r.streamedMatchers, func(regex *regexp.Regexp) bool { return regex.MatchString(fullMethodName) })
133
}
134

135
// RegisterStreamedRegex register regex for streamed method.
136
//
137
// This could be exact literal match: /^\/serviceName\/methodName$/ or any
138
// suffix/prefix match.
139
func (r *Router) RegisterStreamedRegex(regex string) {
140
	r.streamedMatchers = append(r.streamedMatchers, regexp.MustCompile(regex))
141
}
142

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.