istio
315 строк · 10.2 Кб
1// Copyright Istio Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package controller
16
17import (
18"sort"
19"strings"
20
21metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
22"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
23klabels "k8s.io/apimachinery/pkg/labels"
24"k8s.io/apimachinery/pkg/types"
25
26"istio.io/istio/pilot/pkg/features"
27"istio.io/istio/pilot/pkg/model"
28"istio.io/istio/pilot/pkg/serviceregistry/kube"
29"istio.io/istio/pkg/cluster"
30"istio.io/istio/pkg/config"
31"istio.io/istio/pkg/config/constants"
32"istio.io/istio/pkg/config/host"
33"istio.io/istio/pkg/config/schema/kind"
34"istio.io/istio/pkg/kube/controllers"
35"istio.io/istio/pkg/kube/kclient"
36"istio.io/istio/pkg/kube/kubetypes"
37"istio.io/istio/pkg/kube/mcs"
38"istio.io/istio/pkg/slices"
39netutil "istio.io/istio/pkg/util/net"
40"istio.io/istio/pkg/util/sets"
41)
42
43const (
44mcsDomainSuffix = "." + constants.DefaultClusterSetLocalDomain
45)
46
47type importedService struct {
48namespacedName types.NamespacedName
49clusterSetVIP string
50}
51
52// serviceImportCache reads and processes Kubernetes Multi-Cluster Services (MCS) ServiceImport
53// resources.
54//
55// An MCS controller is responsible for reading ServiceExport resources in one cluster and generating
56// ServiceImport in all clusters of the ClusterSet (i.e. mesh). While the serviceExportCache reads
57// ServiceExport to control the discoverability policy for individual endpoints, this controller
58// reads ServiceImport in the cluster in order to extract the ClusterSet VIP and generate a
59// synthetic service for the MCS host (i.e. clusterset.local). The aggregate.Controller will then
60// merge together the MCS services from all the clusters, filling out the full map of Cluster IPs.
61//
62// The synthetic MCS service is a copy of the real k8s Service (e.g. cluster.local) with the same
63// namespaced name, but with the hostname and VIPs changed to the appropriate ClusterSet values.
64// The real k8s Service can live anywhere in the mesh and does not have to reside in the same
65// cluster as the ServiceImport.
66type serviceImportCache interface {
67Run(stop <-chan struct{})
68HasSynced() bool
69ImportedServices() []importedService
70}
71
72// newServiceImportCache creates a new cache of ServiceImport resources in the cluster.
73func newServiceImportCache(c *Controller) serviceImportCache {
74if features.EnableMCSHost {
75sic := &serviceImportCacheImpl{
76Controller: c,
77}
78
79sic.serviceImports = kclient.NewDelayedInformer[controllers.Object](sic.client, mcs.ServiceImportGVR, kubetypes.DynamicInformer, kclient.Filter{
80ObjectFilter: sic.client.ObjectFilter(),
81})
82// Register callbacks for events.
83registerHandlers(sic.Controller, sic.serviceImports, "ServiceImports", sic.onServiceImportEvent, nil)
84sic.opts.MeshServiceController.AppendServiceHandlerForCluster(sic.Cluster(), sic.onServiceEvent)
85
86return sic
87}
88
89// MCS Service discovery is disabled. Use a placeholder cache.
90return disabledServiceImportCache{}
91}
92
93// serviceImportCacheImpl reads ServiceImport resources for a single cluster.
94type serviceImportCacheImpl struct {
95*Controller
96
97serviceImports kclient.Untyped
98}
99
100// onServiceEvent is called when the controller receives an event for the kube Service (i.e. cluster.local).
101// When this happens, we need to update the state of the associated synthetic MCS service.
102func (ic *serviceImportCacheImpl) onServiceEvent(_, curr *model.Service, event model.Event) {
103if strings.HasSuffix(curr.Hostname.String(), mcsDomainSuffix) {
104// Ignore events for MCS services that were triggered by this controller.
105return
106}
107
108// This method is called concurrently from each cluster's queue. Process it in `this` cluster's queue
109// in order to synchronize event processing.
110ic.queue.Push(func() error {
111namespacedName := namespacedNameForService(curr)
112
113// Lookup the previous MCS service if there was one.
114mcsHost := serviceClusterSetLocalHostname(namespacedName)
115prevMcsService := ic.GetService(mcsHost)
116
117// Get the ClusterSet VIPs for this service in this cluster. Will only be populated if the
118// service has a ServiceImport in this cluster.
119vips := ic.getClusterSetIPs(namespacedName)
120name := namespacedName.Name
121ns := namespacedName.Namespace
122
123if len(vips) == 0 || (event == model.EventDelete &&
124ic.opts.MeshServiceController.GetService(kube.ServiceHostname(name, ns, ic.opts.DomainSuffix)) == nil) {
125if prevMcsService != nil {
126// There are no vips in this cluster. Just delete the MCS service now.
127ic.deleteService(prevMcsService)
128}
129return nil
130}
131
132if prevMcsService != nil {
133event = model.EventUpdate
134} else {
135event = model.EventAdd
136}
137
138mcsService := ic.genMCSService(curr, mcsHost, vips)
139ic.addOrUpdateService(nil, nil, mcsService, event, false)
140return nil
141})
142}
143
144func (ic *serviceImportCacheImpl) onServiceImportEvent(_, obj controllers.Object, event model.Event) error {
145si := controllers.Extract[*unstructured.Unstructured](obj)
146if si == nil {
147return nil
148}
149
150// We need a full push if the cluster VIP changes.
151needsFullPush := false
152
153// Get the updated MCS service.
154mcsHost := serviceClusterSetLocalHostnameForKR(si)
155mcsService := ic.GetService(mcsHost)
156
157ips := GetServiceImportIPs(si)
158if mcsService == nil {
159if event == model.EventDelete || len(ips) == 0 {
160// We never created the service. Nothing to delete.
161return nil
162}
163
164// The service didn't exist prior. Treat it as an add.
165event = model.EventAdd
166
167// Create the MCS service, based on the cluster.local service. We get the merged, mesh-wide service
168// from the aggregate controller so that we don't rely on the service existing in this cluster.
169realService := ic.opts.MeshServiceController.GetService(kube.ServiceHostnameForKR(si, ic.opts.DomainSuffix))
170if realService == nil {
171log.Warnf("failed processing %s event for ServiceImport %s/%s in cluster %s. No matching service found in cluster",
172event, si.GetNamespace(), si.GetName(), ic.Cluster())
173return nil
174}
175
176// Create the MCS service from the cluster.local service.
177mcsService = ic.genMCSService(realService, mcsHost, ips)
178} else {
179if event == model.EventDelete || len(ips) == 0 {
180ic.deleteService(mcsService)
181return nil
182}
183
184// The service already existed. Treat it as an update.
185event = model.EventUpdate
186mcsService = mcsService.DeepCopy()
187if ic.updateIPs(mcsService, ips) {
188needsFullPush = true
189}
190}
191
192// Always force a rebuild of the endpoint cache in case this import caused
193// a change to the discoverability policy.
194ic.addOrUpdateService(nil, nil, mcsService, event, true)
195
196// TODO: do we really need a full push, we should do it in `addOrUpdateService`.
197if needsFullPush {
198ic.doFullPush(mcsHost, si.GetNamespace())
199}
200
201return nil
202}
203
204func (ic *serviceImportCacheImpl) updateIPs(mcsService *model.Service, ips []string) (updated bool) {
205prevIPs := mcsService.ClusterVIPs.GetAddressesFor(ic.Cluster())
206if !slices.Equal(prevIPs, ips) {
207// Update the VIPs
208mcsService.ClusterVIPs.SetAddressesFor(ic.Cluster(), ips)
209updated = true
210}
211return
212}
213
214func (ic *serviceImportCacheImpl) doFullPush(mcsHost host.Name, ns string) {
215pushReq := &model.PushRequest{
216Full: true,
217ConfigsUpdated: sets.New(model.ConfigKey{Kind: kind.ServiceEntry, Name: mcsHost.String(), Namespace: ns}),
218
219Reason: model.NewReasonStats(model.ServiceUpdate),
220}
221ic.opts.XDSUpdater.ConfigUpdate(pushReq)
222}
223
224// GetServiceImportIPs returns the list of ClusterSet IPs for the ServiceImport.
225// Exported for testing only.
226func GetServiceImportIPs(si *unstructured.Unstructured) []string {
227var ips []string
228if spec, ok := si.Object["spec"].(map[string]any); ok {
229if rawIPs, ok := spec["ips"].([]any); ok {
230for _, rawIP := range rawIPs {
231ip := rawIP.(string)
232if netutil.IsValidIPAddress(ip) {
233ips = append(ips, ip)
234}
235}
236}
237}
238sort.Strings(ips)
239return ips
240}
241
242// genMCSService generates an MCS service based on the given real k8s service. The list of vips must be non-empty.
243func (ic *serviceImportCacheImpl) genMCSService(realService *model.Service, mcsHost host.Name, vips []string) *model.Service {
244mcsService := realService.DeepCopy()
245mcsService.Hostname = mcsHost
246mcsService.DefaultAddress = vips[0]
247mcsService.ClusterVIPs.Addresses = map[cluster.ID][]string{
248ic.Cluster(): vips,
249}
250
251return mcsService
252}
253
254func (ic *serviceImportCacheImpl) getClusterSetIPs(name types.NamespacedName) []string {
255si := ic.serviceImports.Get(name.Name, name.Namespace)
256if si != nil {
257return GetServiceImportIPs(si.(*unstructured.Unstructured))
258}
259return nil
260}
261
262func (ic *serviceImportCacheImpl) ImportedServices() []importedService {
263sis := ic.serviceImports.List(metav1.NamespaceAll, klabels.Everything())
264
265// Iterate over the ServiceImport resources in this cluster.
266out := make([]importedService, 0, len(sis))
267
268ic.RLock()
269for _, si := range sis {
270usi := si.(*unstructured.Unstructured)
271info := importedService{
272namespacedName: config.NamespacedName(usi),
273}
274
275// Lookup the synthetic MCS service.
276hostName := serviceClusterSetLocalHostnameForKR(usi)
277svc := ic.servicesMap[hostName]
278if svc != nil {
279if vips := svc.ClusterVIPs.GetAddressesFor(ic.Cluster()); len(vips) > 0 {
280info.clusterSetVIP = vips[0]
281}
282}
283
284out = append(out, info)
285}
286ic.RUnlock()
287
288return out
289}
290
291func (ic *serviceImportCacheImpl) Run(stop <-chan struct{}) {
292}
293
294func (ic *serviceImportCacheImpl) HasSynced() bool {
295return ic.serviceImports.HasSynced()
296}
297
298type disabledServiceImportCache struct{}
299
300var _ serviceImportCache = disabledServiceImportCache{}
301
302func (c disabledServiceImportCache) Run(stop <-chan struct{}) {}
303
304func (c disabledServiceImportCache) HasSynced() bool {
305return true
306}
307
308func (c disabledServiceImportCache) ImportedServices() []importedService {
309// MCS is disabled - returning `nil`, which is semantically different here than an empty list.
310return nil
311}
312
313func (c disabledServiceImportCache) HasCRDInstalled() bool {
314return false
315}
316