kuma

Форк
0
/
run.go 
375 строк · 17.0 Кб
1
package cmd
2

3
import (
4
	"context"
5
	"os"
6
	"path/filepath"
7
	"time"
8

9
	envoy_bootstrap_v3 "github.com/envoyproxy/go-control-plane/envoy/config/bootstrap/v3"
10
	"github.com/pkg/errors"
11
	"github.com/spf13/cobra"
12

13
	mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1"
14
	"github.com/kumahq/kuma/app/kuma-dp/pkg/dataplane/accesslogs"
15
	"github.com/kumahq/kuma/app/kuma-dp/pkg/dataplane/dnsserver"
16
	"github.com/kumahq/kuma/app/kuma-dp/pkg/dataplane/envoy"
17
	"github.com/kumahq/kuma/app/kuma-dp/pkg/dataplane/meshmetrics"
18
	"github.com/kumahq/kuma/app/kuma-dp/pkg/dataplane/metrics"
19
	kuma_cmd "github.com/kumahq/kuma/pkg/cmd"
20
	"github.com/kumahq/kuma/pkg/config"
21
	kumadp "github.com/kumahq/kuma/pkg/config/app/kuma-dp"
22
	"github.com/kumahq/kuma/pkg/core/resources/apis/mesh"
23
	"github.com/kumahq/kuma/pkg/core/resources/model"
24
	"github.com/kumahq/kuma/pkg/core/resources/model/rest"
25
	"github.com/kumahq/kuma/pkg/core/runtime/component"
26
	core_xds "github.com/kumahq/kuma/pkg/core/xds"
27
	"github.com/kumahq/kuma/pkg/util/net"
28
	"github.com/kumahq/kuma/pkg/util/proto"
29
	kuma_version "github.com/kumahq/kuma/pkg/version"
30
	"github.com/kumahq/kuma/pkg/xds/bootstrap/types"
31
)
32

33
var runLog = dataplaneLog.WithName("run")
34

