istio

Форк
0
316 строк · 12.6 Кб
1
// Copyright Istio Authors
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
package controller
16

17
import (
18
	"context"
19
	"strings"
20

21
	"k8s.io/apimachinery/pkg/api/errors"
22
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
23
	"k8s.io/client-go/kubernetes"
24

25
	"istio.io/api/annotation"
26
	"istio.io/istio/pilot/pkg/config/kube/crdclient"
27
	"istio.io/istio/pilot/pkg/features"
28
	"istio.io/istio/pilot/pkg/keycertbundle"
29
	"istio.io/istio/pilot/pkg/leaderelection"
30
	"istio.io/istio/pilot/pkg/model"
31
	"istio.io/istio/pilot/pkg/server"
32
	"istio.io/istio/pilot/pkg/serviceregistry/aggregate"
33
	"istio.io/istio/pilot/pkg/serviceregistry/provider"
34
	"istio.io/istio/pilot/pkg/serviceregistry/serviceentry"
35
	"istio.io/istio/pkg/backoff"
36
	"istio.io/istio/pkg/config/schema/collection"
37
	"istio.io/istio/pkg/config/schema/collections"
38
	kubelib "istio.io/istio/pkg/kube"
39
	"istio.io/istio/pkg/kube/multicluster"
40
	"istio.io/istio/pkg/webhooks"
41
)
42

43
const (
44
	// Name of the webhook config in the config - no need to change it.
45
	webhookName = "sidecar-injector.istio.io"
46
)
47

48
type kubeController struct {
49
	MeshServiceController *aggregate.Controller
50
	*Controller
51
	workloadEntryController *serviceentry.Controller
52
	stop                    chan struct{}
53
}
54

55
func (k *kubeController) Close() {
56
	close(k.stop)
57
	clusterID := k.Controller.clusterID
58
	k.MeshServiceController.UnRegisterHandlersForCluster(clusterID)
59
	k.MeshServiceController.DeleteRegistry(clusterID, provider.Kubernetes)
60
	if k.workloadEntryController != nil {
61
		k.MeshServiceController.DeleteRegistry(clusterID, provider.External)
62
	}
63
	if err := k.Controller.Cleanup(); err != nil {
64
		log.Warnf("failed cleaning up services in %s: %v", clusterID, err)
65
	}
66
	if k.opts.XDSUpdater != nil {
67
		k.opts.XDSUpdater.ConfigUpdate(&model.PushRequest{Full: true, Reason: model.NewReasonStats(model.ClusterUpdate)})
68
	}
69
}
70

71
// Multicluster structure holds the remote kube Controllers and multicluster specific attributes.
72
type Multicluster struct {
73
	// serverID of this pilot instance used for leader election
74
	serverID string
75

76
	// options to use when creating kube controllers
77
	opts Options
78

79
	// client for reading remote-secrets to initialize multicluster registries
80
	client kubernetes.Interface
81
	s      server.Instance
82

83
	serviceEntryController *serviceentry.Controller
84
	configController       model.ConfigStoreController
85
	XDSUpdater             model.XDSUpdater
86

87
	clusterLocal model.ClusterLocalProvider
88

89
	startNsController bool
90
	caBundleWatcher   *keycertbundle.Watcher
91
	revision          string
92

93
	// secretNamespace where we get cluster-access secrets
94
	secretNamespace string
95
	component       *multicluster.Component[*kubeController]
96
}
97

98
// NewMulticluster initializes data structure to store multicluster information
99
func NewMulticluster(
100
	serverID string,
101
	kc kubernetes.Interface,
102
	secretNamespace string,
103
	opts Options,
104
	serviceEntryController *serviceentry.Controller,
105
	configController model.ConfigStoreController,
106
	caBundleWatcher *keycertbundle.Watcher,
107
	revision string,
108
	startNsController bool,
109
	clusterLocal model.ClusterLocalProvider,
110
	s server.Instance,
111
	controller *multicluster.Controller,
112
) *Multicluster {
113
	mc := &Multicluster{
114
		serverID:               serverID,
115
		opts:                   opts,
116
		serviceEntryController: serviceEntryController,
117
		configController:       configController,
118
		startNsController:      startNsController,
119
		caBundleWatcher:        caBundleWatcher,
120
		revision:               revision,
121
		XDSUpdater:             opts.XDSUpdater,
122
		clusterLocal:           clusterLocal,
123
		secretNamespace:        secretNamespace,
124
		client:                 kc,
125
		s:                      s,
126
	}
127
	mc.component = multicluster.BuildMultiClusterComponent(controller, func(cluster *multicluster.Cluster) *kubeController {
128
		stop := make(chan struct{})
129
		client := cluster.Client
130
		configCluster := opts.ClusterID == cluster.ID
131

132
		options := opts
133
		options.ClusterID = cluster.ID
134
		if !configCluster {
135
			options.SyncTimeout = features.RemoteClusterTimeout
136
		}
137
		log.Infof("Initializing Kubernetes service registry %q", options.ClusterID)
138
		options.ConfigCluster = configCluster
139
		kubeRegistry := NewController(client, options)
140
		kubeController := &kubeController{
141
			MeshServiceController: opts.MeshServiceController,
142
			Controller:            kubeRegistry,
143
			stop:                  stop,
144
		}
145
		mc.initializeCluster(cluster, kubeController, kubeRegistry, options, configCluster, stop)
146
		return kubeController
147
	})
148

149
	return mc
150
}
151

