1
// This Source Code Form is subject to the terms of the Mozilla Public
2
// License, v. 2.0. If a copy of the MPL was not distributed with this
3
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
5
// Package director provides a proxy call routing facility.
14
"github.com/siderolabs/grpc-proxy/proxy"
15
"google.golang.org/grpc"
16
"google.golang.org/grpc/codes"
17
"google.golang.org/grpc/metadata"
18
"google.golang.org/grpc/status"
21
// Router wraps grpc-proxy StreamDirector.
//
// It carries the backend used for locally handled calls, an optional factory
// for remote backends, a provider used to recognize local target addresses,
// and the regular expressions registered for streamed methods.
23
// localBackend is returned by Director whenever a call is handled locally.
localBackend proxy.Backend
24
// remoteBackendFactory builds a backend for a remote target; singleDirector
// and aggregateDirector answer with PermissionDenied when it is nil.
remoteBackendFactory RemoteBackendFactory
25
// localAddressProvider is queried through IsLocalTarget by Director.
localAddressProvider LocalAddressProvider
26
// streamedMatchers is appended to by RegisterStreamedRegex and scanned by
// StreamedDetector.
streamedMatchers []*regexp.Regexp
29
// RemoteBackendFactory provides backend generation by address (target).
//
// Given a target string it yields a proxy.Backend, or an error when no
// backend can be provided for that target.
30
type RemoteBackendFactory func(target string) (proxy.Backend, error)
32
// NewRouter builds a new Router.
//
// The backend factory, local backend and local address provider are stored
// in the corresponding Router fields. A nil backendFactory is a supported
// state: the director methods check for it and refuse to forward requests.
33
func NewRouter(backendFactory RemoteBackendFactory, localBackend proxy.Backend, localAddressProvider LocalAddressProvider) *Router {
35
localBackend: localBackend,
36
remoteBackendFactory: backendFactory,
37
localAddressProvider: localAddressProvider,
41
// Register is a no-op that exists to implement the factory.Registrator interface.
43
// The actual proxy handler is installed via the grpc.UnknownServiceHandler option.
44
func (r *Router) Register(srv *grpc.Server) {
47
// Director implements proxy.StreamDirector function.
//
// The routing decision is based on the incoming gRPC metadata of ctx:
//   - a request carrying the "proxyfrom" key is answered by the local backend;
//   - the "node" key, when present, must hold exactly one value, otherwise an
//     InvalidArgument status is returned;
//   - the "nodes" values are routed through aggregateDirector, except that
//     methods whose full name starts with "/cosi." are rejected with
//     InvalidArgument;
//   - the single "node" value is routed through singleDirector;
//   - the remaining case is served by the local backend in One2One mode.
//
// NOTE(review): the exact conditions selecting between the last three paths
// should be confirmed against the complete function body.
50
func (r *Router) Director(ctx context.Context, fullMethodName string) (proxy.Mode, []proxy.Backend, error) {
51
md, ok := metadata.FromIncomingContext(ctx)
53
// presumably taken when the context carries no incoming metadata (ok is false) - TODO confirm
return proxy.One2One, []proxy.Backend{r.localBackend}, nil
56
// only the presence of the "proxyfrom" key is checked; its value is not read here
if _, exists := md["proxyfrom"]; exists {
57
return proxy.One2One, []proxy.Backend{r.localBackend}, nil
60
nodes, okNodes := md["nodes"]
61
node, okNode := md["node"]
63
// guarantees that node[0] below is safe to index whenever okNode is true
if okNode && len(node) != 1 {
64
return proxy.One2One, nil, status.Error(codes.InvalidArgument, "node metadata must be single-valued")
67
// Special handling for cases when a single node is requested, but forwarding is disabled:
69
// if there's a single destination, and that destination is the local node, skip forwarding and send the request to the same node.
70
if r.remoteBackendFactory == nil {
71
if okNode && r.localAddressProvider.IsLocalTarget(node[0]) {
75
if okNodes && len(nodes) == 1 && r.localAddressProvider.IsLocalTarget(nodes[0]) {
82
// COSI methods do not support one-2-many proxying.
83
if strings.HasPrefix(fullMethodName, "/cosi.") {
84
return proxy.One2One, nil, status.Error(codes.InvalidArgument, "one-2-many proxying is not supported for COSI methods")
87
return r.aggregateDirector(nodes)
89
return r.singleDirector(node[0])
91
// Send directly to the local node; this skips another layer of proxying.
92
return proxy.One2One, []proxy.Backend{r.localBackend}, nil
96
// singleDirector sends request to a single instance in one-2-one mode.
//
// The backend for target is obtained from remoteBackendFactory. The result is
// always reported with proxy.One2One: on success together with a one-element
// backend slice, on failure together with a nil slice and a gRPC status error.
97
func (r *Router) singleDirector(target string) (proxy.Mode, []proxy.Backend, error) {
98
// without a factory there is no way to reach a remote target
if r.remoteBackendFactory == nil {
99
return proxy.One2One, nil, status.Error(codes.PermissionDenied, "no request forwarding")
102
backend, err := r.remoteBackendFactory(target)
104
// a factory error is surfaced as codes.Internal carrying the error text
return proxy.One2One, nil, status.Error(codes.Internal, err.Error())
107
return proxy.One2One, []proxy.Backend{backend}, nil
110
// aggregateDirector sends request across set of remote instances and aggregates results.
//
// One backend per entry of targets is requested from remoteBackendFactory,
// stored at the same index as its target. On success the backends are returned
// with proxy.One2Many; errors are returned as gRPC status errors with a nil
// backend slice.
111
func (r *Router) aggregateDirector(targets []string) (proxy.Mode, []proxy.Backend, error) {
112
// without a factory there is no way to reach remote targets
if r.remoteBackendFactory == nil {
113
return proxy.One2One, nil, status.Error(codes.PermissionDenied, "no request forwarding")
118
// sized up front so each backend can be assigned by index
backends := make([]proxy.Backend, len(targets))
120
for i, target := range targets {
121
backends[i], err = r.remoteBackendFactory(target)
123
// a factory error is surfaced as codes.Internal carrying the error text
return proxy.One2Many, nil, status.Error(codes.Internal, err.Error())
127
return proxy.One2Many, backends, nil
130
// StreamedDetector implements proxy.StreamedDetector.
//
// It reports whether fullMethodName matches at least one of the regular
// expressions previously added with RegisterStreamedRegex.
131
func (r *Router) StreamedDetector(fullMethodName string) bool {
132
return slices.ContainsFunc(r.streamedMatchers, func(regex *regexp.Regexp) bool { return regex.MatchString(fullMethodName) })
135
// RegisterStreamedRegex registers a regex for a streamed method.
137
// This could be an exact literal match: /^\/serviceName\/methodName$/ or any
138
// suffix/prefix match.
139
func (r *Router) RegisterStreamedRegex(regex string) {
140
// regexp.MustCompile panics if regex is not a valid pattern
r.streamedMatchers = append(r.streamedMatchers, regexp.MustCompile(regex))