35
// PersistentPreRunE in root command sets the logger and initial config
36
// PreRunE loads the Kuma DP config
37
// PostRunE actually runs all the components with loaded config
38
// To extend Kuma DP, plug your code in RunE. Use RootContext.Config and add components to RootContext.ComponentManager
39
func newRunCmd(opts kuma_cmd.RunCmdOpts, rootCtx *RootContext) *cobra.Command {
40
	cfg := rootCtx.Config
41
	var tmpDir string
42
	var proxyResource model.Resource
43
	cmd := &cobra.Command{
44
		Use:   "run",
45
		Short: "Launch Dataplane (Envoy)",
46
		Long:  `Launch Dataplane (Envoy).`,
47
		RunE: func(cmd *cobra.Command, args []string) error {
48
			return nil
49
		},
50
		PreRunE: func(cmd *cobra.Command, args []string) error {
51
			var err error
52

53
			// only support configuration via environment variables and args
54
			if err := config.Load("", cfg); err != nil {
55
				runLog.Error(err, "unable to load configuration")
56
				return err
57
			}
58

59
			kumadp.PrintDeprecations(cfg, cmd.OutOrStdout())
60

61
			if conf, err := config.ToJson(cfg); err == nil {
62
				runLog.Info("effective configuration", "config", string(conf))
63
			} else {
64
				runLog.Error(err, "unable to format effective configuration", "config", cfg)
65
				return err
66
			}
67

68
			// Map the resource types that are acceptable depending on the value of the `--proxy-type` flag.
69
			proxyTypeMap := map[string]model.ResourceType{
70
				string(mesh_proto.DataplaneProxyType): mesh.DataplaneType,
71
				string(mesh_proto.IngressProxyType):   mesh.ZoneIngressType,
72
				string(mesh_proto.EgressProxyType):    mesh.ZoneEgressType,
73
			}
74

75
			if _, ok := proxyTypeMap[cfg.Dataplane.ProxyType]; !ok {
76
				return errors.Errorf("invalid proxy type %q", cfg.Dataplane.ProxyType)
77
			}
78

79
			if cfg.DataplaneRuntime.EnvoyLogLevel == "" {
80
				cfg.DataplaneRuntime.EnvoyLogLevel = rootCtx.LogLevel.String()
81
			}
82

83
			proxyResource, err = readResource(cmd, &cfg.DataplaneRuntime)
84
			if err != nil {
85
				runLog.Error(err, "failed to read policy", "proxyType", cfg.Dataplane.ProxyType)
86

87
				return err
88
			}
89

90
			if proxyResource != nil {
91
				if resType := proxyTypeMap[cfg.Dataplane.ProxyType]; resType != proxyResource.Descriptor().Name {
92
					return errors.Errorf("invalid proxy resource type %q, expected %s",
93
						proxyResource.Descriptor().Name, resType)
94
				}
95

96
				if cfg.Dataplane.Name != "" || cfg.Dataplane.Mesh != "" {
97
					return errors.New("--name and --mesh cannot be specified when a dataplane definition is provided, mesh and name will be read from the dataplane definition")
98
				}
99

100
				cfg.Dataplane.Mesh = proxyResource.GetMeta().GetMesh()
101
				cfg.Dataplane.Name = proxyResource.GetMeta().GetName()
102
			}
103

104
			if cfg.DataplaneRuntime.ConfigDir == "" || cfg.DNS.ConfigDir == "" {
105
				tmpDir, err = os.MkdirTemp("", "kuma-dp-")
106
				if err != nil {
107
					runLog.Error(err, "unable to create a temporary directory to store generated configuration")
108
					return err
109
				}
110

111
				if cfg.DataplaneRuntime.ConfigDir == "" {
112
					cfg.DataplaneRuntime.ConfigDir = tmpDir
113
				}
114

115
				if cfg.DataplaneRuntime.SocketDir == "" {
116
					cfg.DataplaneRuntime.SocketDir = tmpDir
117
				}
118

119
				if cfg.DNS.ConfigDir == "" {
120
					cfg.DNS.ConfigDir = tmpDir
121
				}
122

123
				runLog.Info("generated configurations will be stored in a temporary directory", "dir", tmpDir)
124
			}
125

126
			if cfg.ControlPlane.CaCert == "" && cfg.ControlPlane.CaCertFile != "" {
127
				cert, err := os.ReadFile(cfg.ControlPlane.CaCertFile)
128
				if err != nil {
129
					return errors.Wrapf(err, "could not read certificate file %s", cfg.ControlPlane.CaCertFile)
130
				}
131
				cfg.ControlPlane.CaCert = string(cert)
132
			}
133
			return nil
134
		},
135
		PostRunE: func(cmd *cobra.Command, _ []string) error {
136
			tokenComp, err := rootCtx.DataplaneTokenGenerator(cfg)
137
			if err != nil {
138
				runLog.Error(err, "unable to get or generate dataplane token")
139
				return err
140
			}
141

142
			if tmpDir != "" { // clean up temp dir if it was created
143
				defer func() {
144
					if err := os.RemoveAll(tmpDir); err != nil {
145
						runLog.Error(err, "unable to remove a temporary directory with a generated Envoy config")
146
					}
147
				}()
148
			}
149

150
			// gracefulCtx indicate that the process received a signal to shutdown
151
			gracefulCtx, ctx := opts.SetupSignalHandler()
152
			// componentCtx indicates that components should shutdown (you can use cancel to trigger the shutdown of all components)
153
			componentCtx, cancelComponents := context.WithCancel(gracefulCtx)
154
			components := []component.Component{tokenComp}
155

156
			opts := envoy.Opts{
157
				Config:    *cfg,
158
				Dataplane: rest.From.Resource(proxyResource),
159
				Stdout:    cmd.OutOrStdout(),
160
				Stderr:    cmd.OutOrStderr(),
161
				OnFinish:  cancelComponents,
162
			}
163

164
			envoyVersion, err := envoy.GetEnvoyVersion(opts.Config.DataplaneRuntime.BinaryPath)
165
			if err != nil {
166
				return errors.Wrap(err, "failed to get Envoy version")
167
			}
168

169
			if envoyVersion.KumaDpCompatible, err = envoy.VersionCompatible("~"+kuma_version.Envoy, envoyVersion.Version); err != nil {
170
				runLog.Error(err, "cannot determine envoy version compatibility")
171
			} else if !envoyVersion.KumaDpCompatible {
172
				runLog.Info("Envoy version incompatible", "expected", kuma_version.Envoy, "current", envoyVersion.Version)
173
			}
174

175
			runLog.Info("fetched Envoy version", "version", envoyVersion)
176

177
			runLog.Info("generating bootstrap configuration")
178
			bootstrap, kumaSidecarConfiguration, err := rootCtx.BootstrapGenerator(gracefulCtx, opts.Config.ControlPlane.URL, opts.Config, envoy.BootstrapParams{
179
				Dataplane:           opts.Dataplane,
180
				DNSPort:             cfg.DNS.EnvoyDNSPort,
181
				EmptyDNSPort:        cfg.DNS.CoreDNSEmptyPort,
182
				EnvoyVersion:        *envoyVersion,
183
				Workdir:             cfg.DataplaneRuntime.SocketDir,
184
				AccessLogSocketPath: core_xds.AccessLogSocketName(cfg.DataplaneRuntime.SocketDir, cfg.Dataplane.Name, cfg.Dataplane.Mesh),
185
				MetricsSocketPath:   core_xds.MetricsHijackerSocketName(cfg.DataplaneRuntime.SocketDir, cfg.Dataplane.Name, cfg.Dataplane.Mesh),
186
				DynamicMetadata:     rootCtx.BootstrapDynamicMetadata,
187
				MetricsCertPath:     cfg.DataplaneRuntime.Metrics.CertPath,
188
				MetricsKeyPath:      cfg.DataplaneRuntime.Metrics.KeyPath,
189
			})
190
			if err != nil {
191
				return errors.Errorf("Failed to generate Envoy bootstrap config. %v", err)
192
			}
193
			runLog.Info("received bootstrap configuration", "adminPort", bootstrap.GetAdmin().GetAddress().GetSocketAddress().GetPortValue())
194

195
			opts.BootstrapConfig, err = proto.ToYAML(bootstrap)
196
			if err != nil {
197
				return errors.Errorf("could not convert to yaml. %v", err)
198
			}
199
			opts.AdminPort = bootstrap.GetAdmin().GetAddress().GetSocketAddress().GetPortValue()
200

201
			if cfg.DNS.Enabled && !cfg.Dataplane.IsZoneProxy() {
202
				dnsOpts := &dnsserver.Opts{
203
					Config:   *cfg,
204
					Stdout:   cmd.OutOrStdout(),
205
					Stderr:   cmd.OutOrStderr(),
206
					OnFinish: cancelComponents,
207
				}
208

209
				if len(kumaSidecarConfiguration.Networking.CorefileTemplate) > 0 {
210
					dnsOpts.ProvidedCorefileTemplate = kumaSidecarConfiguration.Networking.CorefileTemplate
211
				}
212

213
				dnsServer, err := dnsserver.New(dnsOpts)
214
				if err != nil {
215
					return err
216
				}
217

218
				version, err := dnsServer.GetVersion()
219
				if err != nil {
220
					return err
221
				}
222

223
				rootCtx.BootstrapDynamicMetadata[core_xds.FieldPrefixDependenciesVersion+".coredns"] = version
224

225
				components = append(components, dnsServer)
226
			}
227

228
			envoyComponent, err := envoy.New(opts)
229
			if err != nil {
230
				return err
231
			}
232
			components = append(components, envoyComponent)
233

234
			observabilityComponents := setupObservability(kumaSidecarConfiguration, bootstrap, cfg)
235
			components = append(components, observabilityComponents...)
236
			if err := rootCtx.ComponentManager.Add(components...); err != nil {
237
				return err
238
			}
239

240
			stopComponents := make(chan struct{})
241
			go func() {
242
				select {
243
				case <-gracefulCtx.Done():
244
					runLog.Info("Kuma DP caught an exit signal. Draining Envoy connections")
245
					if err := envoyComponent.DrainConnections(); err != nil {
246
						runLog.Error(err, "could not drain connections")
247
					} else {
248
						runLog.Info("waiting for connections to be drained", "waitTime", cfg.Dataplane.DrainTime)
249
						select {
250
						case <-time.After(cfg.Dataplane.DrainTime.Duration):
251
						case <-ctx.Done():
252
						}
253
					}
254
				case <-componentCtx.Done():
255
				}
256
				runLog.Info("stopping all Kuma DP components")
257
				close(stopComponents)
258
			}()
259

260
			runLog.Info("starting Kuma DP", "version", kuma_version.Build.Version)
261
			if err := rootCtx.ComponentManager.Start(stopComponents); err != nil {
262
				runLog.Error(err, "error while running Kuma DP")
263
				return err
264
			}
265
			runLog.Info("stopping Kuma DP")
266
			return nil
267
		},
268
	}
269
	cmd.PersistentFlags().StringVar(&cfg.Dataplane.Name, "name", cfg.Dataplane.Name, "Name of the Dataplane")
270
	cmd.PersistentFlags().StringVar(&cfg.Dataplane.Mesh, "mesh", cfg.Dataplane.Mesh, "Mesh that Dataplane belongs to")
271
	cmd.PersistentFlags().StringVar(&cfg.Dataplane.ProxyType, "proxy-type", "dataplane", `type of the Dataplane ("dataplane", "ingress")`)
272
	cmd.PersistentFlags().DurationVar(&cfg.Dataplane.DrainTime.Duration, "drain-time", cfg.Dataplane.DrainTime.Duration, `drain time for Envoy connections on Kuma DP shutdown`)
273
	cmd.PersistentFlags().StringVar(&cfg.ControlPlane.URL, "cp-address", cfg.ControlPlane.URL, "URL of the Control Plane Dataplane Server. Example: https://localhost:5678")
274
	cmd.PersistentFlags().StringVar(&cfg.ControlPlane.CaCertFile, "ca-cert-file", cfg.ControlPlane.CaCertFile, "Path to CA cert by which connection to the Control Plane will be verified if HTTPS is used")
275
	cmd.PersistentFlags().StringVar(&cfg.DataplaneRuntime.BinaryPath, "binary-path", cfg.DataplaneRuntime.BinaryPath, "Binary path of Envoy executable")
276
	cmd.PersistentFlags().Uint32Var(&cfg.DataplaneRuntime.Concurrency, "concurrency", cfg.DataplaneRuntime.Concurrency, "Number of Envoy worker threads")
277
	cmd.PersistentFlags().StringVar(&cfg.DataplaneRuntime.ConfigDir, "config-dir", cfg.DataplaneRuntime.ConfigDir, "Directory in which Envoy config will be generated")
278
	cmd.PersistentFlags().StringVar(&cfg.DataplaneRuntime.TokenPath, "dataplane-token-file", cfg.DataplaneRuntime.TokenPath, "Path to a file with dataplane token (use 'kumactl generate dataplane-token' to get one)")
279
	cmd.PersistentFlags().StringVar(&cfg.DataplaneRuntime.Token, "dataplane-token", cfg.DataplaneRuntime.Token, "Dataplane Token")
280
	cmd.PersistentFlags().StringVar(&cfg.DataplaneRuntime.Resource, "dataplane", "", "Dataplane template to apply (YAML or JSON)")
281
	cmd.PersistentFlags().StringVarP(&cfg.DataplaneRuntime.ResourcePath, "dataplane-file", "d", "", "Path to Dataplane template to apply (YAML or JSON)")
282
	cmd.PersistentFlags().StringToStringVarP(&cfg.DataplaneRuntime.ResourceVars, "dataplane-var", "v", map[string]string{}, "Variables to replace Dataplane template")
283
	cmd.PersistentFlags().StringVar(&cfg.DataplaneRuntime.EnvoyLogLevel, "envoy-log-level", "", "Envoy log level. Available values are: [trace][debug][info][warning|warn][error][critical][off]. By default it inherits Kuma DP logging level.")
284
	cmd.PersistentFlags().StringVar(&cfg.DataplaneRuntime.EnvoyComponentLogLevel, "envoy-component-log-level", "", "Configures Envoy's --component-log-level")
285
	cmd.PersistentFlags().StringVar(&cfg.DataplaneRuntime.Metrics.CertPath, "metrics-cert-path", cfg.DataplaneRuntime.Metrics.CertPath, "A path to the certificate for metrics listener")
286
	cmd.PersistentFlags().StringVar(&cfg.DataplaneRuntime.Metrics.KeyPath, "metrics-key-path", cfg.DataplaneRuntime.Metrics.KeyPath, "A path to the certificate key for metrics listener")
287
	cmd.PersistentFlags().BoolVar(&cfg.DNS.Enabled, "dns-enabled", cfg.DNS.Enabled, "If true then builtin DNS functionality is enabled and CoreDNS server is started")
288
	cmd.PersistentFlags().Uint32Var(&cfg.DNS.EnvoyDNSPort, "dns-envoy-port", cfg.DNS.EnvoyDNSPort, "A port that handles Virtual IP resolving by Envoy. CoreDNS should be configured that it first tries to use this DNS resolver and then the real one")
289
	cmd.PersistentFlags().Uint32Var(&cfg.DNS.CoreDNSPort, "dns-coredns-port", cfg.DNS.CoreDNSPort, "A port that handles DNS requests. When transparent proxy is enabled then iptables will redirect DNS traffic to this port.")
290
	cmd.PersistentFlags().Uint32Var(&cfg.DNS.CoreDNSEmptyPort, "dns-coredns-empty-port", cfg.DNS.CoreDNSEmptyPort, "A port that always responds with empty NXDOMAIN respond. It is required to implement a fallback to a real DNS.")
291
	cmd.PersistentFlags().StringVar(&cfg.DNS.CoreDNSBinaryPath, "dns-coredns-path", cfg.DNS.CoreDNSBinaryPath, "A path to CoreDNS binary.")
292
	cmd.PersistentFlags().StringVar(&cfg.DNS.CoreDNSConfigTemplatePath, "dns-coredns-config-template-path", cfg.DNS.CoreDNSConfigTemplatePath, "A path to a CoreDNS config template.")
293
	cmd.PersistentFlags().StringVar(&cfg.DNS.ConfigDir, "dns-server-config-dir", cfg.DNS.ConfigDir, "Directory in which DNS Server config will be generated")
294
	cmd.PersistentFlags().Uint32Var(&cfg.DNS.PrometheusPort, "dns-prometheus-port", cfg.DNS.PrometheusPort, "A port for exposing Prometheus stats")
295
	cmd.PersistentFlags().BoolVar(&cfg.DNS.CoreDNSLogging, "dns-enable-logging", cfg.DNS.CoreDNSLogging, "If true then CoreDNS logging is enabled")
296

297
	_ = cmd.PersistentFlags().MarkDeprecated("dns-coredns-empty-port", "It's not used anymore. It will be removed in 2.7.x version")
298
	return cmd
299
}
300