152
// initializeCluster initializes the cluster by setting various handlers.
153
func (m *Multicluster) initializeCluster(cluster *multicluster.Cluster, kubeController *kubeController, kubeRegistry *Controller,
154
	options Options, configCluster bool, clusterStopCh <-chan struct{},
155
) {
156
	client := cluster.Client
157

158
	if m.serviceEntryController != nil && features.EnableServiceEntrySelectPods {
159
		// Add an instance handler in the kubernetes registry to notify service entry store about pod events
160
		kubeRegistry.AppendWorkloadHandler(m.serviceEntryController.WorkloadInstanceHandler)
161
	}
162

163
	if configCluster && m.serviceEntryController != nil && features.EnableEnhancedResourceScoping {
164
		kubeRegistry.AppendNamespaceDiscoveryHandlers(m.serviceEntryController.NamespaceDiscoveryHandler)
165
	}
166

167
	// TODO implement deduping in aggregate registry to allow multiple k8s registries to handle WorkloadEntry
168
	if features.EnableK8SServiceSelectWorkloadEntries {
169
		if m.serviceEntryController != nil && configCluster {
170
			// Add an instance handler in the service entry store to notify kubernetes about workload entry events
171
			m.serviceEntryController.AppendWorkloadHandler(kubeRegistry.WorkloadInstanceHandler)
172
		} else if features.WorkloadEntryCrossCluster {
173
			// TODO only do this for non-remotes, can't guarantee CRDs in remotes (depends on https://github.com/istio/istio/pull/29824)
174
			configStore := createWleConfigStore(client, m.revision, options)
175
			kubeController.workloadEntryController = serviceentry.NewWorkloadEntryController(
176
				configStore, options.XDSUpdater,
177
				serviceentry.WithClusterID(cluster.ID),
178
				serviceentry.WithNetworkIDCb(kubeRegistry.Network))
179
			// Services can select WorkloadEntry from the same cluster. We only duplicate the Service to configure kube-dns.
180
			kubeController.workloadEntryController.AppendWorkloadHandler(kubeRegistry.WorkloadInstanceHandler)
181
			// ServiceEntry selects WorkloadEntry from remote cluster
182
			kubeController.workloadEntryController.AppendWorkloadHandler(m.serviceEntryController.WorkloadInstanceHandler)
183
			if features.EnableEnhancedResourceScoping {
184
				kubeRegistry.AppendNamespaceDiscoveryHandlers(kubeController.workloadEntryController.NamespaceDiscoveryHandler)
185
			}
186
			m.opts.MeshServiceController.AddRegistryAndRun(kubeController.workloadEntryController, clusterStopCh)
187
			go configStore.Run(clusterStopCh)
188
		}
189
	}
190

191
	// run after WorkloadHandler is added
192
	m.opts.MeshServiceController.AddRegistryAndRun(kubeRegistry, clusterStopCh)
193

194
	go func() {
195
		var shouldLead bool
196
		if !configCluster {
197
			shouldLead = m.checkShouldLead(client, options.SystemNamespace, clusterStopCh)
198
			log.Infof("should join leader-election for cluster %s: %t", cluster.ID, shouldLead)
199
		}
200
		if m.startNsController && (shouldLead || configCluster) {
201
			// Block server exit on graceful termination of the leader controller.
202
			m.s.RunComponentAsyncAndWait("namespace controller", func(_ <-chan struct{}) error {
203
				log.Infof("joining leader-election for %s in %s on cluster %s",
204
					leaderelection.NamespaceController, options.SystemNamespace, options.ClusterID)
205
				election := leaderelection.
206
					NewLeaderElectionMulticluster(options.SystemNamespace, m.serverID, leaderelection.NamespaceController, m.revision, !configCluster, client).
207
					AddRunFunction(func(leaderStop <-chan struct{}) {
208
						log.Infof("starting namespace controller for cluster %s", cluster.ID)
209
						nc := NewNamespaceController(client, m.caBundleWatcher)
210
						// Start informers again. This fixes the case where informers for namespace do not start,
211
						// as we create them only after acquiring the leader lock
212
						// Note: stop here should be the overall pilot stop, NOT the leader election stop. We are
213
						// basically lazy loading the informer, if we stop it when we lose the lock we will never
214
						// recreate it again.
215
						client.RunAndWait(clusterStopCh)
216
						nc.Run(leaderStop)
217
					})
218
				election.Run(clusterStopCh)
219
				return nil
220
			})
221
		}
222
		// Set up injection webhook patching for remote clusters we are controlling.
223
		// The config cluster has this patching set up elsewhere. We may eventually want to move it here.
224
		// We can not use leader election for webhook patching because each revision needs to patch its own
225
		// webhook.
226
		if shouldLead && !configCluster && m.caBundleWatcher != nil {
227
			// Patch injection webhook cert
228
			// This requires RBAC permissions - a low-priv Istiod should not attempt to patch but rely on
229
			// operator or CI/CD
230
			if features.InjectionWebhookConfigName != "" {
231
				log.Infof("initializing injection webhook cert patcher for cluster %s", cluster.ID)
232
				patcher, err := webhooks.NewWebhookCertPatcher(client, m.revision, webhookName, m.caBundleWatcher)
233
				if err != nil {
234
					log.Errorf("could not initialize webhook cert patcher: %v", err)
235
				} else {
236
					go patcher.Run(clusterStopCh)
237
				}
238
			}
239
		}
240
	}()
241

242
	// setting up the serviceexport controller if and only if it is turned on in the meshconfig.
243
	if features.EnableMCSAutoExport {
244
		log.Infof("joining leader-election for %s in %s on cluster %s",
245
			leaderelection.ServiceExportController, options.SystemNamespace, options.ClusterID)
246
		// Block server exit on graceful termination of the leader controller.
247
		m.s.RunComponentAsyncAndWait("auto serviceexport controller", func(_ <-chan struct{}) error {
248
			leaderelection.
249
				NewLeaderElectionMulticluster(options.SystemNamespace, m.serverID, leaderelection.ServiceExportController, m.revision, !configCluster, client).
250
				AddRunFunction(func(leaderStop <-chan struct{}) {
251
					serviceExportController := newAutoServiceExportController(autoServiceExportOptions{
252
						Client:       client,
253
						ClusterID:    options.ClusterID,
254
						DomainSuffix: options.DomainSuffix,
255
						ClusterLocal: m.clusterLocal,
256
					})
257
					// Start informers again. This fixes the case where informers do not start,
258
					// as we create them only after acquiring the leader lock
259
					// Note: stop here should be the overall pilot stop, NOT the leader election stop. We are
260
					// basically lazy loading the informer, if we stop it when we lose the lock we will never
261
					// recreate it again.
262
					client.RunAndWait(clusterStopCh)
263
					serviceExportController.Run(leaderStop)
264
				}).Run(clusterStopCh)
265
			return nil
266
		})
267
	}
268
}
269

