crossplane
307 строк · 10.7 Кб
1/*
2Copyright 2023 The Crossplane Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License"); you may not use
5this file except in compliance with the License. You may obtain a copy of the
6License at
7
8http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software distributed
11under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
12CONDITIONS OF ANY KIND, either express or implied. See the License for the
13specific language governing permissions and limitations under the License.
14*/
15
16// Package xfn contains functionality for running Composition Functions.
17package xfn
18
19import (
20"context"
21"crypto/tls"
22"sync"
23"time"
24
25"github.com/pkg/errors"
26"google.golang.org/grpc"
27"google.golang.org/grpc/credentials"
28"google.golang.org/grpc/credentials/insecure"
29"sigs.k8s.io/controller-runtime/pkg/client"
30
31"github.com/crossplane/crossplane-runtime/pkg/logging"
32
33"github.com/crossplane/crossplane/apis/apiextensions/fn/proto/v1beta1"
34pkgv1 "github.com/crossplane/crossplane/apis/pkg/v1"
35pkgv1beta1 "github.com/crossplane/crossplane/apis/pkg/v1beta1"
36)
37
// Error messages that take no arguments.
const (
	// Returned when the FunctionRevisions of a Function cannot be listed.
	errListFunctionRevisions = "cannot list FunctionRevisions"

	// Returned when none of a Function's revisions is desired to be active.
	errNoActiveRevisions = "cannot find an active FunctionRevision (a FunctionRevision with spec.desiredState: Active)"

	// Returned when Functions cannot be listed during garbage collection.
	errListFunctions = "cannot List Functions to determine which gRPC client connections to garbage collect."
)

// Error message formats. Each %q verb is documented next to its constant.
const (
	// %q: the Function name.
	errFmtGetClientConn = "cannot get gRPC client connection for Function %q"

	// %q: the Function name.
	errFmtRunFunction = "cannot run Function %q"

	// %q: the name of the active FunctionRevision.
	errFmtEmptyEndpoint = "cannot determine gRPC target: active FunctionRevision %q has an empty status.endpoint"

	// %q, %q: the dial target, then the name of the active FunctionRevision.
	errFmtDialFunction = "cannot gRPC dial target %q from status.endpoint of active FunctionRevision %q"
)
49
// TODO(negz): Should any of these be configurable?

// lbRoundRobin is a gRPC service config that selects round robin load
// balancing. With it, when the Function Deployment has more than one Pod and
// the Function Service is headless, requests are spread across each Pod.
// See https://github.com/grpc/grpc/blob/v1.58.0/doc/load-balancing.md#load-balancing-policies
const lbRoundRobin = `{"loadBalancingConfig":[{"round_robin":{}}]}`

