crossplane

Форк
0
/
function_runner.go 
307 строк · 10.7 Кб
1
/*
2
Copyright 2023 The Crossplane Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License"); you may not use
5
this file except in compliance with the License. You may obtain a copy of the
6
License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software distributed
11
under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
12
CONDITIONS OF ANY KIND, either express or implied. See the License for the
13
specific language governing permissions and limitations under the License.
14
*/
15

16
// Package xfn contains functionality for running Composition Functions.
17
package xfn
18

19
import (
20
	"context"
21
	"crypto/tls"
22
	"sync"
23
	"time"
24

25
	"github.com/pkg/errors"
26
	"google.golang.org/grpc"
27
	"google.golang.org/grpc/credentials"
28
	"google.golang.org/grpc/credentials/insecure"
29
	"sigs.k8s.io/controller-runtime/pkg/client"
30

31
	"github.com/crossplane/crossplane-runtime/pkg/logging"
32

33
	"github.com/crossplane/crossplane/apis/apiextensions/fn/proto/v1beta1"
34
	pkgv1 "github.com/crossplane/crossplane/apis/pkg/v1"
35
	pkgv1beta1 "github.com/crossplane/crossplane/apis/pkg/v1beta1"
36
)
37

38
// Error strings. Every message here is either returned directly or wrapped
// around an underlying error, in which case it is rendered as
// "<message>: <cause>". For that reason none of them end with punctuation.
const (
	errListFunctionRevisions = "cannot list FunctionRevisions"
	errNoActiveRevisions     = "cannot find an active FunctionRevision (a FunctionRevision with spec.desiredState: Active)"
	errListFunctions         = "cannot list Functions to determine which gRPC client connections to garbage collect"

	errFmtGetClientConn = "cannot get gRPC client connection for Function %q"
	errFmtRunFunction   = "cannot run Function %q"
	errFmtEmptyEndpoint = "cannot determine gRPC target: active FunctionRevision %q has an empty status.endpoint"
	errFmtDialFunction  = "cannot gRPC dial target %q from status.endpoint of active FunctionRevision %q"
)
49

50
// Settings applied to the gRPC clients used to call Functions.
//
// TODO(negz): Should any of these be configurable?
const (
	// lbRoundRobin is a gRPC service config selecting round robin load
	// balancing. If the Function Deployment has more than one Pod, and the
	// Function Service is headless, requests will be spread across each Pod.
	// See https://github.com/grpc/grpc/blob/v1.58.0/doc/load-balancing.md#load-balancing-policies
	lbRoundRobin = `{"loadBalancingConfig":[{"round_robin":{}}]}`

	// dialFunctionTimeout bounds the context used to set up a connection.
	dialFunctionTimeout = 10 * time.Second

	// runFunctionTimeout bounds the context used to make a single request.
	runFunctionTimeout = 10 * time.Second
)
61

62
// A PackagedFunctionRunner runs a Function by making a gRPC call to a Function
63
// package's runtime. It creates a gRPC client connection for each Function. The
64
// Function's endpoint is determined by reading the status.endpoint of the
65
// active FunctionRevision. You must call GarbageCollectClientConnections in
66
// order to ensure connections are properly closed.
67
type PackagedFunctionRunner struct {
68
	client       client.Reader
69
	creds        credentials.TransportCredentials
70
	interceptors []InterceptorCreator
71

72
	connsMx sync.RWMutex
73
	conns   map[string]*grpc.ClientConn
74

75
	log logging.Logger
76
}
77

78
// An InterceptorCreator creates gRPC UnaryClientInterceptors for functions.
79
type InterceptorCreator interface {
80
	// CreateInterceptor creates an interceptor for the named function. It also
81
	// accepts the function's package OCI reference, which may be used by the
82
	// interceptor (e.g. to label metrics).
83
	CreateInterceptor(name, pkg string) grpc.UnaryClientInterceptor
84
}
85

86
// A PackagedFunctionRunnerOption configures a PackagedFunctionRunner.
87
type PackagedFunctionRunnerOption func(r *PackagedFunctionRunner)
88

89
// WithLogger configures the logger the PackagedFunctionRunner should use.
90
func WithLogger(l logging.Logger) PackagedFunctionRunnerOption {
91
	return func(r *PackagedFunctionRunner) {
92
		r.log = l
93
	}
94
}
95

