istio

Форк
0
/
convert.go 
366 строк · 13.6 Кб
1
// Copyright Istio Authors
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
package wasm
16

17
import (
18
	"fmt"
19
	"strings"
20
	"sync"
21
	"time"
22

23
	udpa "github.com/cncf/xds/go/udpa/type/v1"
24
	core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
25
	httprbac "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/rbac/v3"
26
	httpwasm "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/wasm/v3"
27
	networkrbac "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/rbac/v3"
28
	networkwasm "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/wasm/v3"
29
	wasmextensions "github.com/envoyproxy/go-control-plane/envoy/extensions/wasm/v3"
30
	"github.com/envoyproxy/go-control-plane/pkg/conversion"
31
	"github.com/hashicorp/go-multierror"
32
	anypb "google.golang.org/protobuf/types/known/anypb"
33

34
	extensions "istio.io/api/extensions/v1alpha1"
35
	"istio.io/istio/pilot/pkg/model"
36
	"istio.io/istio/pilot/pkg/util/protoconv"
37
	"istio.io/istio/pkg/bootstrap"
38
	"istio.io/istio/pkg/config/xds"
39
	"istio.io/istio/pkg/util/istiomultierror"
40
)
41

42
var (
43
	allowHTTPTypedConfig    = protoconv.MessageToAny(&httprbac.RBAC{})
44
	allowNetworkTypedConfig = protoconv.MessageToAny(&networkrbac.RBAC{})
45
)
46

47
func createHTTPAllowAllFilter(name string) (*anypb.Any, error) {
48
	ec := &core.TypedExtensionConfig{
49
		Name:        name,
50
		TypedConfig: allowHTTPTypedConfig,
51
	}
52
	return anypb.New(ec)
53
}
54

55
func createNetworkAllowAllFilter(name string) (*anypb.Any, error) {
56
	ec := &core.TypedExtensionConfig{
57
		Name:        name,
58
		TypedConfig: allowNetworkTypedConfig,
59
	}
60
	return anypb.New(ec)
61
}
62

63
// MaybeConvertWasmExtensionConfig converts any presence of module remote download to local file.
64
// It downloads the Wasm module and stores the module locally in the file system.
65
func MaybeConvertWasmExtensionConfig(resources []*anypb.Any, cache Cache) error {
66
	var wg sync.WaitGroup
67

68
	numResources := len(resources)
69
	convertErrs := make([]error, numResources)
70
	wg.Add(numResources)
71

72
	startTime := time.Now()
73
	defer func() {
74
		wasmConfigConversionDuration.Record(float64(time.Since(startTime).Milliseconds()))
75
	}()
76

77
	for i := 0; i < numResources; i++ {
78
		go func(i int) {
79
			defer wg.Done()
80
			extConfig, wasmHTTPConfig, wasmNetworkConfig, err := tryUnmarshal(resources[i])
81
			if err != nil {
82
				wasmConfigConversionCount.
83
					With(resultTag.Value(unmarshalFailure)).
84
					Increment()
85
				convertErrs[i] = err
86
				return
87
			}
88

89
			if extConfig == nil || (wasmHTTPConfig == nil && wasmNetworkConfig == nil) {
90
				// If there is no config, it is not wasm config.
91
				// Let's bypass the ECDS resource.
92
				wasmConfigConversionCount.
93
					With(resultTag.Value(noRemoteLoad)).
94
					Increment()
95
				return
96
			}
97
			if wasmHTTPConfig != nil {
98
				newExtensionConfig, err := convertHTTPWasmConfigFromRemoteToLocal(extConfig, wasmHTTPConfig, cache)
99
				if err != nil {
100
					if !wasmHTTPConfig.GetConfig().GetFailOpen() {
101
						convertErrs[i] = err
102
						return
103
					}
104
					// Use NOOP filter because the download failed.
105
					newExtensionConfig, err = createHTTPAllowAllFilter(extConfig.GetName())
106
					if err != nil {
107
						// If the fallback is failing, send the Nack regardless of fail_open.
108
						err = fmt.Errorf("failed to create allow-all filter as a fallback of %s Wasm Module: %w", extConfig.GetName(), err)
109
						convertErrs[i] = err
110
						return
111
					}
112
				}
113
				resources[i] = newExtensionConfig
114
			} else {
115
				newExtensionConfig, err := convertNetworkWasmConfigFromRemoteToLocal(extConfig, wasmNetworkConfig, cache)
116
				if err != nil {
117
					if !wasmNetworkConfig.GetConfig().GetFailOpen() {
118
						convertErrs[i] = err
119
						return
120
					}
121
					// Use NOOP filter because the download failed.
122
					newExtensionConfig, err = createNetworkAllowAllFilter(extConfig.GetName())
123
					if err != nil {
124
						// If the fallback is failing, send the Nack regardless of fail_open.
125
						err = fmt.Errorf("failed to create allow-all filter as a fallback of %s Wasm Module: %w", extConfig.GetName(), err)
126
						convertErrs[i] = err
127
						return
128
					}
129
				}
130
				resources[i] = newExtensionConfig
131
			}
132
		}(i)
133
	}
134

135
	wg.Wait()
136
	err := multierror.Append(istiomultierror.New(), convertErrs...).ErrorOrNil()
137
	if err != nil {
138
		wasmLog.Errorf("convert the wasm config: %v", err)
139
	}
140
	return err
141
}
142