301
func getApplicationsToScrape(kumaSidecarConfiguration *types.KumaSidecarConfiguration, envoyAdminPort uint32) []metrics.ApplicationToScrape {
302
	var applicationsToScrape []metrics.ApplicationToScrape
303
	if kumaSidecarConfiguration != nil {
304
		for _, item := range kumaSidecarConfiguration.Metrics.Aggregate {
305
			applicationsToScrape = append(applicationsToScrape, metrics.ApplicationToScrape{
306
				Address:       item.Address,
307
				Name:          item.Name,
308
				Path:          item.Path,
309
				Port:          item.Port,
310
				IsIPv6:        net.IsAddressIPv6(item.Address),
311
				QueryModifier: metrics.RemoveQueryParameters,
312
				OtelMutator:   metrics.ParsePrometheusMetrics,
313
			})
314
		}
315
	}
316
	// by default add envoy configuration
317
	applicationsToScrape = append(applicationsToScrape, metrics.ApplicationToScrape{
318
		Name:          "envoy",
319
		Path:          "/stats",
320
		Address:       "127.0.0.1",
321
		Port:          envoyAdminPort,
322
		IsIPv6:        false,
323
		QueryModifier: metrics.AddPrometheusFormat,
324
		Mutator:       metrics.MergeClusters,
325
		OtelMutator:   metrics.MergeClustersForOpenTelemetry,
326
	})
327
	return applicationsToScrape
328
}
329