270
// checkShouldLead returns true if the caller should attempt leader election for a remote cluster.
271
func (m *Multicluster) checkShouldLead(client kubelib.Client, systemNamespace string, stop <-chan struct{}) bool {
272
	var res bool
273
	if features.ExternalIstiod {
274
		b := backoff.NewExponentialBackOff(backoff.DefaultOption())
275
		ctx, cancel := context.WithCancel(context.Background())
276
		go func() {
277
			select {
278
			case <-stop:
279
				cancel()
280
			case <-ctx.Done():
281
			}
282
		}()
283
		defer cancel()
284
		_ = b.RetryWithContext(ctx, func() error {
285
			namespace, err := client.Kube().CoreV1().Namespaces().Get(context.TODO(), systemNamespace, metav1.GetOptions{})
286
			if err != nil {
287
				if errors.IsNotFound(err) {
288
					return nil
289
				}
290
				return err
291
			}
292
			// found same system namespace on the remote cluster so check if we are a selected istiod to lead
293
			istiodCluster, found := namespace.Annotations[annotation.TopologyControlPlaneClusters.Name]
294
			if found {
295
				localCluster := string(m.opts.ClusterID)
296
				for _, cluster := range strings.Split(istiodCluster, ",") {
297
					if cluster == "*" || cluster == localCluster {
298
						res = true
299
						return nil
300
					}
301
				}
302
			}
303
			return nil
304
		})
305
	}
306
	return res
307
}
308

309
func createWleConfigStore(client kubelib.Client, revision string, opts Options) model.ConfigStoreController {
310
	log.Infof("Creating WorkloadEntry only config store for %s", opts.ClusterID)
311
	workloadEntriesSchemas := collection.NewSchemasBuilder().
312
		MustAdd(collections.WorkloadEntry).
313
		Build()
314
	crdOpts := crdclient.Option{Revision: revision, DomainSuffix: opts.DomainSuffix, Identifier: "mc-workload-entry-controller"}
315
	return crdclient.NewForSchemas(client, crdOpts, workloadEntriesSchemas)
316
}
317

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.