96
// WithTLSConfig configures the client TLS the PackagedFunctionRunner should use.
97
func WithTLSConfig(cfg *tls.Config) PackagedFunctionRunnerOption {
98
	return func(r *PackagedFunctionRunner) {
99
		r.creds = credentials.NewTLS(cfg)
100
	}
101
}
102

103
// WithInterceptorCreators configures the interceptors the
104
// PackagedFunctionRunner should create for each function.
105
func WithInterceptorCreators(ics ...InterceptorCreator) PackagedFunctionRunnerOption {
106
	return func(r *PackagedFunctionRunner) {
107
		r.interceptors = ics
108
	}
109
}
110

111
// NewPackagedFunctionRunner returns a FunctionRunner that runs a Function by
112
// making a gRPC call to a Function package's runtime.
113
func NewPackagedFunctionRunner(c client.Reader, o ...PackagedFunctionRunnerOption) *PackagedFunctionRunner {
114
	r := &PackagedFunctionRunner{
115
		client: c,
116
		creds:  insecure.NewCredentials(),
117
		conns:  make(map[string]*grpc.ClientConn),
118
		log:    logging.NewNopLogger(),
119
	}
120

121
	for _, fn := range o {
122
		fn(r)
123
	}
124

125
	return r
126
}
127

128
// RunFunction sends the supplied RunFunctionRequest to the named Function. The
129
// function is expected to be an installed Function.pkg.crossplane.io package.
130
func (r *PackagedFunctionRunner) RunFunction(ctx context.Context, name string, req *v1beta1.RunFunctionRequest) (*v1beta1.RunFunctionResponse, error) {
131
	conn, err := r.getClientConn(ctx, name)
132
	if err != nil {
133
		return nil, errors.Wrapf(err, errFmtGetClientConn, name)
134
	}
135

136
	// This context is used for actually making the request.
137
	ctx, cancel := context.WithTimeout(ctx, runFunctionTimeout)
138
	defer cancel()
139

140
	rsp, err := v1beta1.NewFunctionRunnerServiceClient(conn).RunFunction(ctx, req)
141
	return rsp, errors.Wrapf(err, errFmtRunFunction, name)
142
}
143

144
// In most cases our gRPC target will be a Kubernetes Service. The package
145
// manager creates this service for each active FunctionRevision, but the
146
// Service is aligned with the Function. It's name is derived from the Function
147
// (not the FunctionRevision). This means the target won't change just because a
148
// new FunctionRevision was created.
149
//
150
// However, once the runtime config design is implemented it's possible that
151
// something other than the package manager will reconcile FunctionRevisions.
152
// There's no guarantee it will create a Service, or that the endpoint will
153
// remain stable across FunctionRevisions.
154
//
155
// https://github.com/crossplane/crossplane/blob/226b81f/design/one-pager-package-runtime-config.md
156
//
157
// With this in mind, we attempt to:
158
//
159
// * Create a connection the first time someone runs a Function.
160
// * Cache it so we don't pay the setup cost every time the Function is called.
161
// * Verify that it has the correct target every time the Function is called.
162
//
163
// In the happy path, where a client already exists, this means we'll pay the
164
// cost of listing and iterating over FunctionRevisions from cache. The default
165
// RevisionHistoryLimit is 1, so for most Functions we'd expect there to be two
166
// revisions in the cache (one active, and one previously active).
167
func (r *PackagedFunctionRunner) getClientConn(ctx context.Context, name string) (*grpc.ClientConn, error) {
168
	log := r.log.WithValues("function", name)
169

170
	l := &pkgv1beta1.FunctionRevisionList{}
171
	if err := r.client.List(ctx, l, client.MatchingLabels{pkgv1.LabelParentPackage: name}); err != nil {
172
		return nil, errors.Wrapf(err, errListFunctionRevisions)
173
	}
174

175
	var active *pkgv1beta1.FunctionRevision
176
	for i := range l.Items {
177
		if l.Items[i].GetDesiredState() == pkgv1.PackageRevisionActive {
178
			active = &l.Items[i]
179
			break
180
		}
181
	}
182
	if active == nil {
183
		return nil, errors.New(errNoActiveRevisions)
184
	}
185

186
	if active.Status.Endpoint == "" {
187
		return nil, errors.Errorf(errFmtEmptyEndpoint, active.GetName())
188
	}
189

190
	r.connsMx.RLock()
191
	conn, ok := r.conns[name]
192
	r.connsMx.RUnlock()
193

194
	if ok {
195
		// We have a connection for the up-to-date endpoint. Return it.
196
		if conn.Target() == active.Status.Endpoint {
197
			return conn, nil
198
		}
199

200
		// This connection is to an old endpoint. We need to close it and create
201
		// a new connection. Close only returns an error is if the connection is
202
		// already closed or in the process of closing.
203
		log.Debug("Closing gRPC client connection with stale target", "old-target", conn.Target(), "new-target", active.Status.Endpoint)
204
		_ = conn.Close()
205
	}
206

207
	// This context is only used for setting up the connection.
208
	ctx, cancel := context.WithTimeout(ctx, dialFunctionTimeout)
209
	defer cancel()
210

211
	is := make([]grpc.UnaryClientInterceptor, len(r.interceptors))
212
	for i := range r.interceptors {
213
		is[i] = r.interceptors[i].CreateInterceptor(name, active.Spec.Package)
214
	}
215

216
	conn, err := grpc.DialContext(ctx, active.Status.Endpoint,
217
		grpc.WithTransportCredentials(r.creds),
218
		grpc.WithDefaultServiceConfig(lbRoundRobin),
219
		grpc.WithChainUnaryInterceptor(is...))
220
	if err != nil {
221
		return nil, errors.Wrapf(err, errFmtDialFunction, active.Status.Endpoint, active.GetName())
222
	}
223

224
	r.connsMx.Lock()
225
	r.conns[name] = conn
226
	r.connsMx.Unlock()
227

228
	log.Debug("Created new gRPC client connection", "target", active.Status.Endpoint)
229
	return conn, nil
230
}
231