330
func writeFile(filename string, data []byte, perm os.FileMode) error {
331
	if err := os.MkdirAll(filepath.Dir(filename), perm); err != nil {
332
		return err
333
	}
334
	return os.WriteFile(filename, data, perm)
335
}
336

337
func setupObservability(kumaSidecarConfiguration *types.KumaSidecarConfiguration, bootstrap *envoy_bootstrap_v3.Bootstrap, cfg *kumadp.Config) []component.Component {
338
	baseApplicationsToScrape := getApplicationsToScrape(kumaSidecarConfiguration, bootstrap.GetAdmin().GetAddress().GetSocketAddress().GetPortValue())
339

340
	accessLogStreamer := component.NewResilientComponent(
341
		runLog.WithName("access-log-streamer"),
342
		accesslogs.NewAccessLogStreamer(
343
			core_xds.AccessLogSocketName(cfg.DataplaneRuntime.SocketDir, cfg.Dataplane.Name, cfg.Dataplane.Mesh),
344
		),
345
	)
346

347
	metricsServer := metrics.New(
348
		core_xds.MetricsHijackerSocketName(cfg.DataplaneRuntime.SocketDir, cfg.Dataplane.Name, cfg.Dataplane.Mesh),
349
		baseApplicationsToScrape,
350
		kumaSidecarConfiguration.Networking.IsUsingTransparentProxy,
351
	)
352

353
	openTelemetryProducer := metrics.NewAggregatedMetricsProducer(
354
		cfg.Dataplane.Mesh,
355
		cfg.Dataplane.Name,
356
		bootstrap.Node.Cluster,
357
		baseApplicationsToScrape,
358
		kumaSidecarConfiguration.Networking.IsUsingTransparentProxy,
359
	)
360

361
	meshMetricsConfigFetcher := component.NewResilientComponent(
362
		runLog.WithName("mesh-metric-config-fetcher"),
363
		meshmetrics.NewMeshMetricConfigFetcher(
364
			core_xds.MeshMetricsDynamicConfigurationSocketName(cfg.DataplaneRuntime.SocketDir),
365
			time.NewTicker(cfg.DataplaneRuntime.DynamicConfiguration.RefreshInterval.Duration),
366
			metricsServer,
367
			openTelemetryProducer,
368
			kumaSidecarConfiguration.Networking.Address,
369
			bootstrap.GetAdmin().GetAddress().GetSocketAddress().GetPortValue(),
370
			bootstrap.GetAdmin().GetAddress().GetSocketAddress().GetAddress(),
371
		),
372
	)
373

374
	return []component.Component{accessLogStreamer, meshMetricsConfigFetcher, metricsServer}
375
}
376

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.