143
// tryUnmarshal returns the typed extension config and wasm config by unmarsharling `resource`,
144
// if `resource` is a wasm config loading a wasm module from the remote site.
145
// It returns `nil` for both the typed extension config and wasm config if it is not for the remote wasm or has an error.
146
func tryUnmarshal(resource *anypb.Any) (*core.TypedExtensionConfig, *httpwasm.Wasm, *networkwasm.Wasm, error) {
147
	ec := &core.TypedExtensionConfig{}
148
	wasmHTTPFilterConfig := &httpwasm.Wasm{}
149
	wasmNetworkFilterConfig := &networkwasm.Wasm{}
150
	wasmNetwork := false
151
	if err := resource.UnmarshalTo(ec); err != nil {
152
		return nil, nil, nil, fmt.Errorf("failed to unmarshal extension config resource: %w", err)
153
	}
154

155
	// Wasm filter can be configured using typed struct and Wasm filter type
156
	switch {
157
	case ec.GetTypedConfig() == nil:
158
		return nil, nil, nil, fmt.Errorf("typed extension config %+v does not contain any typed config", ec)
159
	case ec.GetTypedConfig().TypeUrl == xds.WasmHTTPFilterType:
160
		if err := ec.GetTypedConfig().UnmarshalTo(wasmHTTPFilterConfig); err != nil {
161
			return nil, nil, nil, fmt.Errorf("failed to unmarshal extension config resource into Wasm HTTP filter: %w", err)
162
		}
163
	case ec.GetTypedConfig().TypeUrl == xds.WasmNetworkFilterType:
164
		wasmNetwork = true
165
		if err := ec.GetTypedConfig().UnmarshalTo(wasmNetworkFilterConfig); err != nil {
166
			return nil, nil, nil, fmt.Errorf("failed to unmarshal extension config resource into Wasm Network filter: %w", err)
167
		}
168
	case ec.GetTypedConfig().TypeUrl == xds.TypedStructType:
169
		typedStruct := &udpa.TypedStruct{}
170
		wasmTypedConfig := ec.GetTypedConfig()
171
		if err := wasmTypedConfig.UnmarshalTo(typedStruct); err != nil {
172
			return nil, nil, nil, fmt.Errorf("failed to unmarshal typed config for wasm filter: %w", err)
173
		}
174

175
		if typedStruct.TypeUrl == xds.WasmHTTPFilterType {
176
			if err := conversion.StructToMessage(typedStruct.Value, wasmHTTPFilterConfig); err != nil {
177
				return nil, nil, nil, fmt.Errorf("failed to convert extension config struct %+v to Wasm Network filter", typedStruct)
178
			}
179
		} else if typedStruct.TypeUrl == xds.WasmNetworkFilterType {
180
			wasmNetwork = true
181
			if err := conversion.StructToMessage(typedStruct.Value, wasmNetworkFilterConfig); err != nil {
182
				return nil, nil, nil, fmt.Errorf("failed to convert extension config struct %+v to Wasm HTTP filter", typedStruct)
183
			}
184
		} else {
185
			// This is not a Wasm filter.
186
			wasmLog.Debugf("typed extension config %+v does not contain wasm http filter", typedStruct)
187
			return nil, nil, nil, nil
188
		}
189
	default:
190
		// This is not a Wasm filter.
191
		wasmLog.Debugf("cannot find typed config or typed struct in %+v", ec)
192
		return nil, nil, nil, nil
193
	}
194

195
	// At this point, we should have wasmNetworkFilterConfig or wasmHTTPFilterConfig should be unmarshalled.
196
	if wasmNetwork {
197
		if wasmNetworkFilterConfig.Config.GetVmConfig().GetCode().GetRemote() == nil {
198
			if wasmNetworkFilterConfig.Config.GetVmConfig().GetCode().GetLocal() == nil {
199
				return nil, nil, nil, fmt.Errorf("no remote and local load found in Wasm Network filter %+v", wasmNetworkFilterConfig)
200
			}
201
			// This has a local Wasm. Let's bypass it.
202
			wasmLog.Debugf("no remote load found in Wasm Network filter %+v", wasmNetworkFilterConfig)
203
			return nil, nil, nil, nil
204
		}
205
		return ec, nil, wasmNetworkFilterConfig, nil
206
	}
207
	if wasmHTTPFilterConfig.Config.GetVmConfig().GetCode().GetRemote() == nil {
208
		if wasmHTTPFilterConfig.Config.GetVmConfig().GetCode().GetLocal() == nil {
209
			return nil, nil, nil, fmt.Errorf("no remote and local load found in Wasm HTTP filter %+v", wasmHTTPFilterConfig)
210
		}
211
		// This has a local Wasm. Let's bypass it.
212
		wasmLog.Debugf("no remote load found in Wasm HTTP filter %+v", wasmHTTPFilterConfig)
213
		return nil, nil, nil, nil
214
	}
215

216
	return ec, wasmHTTPFilterConfig, nil, nil
217
}
218

