istio
316 lines · 12.6 KB
1// Copyright Istio Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package controller16
17import (18"context"19"strings"20
21"k8s.io/apimachinery/pkg/api/errors"22metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"23"k8s.io/client-go/kubernetes"24
25"istio.io/api/annotation"26"istio.io/istio/pilot/pkg/config/kube/crdclient"27"istio.io/istio/pilot/pkg/features"28"istio.io/istio/pilot/pkg/keycertbundle"29"istio.io/istio/pilot/pkg/leaderelection"30"istio.io/istio/pilot/pkg/model"31"istio.io/istio/pilot/pkg/server"32"istio.io/istio/pilot/pkg/serviceregistry/aggregate"33"istio.io/istio/pilot/pkg/serviceregistry/provider"34"istio.io/istio/pilot/pkg/serviceregistry/serviceentry"35"istio.io/istio/pkg/backoff"36"istio.io/istio/pkg/config/schema/collection"37"istio.io/istio/pkg/config/schema/collections"38kubelib "istio.io/istio/pkg/kube"39"istio.io/istio/pkg/kube/multicluster"40"istio.io/istio/pkg/webhooks"41)
42
const (
	// Name of the webhook config in the config - no need to change it.
	webhookName = "sidecar-injector.istio.io"
)
47
// kubeController bundles a per-cluster kube registry (Controller) with the
// mesh-wide aggregate controller it is registered in, so the whole set can be
// torn down together when the cluster's access secret is removed (see Close).
type kubeController struct {
	// MeshServiceController is the aggregate registry this cluster's
	// registries were added to; Close removes them from it.
	MeshServiceController *aggregate.Controller
	*Controller
	// workloadEntryController is non-nil only when a WorkloadEntry registry
	// was created for this cluster (see initializeCluster); Close then also
	// removes the provider.External registry.
	workloadEntryController *serviceentry.Controller
	// stop is closed by Close to halt background work started for this cluster.
	stop chan struct{}
}
54
55func (k *kubeController) Close() {56close(k.stop)57clusterID := k.Controller.clusterID58k.MeshServiceController.UnRegisterHandlersForCluster(clusterID)59k.MeshServiceController.DeleteRegistry(clusterID, provider.Kubernetes)60if k.workloadEntryController != nil {61k.MeshServiceController.DeleteRegistry(clusterID, provider.External)62}63if err := k.Controller.Cleanup(); err != nil {64log.Warnf("failed cleaning up services in %s: %v", clusterID, err)65}66if k.opts.XDSUpdater != nil {67k.opts.XDSUpdater.ConfigUpdate(&model.PushRequest{Full: true, Reason: model.NewReasonStats(model.ClusterUpdate)})68}69}
70
// Multicluster structure holds the remote kube Controllers and multicluster specific attributes.
type Multicluster struct {
	// serverID of this pilot instance used for leader election
	serverID string

	// options to use when creating kube controllers
	opts Options

	// client for reading remote-secrets to initialize multicluster registries
	client kubernetes.Interface
	// s is the server instance used to register async components
	// (leader-election driven controllers) so shutdown waits on them.
	s server.Instance

	serviceEntryController *serviceentry.Controller
	configController       model.ConfigStoreController
	XDSUpdater             model.XDSUpdater

	// clusterLocal resolves which hosts are cluster-local; used by the
	// auto serviceexport controller.
	clusterLocal model.ClusterLocalProvider

	// startNsController controls whether the namespace controller is
	// started (on the config cluster or on remotes we lead).
	startNsController bool
	// caBundleWatcher provides the CA bundle used for webhook cert patching
	// and the namespace controller.
	caBundleWatcher *keycertbundle.Watcher
	revision        string

	// secretNamespace where we get cluster-access secrets
	secretNamespace string
	// component tracks the per-cluster kubeController built for each
	// discovered cluster.
	component *multicluster.Component[*kubeController]
}
97
98// NewMulticluster initializes data structure to store multicluster information
99func NewMulticluster(100serverID string,101kc kubernetes.Interface,102secretNamespace string,103opts Options,104serviceEntryController *serviceentry.Controller,105configController model.ConfigStoreController,106caBundleWatcher *keycertbundle.Watcher,107revision string,108startNsController bool,109clusterLocal model.ClusterLocalProvider,110s server.Instance,111controller *multicluster.Controller,112) *Multicluster {113mc := &Multicluster{114serverID: serverID,115opts: opts,116serviceEntryController: serviceEntryController,117configController: configController,118startNsController: startNsController,119caBundleWatcher: caBundleWatcher,120revision: revision,121XDSUpdater: opts.XDSUpdater,122clusterLocal: clusterLocal,123secretNamespace: secretNamespace,124client: kc,125s: s,126}127mc.component = multicluster.BuildMultiClusterComponent(controller, func(cluster *multicluster.Cluster) *kubeController {128stop := make(chan struct{})129client := cluster.Client130configCluster := opts.ClusterID == cluster.ID131
132options := opts133options.ClusterID = cluster.ID134if !configCluster {135options.SyncTimeout = features.RemoteClusterTimeout136}137log.Infof("Initializing Kubernetes service registry %q", options.ClusterID)138options.ConfigCluster = configCluster139kubeRegistry := NewController(client, options)140kubeController := &kubeController{141MeshServiceController: opts.MeshServiceController,142Controller: kubeRegistry,143stop: stop,144}145mc.initializeCluster(cluster, kubeController, kubeRegistry, options, configCluster, stop)146return kubeController147})148
149return mc150}
151
// initializeCluster initializes the cluster by setting various handlers.
// It registers cross-registry workload handlers, adds the kube registry to the
// aggregate mesh controller, and (asynchronously) joins leader elections for
// the namespace controller, webhook cert patching, and MCS auto-export.
func (m *Multicluster) initializeCluster(cluster *multicluster.Cluster, kubeController *kubeController, kubeRegistry *Controller,
	options Options, configCluster bool, clusterStopCh <-chan struct{},
) {
	client := cluster.Client

	if m.serviceEntryController != nil && features.EnableServiceEntrySelectPods {
		// Add an instance handler in the kubernetes registry to notify service entry store about pod events
		kubeRegistry.AppendWorkloadHandler(m.serviceEntryController.WorkloadInstanceHandler)
	}

	if configCluster && m.serviceEntryController != nil && features.EnableEnhancedResourceScoping {
		kubeRegistry.AppendNamespaceDiscoveryHandlers(m.serviceEntryController.NamespaceDiscoveryHandler)
	}

	// TODO implement deduping in aggregate registry to allow multiple k8s registries to handle WorkloadEntry
	if features.EnableK8SServiceSelectWorkloadEntries {
		if m.serviceEntryController != nil && configCluster {
			// Add an instance handler in the service entry store to notify kubernetes about workload entry events
			m.serviceEntryController.AppendWorkloadHandler(kubeRegistry.WorkloadInstanceHandler)
		} else if features.WorkloadEntryCrossCluster {
			// TODO only do this for non-remotes, can't guarantee CRDs in remotes (depends on https://github.com/istio/istio/pull/29824)
			configStore := createWleConfigStore(client, m.revision, options)
			kubeController.workloadEntryController = serviceentry.NewWorkloadEntryController(
				configStore, options.XDSUpdater,
				serviceentry.WithClusterID(cluster.ID),
				serviceentry.WithNetworkIDCb(kubeRegistry.Network))
			// Services can select WorkloadEntry from the same cluster. We only duplicate the Service to configure kube-dns.
			kubeController.workloadEntryController.AppendWorkloadHandler(kubeRegistry.WorkloadInstanceHandler)
			// ServiceEntry selects WorkloadEntry from remote cluster
			kubeController.workloadEntryController.AppendWorkloadHandler(m.serviceEntryController.WorkloadInstanceHandler)
			if features.EnableEnhancedResourceScoping {
				kubeRegistry.AppendNamespaceDiscoveryHandlers(kubeController.workloadEntryController.NamespaceDiscoveryHandler)
			}
			m.opts.MeshServiceController.AddRegistryAndRun(kubeController.workloadEntryController, clusterStopCh)
			go configStore.Run(clusterStopCh)
		}
	}

	// run after WorkloadHandler is added
	m.opts.MeshServiceController.AddRegistryAndRun(kubeRegistry, clusterStopCh)

	// Leader-election work runs asynchronously so registry setup is not
	// blocked on the remote-cluster readiness check in checkShouldLead.
	go func() {
		var shouldLead bool
		if !configCluster {
			shouldLead = m.checkShouldLead(client, options.SystemNamespace, clusterStopCh)
			log.Infof("should join leader-election for cluster %s: %t", cluster.ID, shouldLead)
		}
		if m.startNsController && (shouldLead || configCluster) {
			// Block server exit on graceful termination of the leader controller.
			m.s.RunComponentAsyncAndWait("namespace controller", func(_ <-chan struct{}) error {
				log.Infof("joining leader-election for %s in %s on cluster %s",
					leaderelection.NamespaceController, options.SystemNamespace, options.ClusterID)
				election := leaderelection.
					NewLeaderElectionMulticluster(options.SystemNamespace, m.serverID, leaderelection.NamespaceController, m.revision, !configCluster, client).
					AddRunFunction(func(leaderStop <-chan struct{}) {
						log.Infof("starting namespace controller for cluster %s", cluster.ID)
						nc := NewNamespaceController(client, m.caBundleWatcher)
						// Start informers again. This fixes the case where informers for namespace do not start,
						// as we create them only after acquiring the leader lock
						// Note: stop here should be the overall pilot stop, NOT the leader election stop. We are
						// basically lazy loading the informer, if we stop it when we lose the lock we will never
						// recreate it again.
						client.RunAndWait(clusterStopCh)
						nc.Run(leaderStop)
					})
				election.Run(clusterStopCh)
				return nil
			})
		}
		// Set up injection webhook patching for remote clusters we are controlling.
		// The config cluster has this patching set up elsewhere. We may eventually want to move it here.
		// We can not use leader election for webhook patching because each revision needs to patch its own
		// webhook.
		if shouldLead && !configCluster && m.caBundleWatcher != nil {
			// Patch injection webhook cert
			// This requires RBAC permissions - a low-priv Istiod should not attempt to patch but rely on
			// operator or CI/CD
			if features.InjectionWebhookConfigName != "" {
				log.Infof("initializing injection webhook cert patcher for cluster %s", cluster.ID)
				patcher, err := webhooks.NewWebhookCertPatcher(client, m.revision, webhookName, m.caBundleWatcher)
				if err != nil {
					log.Errorf("could not initialize webhook cert patcher: %v", err)
				} else {
					go patcher.Run(clusterStopCh)
				}
			}
		}
	}()

	// setting up the serviceexport controller if and only if it is turned on in the meshconfig.
	if features.EnableMCSAutoExport {
		log.Infof("joining leader-election for %s in %s on cluster %s",
			leaderelection.ServiceExportController, options.SystemNamespace, options.ClusterID)
		// Block server exit on graceful termination of the leader controller.
		m.s.RunComponentAsyncAndWait("auto serviceexport controller", func(_ <-chan struct{}) error {
			leaderelection.
				NewLeaderElectionMulticluster(options.SystemNamespace, m.serverID, leaderelection.ServiceExportController, m.revision, !configCluster, client).
				AddRunFunction(func(leaderStop <-chan struct{}) {
					serviceExportController := newAutoServiceExportController(autoServiceExportOptions{
						Client:       client,
						ClusterID:    options.ClusterID,
						DomainSuffix: options.DomainSuffix,
						ClusterLocal: m.clusterLocal,
					})
					// Start informers again. This fixes the case where informers do not start,
					// as we create them only after acquiring the leader lock
					// Note: stop here should be the overall pilot stop, NOT the leader election stop. We are
					// basically lazy loading the informer, if we stop it when we lose the lock we will never
					// recreate it again.
					client.RunAndWait(clusterStopCh)
					serviceExportController.Run(leaderStop)
				}).Run(clusterStopCh)
			return nil
		})
	}
}
269
270// checkShouldLead returns true if the caller should attempt leader election for a remote cluster.
271func (m *Multicluster) checkShouldLead(client kubelib.Client, systemNamespace string, stop <-chan struct{}) bool {272var res bool273if features.ExternalIstiod {274b := backoff.NewExponentialBackOff(backoff.DefaultOption())275ctx, cancel := context.WithCancel(context.Background())276go func() {277select {278case <-stop:279cancel()280case <-ctx.Done():281}282}()283defer cancel()284_ = b.RetryWithContext(ctx, func() error {285namespace, err := client.Kube().CoreV1().Namespaces().Get(context.TODO(), systemNamespace, metav1.GetOptions{})286if err != nil {287if errors.IsNotFound(err) {288return nil289}290return err291}292// found same system namespace on the remote cluster so check if we are a selected istiod to lead293istiodCluster, found := namespace.Annotations[annotation.TopologyControlPlaneClusters.Name]294if found {295localCluster := string(m.opts.ClusterID)296for _, cluster := range strings.Split(istiodCluster, ",") {297if cluster == "*" || cluster == localCluster {298res = true299return nil300}301}302}303return nil304})305}306return res307}
308
309func createWleConfigStore(client kubelib.Client, revision string, opts Options) model.ConfigStoreController {310log.Infof("Creating WorkloadEntry only config store for %s", opts.ClusterID)311workloadEntriesSchemas := collection.NewSchemasBuilder().312MustAdd(collections.WorkloadEntry).313Build()314crdOpts := crdclient.Option{Revision: revision, DomainSuffix: opts.DomainSuffix, Identifier: "mc-workload-entry-controller"}315return crdclient.NewForSchemas(client, crdOpts, workloadEntriesSchemas)316}
317