// Timeouts applied to the two phases of calling a Function.
const (
	// dialFunctionTimeout bounds setting up a gRPC client connection.
	dialFunctionTimeout = 10 * time.Second

	// runFunctionTimeout bounds a single RunFunction request.
	runFunctionTimeout = 10 * time.Second
)
61
62// A PackagedFunctionRunner runs a Function by making a gRPC call to a Function
63// package's runtime. It creates a gRPC client connection for each Function. The
64// Function's endpoint is determined by reading the status.endpoint of the
65// active FunctionRevision. You must call GarbageCollectClientConnections in
66// order to ensure connections are properly closed.
67type PackagedFunctionRunner struct {
68client client.Reader
69creds credentials.TransportCredentials
70interceptors []InterceptorCreator
71
72connsMx sync.RWMutex
73conns map[string]*grpc.ClientConn
74
75log logging.Logger
76}
77
78// An InterceptorCreator creates gRPC UnaryClientInterceptors for functions.
79type InterceptorCreator interface {
80// CreateInterceptor creates an interceptor for the named function. It also
81// accepts the function's package OCI reference, which may be used by the
82// interceptor (e.g. to label metrics).
83CreateInterceptor(name, pkg string) grpc.UnaryClientInterceptor
84}
85
86// A PackagedFunctionRunnerOption configures a PackagedFunctionRunner.
87type PackagedFunctionRunnerOption func(r *PackagedFunctionRunner)
88
89// WithLogger configures the logger the PackagedFunctionRunner should use.
90func WithLogger(l logging.Logger) PackagedFunctionRunnerOption {
91return func(r *PackagedFunctionRunner) {
92r.log = l
93}
94}
95
96// WithTLSConfig configures the client TLS the PackagedFunctionRunner should use.
97func WithTLSConfig(cfg *tls.Config) PackagedFunctionRunnerOption {
98return func(r *PackagedFunctionRunner) {
99r.creds = credentials.NewTLS(cfg)
100}
101}
102
103// WithInterceptorCreators configures the interceptors the
104// PackagedFunctionRunner should create for each function.
105func WithInterceptorCreators(ics ...InterceptorCreator) PackagedFunctionRunnerOption {
106return func(r *PackagedFunctionRunner) {
107r.interceptors = ics
108}
109}
110
111// NewPackagedFunctionRunner returns a FunctionRunner that runs a Function by
112// making a gRPC call to a Function package's runtime.
113func NewPackagedFunctionRunner(c client.Reader, o ...PackagedFunctionRunnerOption) *PackagedFunctionRunner {
114r := &PackagedFunctionRunner{
115client: c,
116creds: insecure.NewCredentials(),
117conns: make(map[string]*grpc.ClientConn),
118log: logging.NewNopLogger(),
119}
120
121for _, fn := range o {
122fn(r)
123}
124
125return r
126}
127
128// RunFunction sends the supplied RunFunctionRequest to the named Function. The
129// function is expected to be an installed Function.pkg.crossplane.io package.
130func (r *PackagedFunctionRunner) RunFunction(ctx context.Context, name string, req *v1beta1.RunFunctionRequest) (*v1beta1.RunFunctionResponse, error) {
131conn, err := r.getClientConn(ctx, name)
132if err != nil {
133return nil, errors.Wrapf(err, errFmtGetClientConn, name)
134}
135
136// This context is used for actually making the request.
137ctx, cancel := context.WithTimeout(ctx, runFunctionTimeout)
138defer cancel()
139
140rsp, err := v1beta1.NewFunctionRunnerServiceClient(conn).RunFunction(ctx, req)
141return rsp, errors.Wrapf(err, errFmtRunFunction, name)
142}
143
144// In most cases our gRPC target will be a Kubernetes Service. The package
145// manager creates this service for each active FunctionRevision, but the
146// Service is aligned with the Function. It's name is derived from the Function
147// (not the FunctionRevision). This means the target won't change just because a
148// new FunctionRevision was created.
149//
150// However, once the runtime config design is implemented it's possible that
151// something other than the package manager will reconcile FunctionRevisions.
152// There's no guarantee it will create a Service, or that the endpoint will
153// remain stable across FunctionRevisions.
154//
155// https://github.com/crossplane/crossplane/blob/226b81f/design/one-pager-package-runtime-config.md
156//
157// With this in mind, we attempt to:
158//
159// * Create a connection the first time someone runs a Function.
160// * Cache it so we don't pay the setup cost every time the Function is called.
161// * Verify that it has the correct target every time the Function is called.
162//
163// In the happy path, where a client already exists, this means we'll pay the
164// cost of listing and iterating over FunctionRevisions from cache. The default
165// RevisionHistoryLimit is 1, so for most Functions we'd expect there to be two
166// revisions in the cache (one active, and one previously active).
167func (r *PackagedFunctionRunner) getClientConn(ctx context.Context, name string) (*grpc.ClientConn, error) {
168log := r.log.WithValues("function", name)
169
170l := &pkgv1beta1.FunctionRevisionList{}
171if err := r.client.List(ctx, l, client.MatchingLabels{pkgv1.LabelParentPackage: name}); err != nil {
172return nil, errors.Wrapf(err, errListFunctionRevisions)
173}
174
175var active *pkgv1beta1.FunctionRevision
176for i := range l.Items {
177if l.Items[i].GetDesiredState() == pkgv1.PackageRevisionActive {
178active = &l.Items[i]
179break
180}
181}
182if active == nil {
183return nil, errors.New(errNoActiveRevisions)
184}
185
186if active.Status.Endpoint == "" {
187return nil, errors.Errorf(errFmtEmptyEndpoint, active.GetName())
188}
189
190r.connsMx.RLock()
191conn, ok := r.conns[name]
192r.connsMx.RUnlock()
193
194if ok {
195// We have a connection for the up-to-date endpoint. Return it.
196if conn.Target() == active.Status.Endpoint {
197return conn, nil
198}
199
200// This connection is to an old endpoint. We need to close it and create
201// a new connection. Close only returns an error is if the connection is
202// already closed or in the process of closing.
203log.Debug("Closing gRPC client connection with stale target", "old-target", conn.Target(), "new-target", active.Status.Endpoint)
204_ = conn.Close()
205}
206
207// This context is only used for setting up the connection.
208ctx, cancel := context.WithTimeout(ctx, dialFunctionTimeout)
209defer cancel()
210
211is := make([]grpc.UnaryClientInterceptor, len(r.interceptors))
212for i := range r.interceptors {
213is[i] = r.interceptors[i].CreateInterceptor(name, active.Spec.Package)
214}
215
216conn, err := grpc.DialContext(ctx, active.Status.Endpoint,
217grpc.WithTransportCredentials(r.creds),
218grpc.WithDefaultServiceConfig(lbRoundRobin),
219grpc.WithChainUnaryInterceptor(is...))
220if err != nil {
221return nil, errors.Wrapf(err, errFmtDialFunction, active.Status.Endpoint, active.GetName())
222}
223
224r.connsMx.Lock()
225r.conns[name] = conn
226r.connsMx.Unlock()
227
228log.Debug("Created new gRPC client connection", "target", active.Status.Endpoint)
229return conn, nil
230}
231
232// GarbageCollectConnections runs every interval until the supplied context is
233// cancelled. It garbage collects gRPC client connections to Functions that are
234// no longer installed.
235func (r *PackagedFunctionRunner) GarbageCollectConnections(ctx context.Context, interval time.Duration) {
236t := time.NewTicker(interval)
237defer t.Stop()
238
239for {
240select {
241case <-ctx.Done():
242r.log.Debug("Stopping gRPC client connection garbage collector", "error", ctx.Err())
243return
244case <-t.C:
245if _, err := r.GarbageCollectConnectionsNow(ctx); err != nil {
246r.log.Info("Cannot garbage collect gRPC client connections", "error", err)
247}
248}
249}
250}
251
252// GarbageCollectConnectionsNow immediately garbage collects any gRPC client
253// connections to Functions that are no longer installed. It returns the number
254// of connections garbage collected.
255func (r *PackagedFunctionRunner) GarbageCollectConnectionsNow(ctx context.Context) (int, error) {
256// We try to take the write lock for as little time as possible,
257// because while we have it RunFunction will block. In the happy
258// path where no connections need garbage collecting we shouldn't
259// take it at all.
260
261r.connsMx.RLock()
262connections := make([]string, 0, len(r.conns))
263for name := range r.conns {
264connections = append(connections, name)
265}
266r.connsMx.RUnlock()
267
268// No need to list Functions if there's no work to do.
269if len(connections) == 0 {
270return 0, nil
271}
272
273l := &pkgv1beta1.FunctionList{}
274if err := r.client.List(ctx, l); err != nil {
275return 0, errors.Wrap(err, errListFunctions)
276}
277
278functionExists := map[string]bool{}
279for _, f := range l.Items {
280functionExists[f.GetName()] = true
281}
282
283// Build a list of connections to garbage collect.
284gc := make([]string, 0)
285for _, name := range connections {
286if !functionExists[name] {
287gc = append(gc, name)
288}
289}
290
291// No need to take a write lock if there's no work to do.
292if len(gc) == 0 {
293return 0, nil
294}
295
296r.log.Debug("Closing gRPC client connections for Functions that are no longer installed", "functions", gc)
297r.connsMx.Lock()
298for _, name := range gc {
299// Close only returns an error is if the connection is already
300// closed or in the process of closing.
301_ = r.conns[name].Close()
302delete(r.conns, name)
303}
304r.connsMx.Unlock()
305
306return len(gc), nil
307}
308