219
func convertHTTPWasmConfigFromRemoteToLocal(ec *core.TypedExtensionConfig, wasmHTTPFilterConfig *httpwasm.Wasm, cache Cache) (*anypb.Any, error) {
220
	status := conversionSuccess
221
	defer func() {
222
		wasmConfigConversionCount.
223
			With(resultTag.Value(status)).
224
			Increment()
225
	}()
226

227
	// ec.Name is resourceName.
228
	// https://github.com/istio/istio/blob/9ea7ad532a9cc58a3564143d41ac89a61aaa8058/pilot/pkg/networking/core/v1alpha3/extension/wasmplugin.go#L103
229
	err := rewriteVMConfig(ec.Name, wasmHTTPFilterConfig.Config.GetVmConfig(), &status, cache, wasmHTTPFilterConfig.Config.Name)
230
	if err != nil {
231
		return nil, err
232
	}
233

234
	wasmTypedConfig, err := anypb.New(wasmHTTPFilterConfig)
235
	if err != nil {
236
		status = marshalFailure
237
		return nil, fmt.Errorf("failed to marshal new wasm HTTP filter %+v to protobuf Any: %w", wasmHTTPFilterConfig, err)
238
	}
239
	ec.TypedConfig = wasmTypedConfig
240
	wasmLog.Debugf("new extension config resource %+v", ec)
241

242
	nec, err := anypb.New(ec)
243
	if err != nil {
244
		status = marshalFailure
245
		return nil, fmt.Errorf("failed to marshal new extension config resource: %w", err)
246
	}
247

248
	// At this point, we are certain that wasm module has been downloaded and config is rewritten.
249
	// ECDS will be rewritten successfully.
250
	return nec, nil
251
}
252

253
func convertNetworkWasmConfigFromRemoteToLocal(ec *core.TypedExtensionConfig, wasmNetworkFilterConfig *networkwasm.Wasm, cache Cache) (*anypb.Any, error) {
254
	status := conversionSuccess
255
	defer func() {
256
		wasmConfigConversionCount.
257
			With(resultTag.Value(status)).
258
			Increment()
259
	}()
260

261
	// ec.Name is resourceName.
262
	// https://github.com/istio/istio/blob/9ea7ad532a9cc58a3564143d41ac89a61aaa8058/pilot/pkg/networking/core/v1alpha3/extension/wasmplugin.go#L103
263
	err := rewriteVMConfig(ec.Name, wasmNetworkFilterConfig.Config.GetVmConfig(), &status, cache, wasmNetworkFilterConfig.Config.Name)
264
	if err != nil {
265
		return nil, err
266
	}
267
	wasmTypedConfig, err := anypb.New(wasmNetworkFilterConfig)
268
	if err != nil {
269
		status = marshalFailure
270
		return nil, fmt.Errorf("failed to marshal new wasm Network filter %+v to protobuf Any: %w", wasmNetworkFilterConfig, err)
271
	}
272
	ec.TypedConfig = wasmTypedConfig
273
	wasmLog.Debugf("new extension config resource %+v", ec)
274

275
	nec, err := anypb.New(ec)
276
	if err != nil {
277
		status = marshalFailure
278
		return nil, fmt.Errorf("failed to marshal new extension config resource: %w", err)
279
	}
280

281
	// At this point, we are certain that wasm module has been downloaded and config is rewritten.
282
	// ECDS will be rewritten successfully.
283
	return nec, nil
284
}
285