232
// GarbageCollectConnections runs every interval until the supplied context is
233
// cancelled. It garbage collects gRPC client connections to Functions that are
234
// no longer installed.
235
func (r *PackagedFunctionRunner) GarbageCollectConnections(ctx context.Context, interval time.Duration) {
236
	t := time.NewTicker(interval)
237
	defer t.Stop()
238

239
	for {
240
		select {
241
		case <-ctx.Done():
242
			r.log.Debug("Stopping gRPC client connection garbage collector", "error", ctx.Err())
243
			return
244
		case <-t.C:
245
			if _, err := r.GarbageCollectConnectionsNow(ctx); err != nil {
246
				r.log.Info("Cannot garbage collect gRPC client connections", "error", err)
247
			}
248
		}
249
	}
250
}
251

252
// GarbageCollectConnectionsNow immediately garbage collects any gRPC client
253
// connections to Functions that are no longer installed. It returns the number
254
// of connections garbage collected.
255
func (r *PackagedFunctionRunner) GarbageCollectConnectionsNow(ctx context.Context) (int, error) {
256
	// We try to take the write lock for as little time as possible,
257
	// because while we have it RunFunction will block. In the happy
258
	// path where no connections need garbage collecting we shouldn't
259
	// take it at all.
260

261
	r.connsMx.RLock()
262
	connections := make([]string, 0, len(r.conns))
263
	for name := range r.conns {
264
		connections = append(connections, name)
265
	}
266
	r.connsMx.RUnlock()
267

268
	// No need to list Functions if there's no work to do.
269
	if len(connections) == 0 {
270
		return 0, nil
271
	}
272

273
	l := &pkgv1beta1.FunctionList{}
274
	if err := r.client.List(ctx, l); err != nil {
275
		return 0, errors.Wrap(err, errListFunctions)
276
	}
277

278
	functionExists := map[string]bool{}
279
	for _, f := range l.Items {
280
		functionExists[f.GetName()] = true
281
	}
282

283
	// Build a list of connections to garbage collect.
284
	gc := make([]string, 0)
285
	for _, name := range connections {
286
		if !functionExists[name] {
287
			gc = append(gc, name)
288
		}
289
	}
290

291
	// No need to take a write lock if there's no work to do.
292
	if len(gc) == 0 {
293
		return 0, nil
294
	}
295

296
	r.log.Debug("Closing gRPC client connections for Functions that are no longer installed", "functions", gc)
297
	r.connsMx.Lock()
298
	for _, name := range gc {
299
		// Close only returns an error is if the connection is already
300
		// closed or in the process of closing.
301
		_ = r.conns[name].Close()
302
		delete(r.conns, name)
303
	}
304
	r.connsMx.Unlock()
305

306
	return len(gc), nil
307
}
308

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.