286
func rewriteVMConfig(resourceName string, vm *wasmextensions.VmConfig, status *string, cache Cache, configName string) error {
287
	envs := vm.GetEnvironmentVariables()
288
	var pullSecret []byte
289
	pullPolicy := extensions.PullPolicy_UNSPECIFIED_POLICY
290
	resourceVersion := ""
291
	if envs != nil {
292
		if sec, found := envs.KeyValues[model.WasmSecretEnv]; found {
293
			if sec == "" {
294
				*status = fetchFailure
295
				return fmt.Errorf("cannot fetch Wasm module %v: missing image pulling secret", configName)
296
			}
297
			pullSecret = []byte(sec)
298
		}
299

300
		if ps, found := envs.KeyValues[model.WasmPolicyEnv]; found {
301
			if p, found := extensions.PullPolicy_value[ps]; found {
302
				pullPolicy = extensions.PullPolicy(p)
303
			}
304
		}
305
		resourceVersion = envs.KeyValues[model.WasmResourceVersionEnv]
306

307
		// Strip all internal env variables(with ISTIO_META) from VM env variable.
308
		// These env variables are added by Istio control plane and meant to be consumed by the
309
		// agent for image pulling control should not be leaked to Envoy or the Wasm extension runtime.
310
		for k := range envs.KeyValues {
311
			if strings.HasPrefix(k, bootstrap.IstioMetaPrefix) {
312
				delete(envs.KeyValues, k)
313
			}
314
		}
315
		if len(envs.KeyValues) == 0 {
316
			if len(envs.HostEnvKeys) == 0 {
317
				vm.EnvironmentVariables = nil
318
			} else {
319
				envs.KeyValues = nil
320
			}
321
		}
322
	}
323
	remote := vm.GetCode().GetRemote()
324
	httpURI := remote.GetHttpUri()
325
	if httpURI == nil {
326
		*status = missRemoteFetchHint
327
		return fmt.Errorf("wasm remote fetch %+v does not have httpUri specified for config %s", remote, configName)
328
	}
329
	// checksum sent by istiod can be "nil" if not set by user - magic value used to avoid unmarshaling errors
330
	if remote.Sha256 == "nil" {
331
		remote.Sha256 = ""
332
	}
333

334
	// Default timeout, without this, if a user does not specify a timeout in the config, it fails with deadline exceeded
335
	// while building transport in go container.
336
	timeout := time.Second * 5
337
	if remote.GetHttpUri().Timeout != nil {
338
		// This is always 30s, because the timeout is set by the control plane when converted to WasmPluginWrapper.
339
		// see buildDataSource() in pilot/pkg/model/extensions.go
340
		timeout = remote.GetHttpUri().Timeout.AsDuration()
341
	}
342
	f, err := cache.Get(httpURI.GetUri(), GetOptions{
343
		Checksum:        remote.Sha256,
344
		ResourceName:    resourceName,
345
		ResourceVersion: resourceVersion,
346
		RequestTimeout:  timeout,
347
		PullSecret:      pullSecret,
348
		PullPolicy:      pullPolicy,
349
	})
350
	if err != nil {
351
		*status = fetchFailure
352
		return fmt.Errorf("cannot fetch Wasm module %v: %w", remote.GetHttpUri().GetUri(), err)
353
	}
354

355
	// Rewrite remote fetch to local file.
356
	vm.Code = &core.AsyncDataSource{
357
		Specifier: &core.AsyncDataSource_Local{
358
			Local: &core.DataSource{
359
				Specifier: &core.DataSource_Filename{
360
					Filename: f,
361
				},
362
			},
363
		},
364
	}
365
	return nil
366
}
367

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.