istio

Форк
0
433 строки · 12.4 Кб
1
// Copyright Istio Authors
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
package memory
16

17
import (
18
	"fmt"
19
	"net/netip"
20
	"sync"
21

22
	"istio.io/istio/pilot/pkg/model"
23
	"istio.io/istio/pilot/pkg/serviceregistry/provider"
24
	"istio.io/istio/pkg/cluster"
25
	"istio.io/istio/pkg/config/host"
26
	"istio.io/istio/pkg/config/labels"
27
	"istio.io/istio/pkg/config/protocol"
28
	"istio.io/istio/pkg/log"
29
	"istio.io/istio/pkg/maps"
30
	"istio.io/istio/pkg/slices"
31
	"istio.io/istio/pkg/util/sets"
32
	"istio.io/istio/pkg/workloadapi"
33
)
34

35
// ServiceDiscovery is a mock discovery interface
36
type ServiceDiscovery struct {
37
	services map[host.Name]*model.Service
38

39
	handlers model.ControllerHandlers
40

41
	networkGateways []model.NetworkGateway
42
	model.NetworkGatewaysHandler
43

44
	// EndpointShards table. Key is the fqdn of the service, ':', port
45
	instancesByPortNum  map[string][]*model.ServiceInstance
46
	instancesByPortName map[string][]*model.ServiceInstance
47

48
	// Used by GetProxyServiceInstance, used to configure inbound (list of services per IP)
49
	// We generally expect a single instance - conflicting services need to be reported.
50
	ip2instance                map[string][]*model.ServiceInstance
51
	WantGetProxyServiceTargets []model.ServiceTarget
52
	InstancesError             error
53
	Controller                 model.Controller
54
	ClusterID                  cluster.ID
55

56
	// Used by GetProxyWorkloadLabels
57
	ip2workloadLabels map[string]labels.Instance
58

59
	addresses map[string]model.AddressInfo
60

61
	// XDSUpdater will push EDS changes to the ADS model.
62
	XdsUpdater model.XDSUpdater
63

64
	// Single mutex for now - it's for debug only.
65
	mutex sync.Mutex
66
}
67

68
var (
69
	_ model.Controller       = &ServiceDiscovery{}
70
	_ model.ServiceDiscovery = &ServiceDiscovery{}
71
)
72

73
// NewServiceDiscovery builds an in-memory ServiceDiscovery
74
func NewServiceDiscovery(services ...*model.Service) *ServiceDiscovery {
75
	svcs := map[host.Name]*model.Service{}
76
	for _, svc := range services {
77
		svcs[svc.Hostname] = svc
78
	}
79
	return &ServiceDiscovery{
80
		services:            svcs,
81
		instancesByPortNum:  map[string][]*model.ServiceInstance{},
82
		instancesByPortName: map[string][]*model.ServiceInstance{},
83
		ip2instance:         map[string][]*model.ServiceInstance{},
84
		ip2workloadLabels:   map[string]labels.Instance{},
85
		addresses:           map[string]model.AddressInfo{},
86
	}
87
}
88

89
func (sd *ServiceDiscovery) shardKey() model.ShardKey {
90
	return model.ShardKey{Cluster: sd.ClusterID, Provider: provider.Mock}
91
}
92

93
func (sd *ServiceDiscovery) AddWorkload(ip string, labels labels.Instance) {
94
	sd.ip2workloadLabels[ip] = labels
95
}
96

97
// AddHTTPService is a helper to add a service of type http, named 'http-main', with the
98
// specified vip and port.
99
func (sd *ServiceDiscovery) AddHTTPService(name, vip string, port int) {
100
	sd.AddService(&model.Service{
101
		Hostname:       host.Name(name),
102
		DefaultAddress: vip,
103
		Ports: model.PortList{
104
			{
105
				Name:     "http-main",
106
				Port:     port,
107
				Protocol: protocol.HTTP,
108
			},
109
		},
110
	})
111
}
112

113
// AddService adds an in-memory service and notifies
114
func (sd *ServiceDiscovery) AddService(svc *model.Service) {
115
	sd.mutex.Lock()
116
	svc.Attributes.ServiceRegistry = provider.Mock
117
	var old *model.Service
118
	event := model.EventAdd
119
	if o, f := sd.services[svc.Hostname]; f {
120
		old = o
121
		event = model.EventUpdate
122
	}
123
	sd.services[svc.Hostname] = svc
124

125
	if sd.XdsUpdater != nil {
126
		sd.XdsUpdater.SvcUpdate(sd.shardKey(), string(svc.Hostname), svc.Attributes.Namespace, model.EventAdd)
127
	}
128
	sd.handlers.NotifyServiceHandlers(old, svc, event)
129
	sd.mutex.Unlock()
130
}
131

132
// RemoveService removes an in-memory service.
133
func (sd *ServiceDiscovery) RemoveService(name host.Name) {
134
	sd.mutex.Lock()
135
	svc := sd.services[name]
136
	delete(sd.services, name)
137

138
	// remove old entries
139
	for k, v := range sd.ip2instance {
140
		sd.ip2instance[k] = slices.FilterInPlace(v, func(instance *model.ServiceInstance) bool {
141
			return instance.Service == nil || instance.Service.Hostname != name
142
		})
143
	}
144

145
	if sd.XdsUpdater != nil {
146
		sd.XdsUpdater.SvcUpdate(sd.shardKey(), string(svc.Hostname), svc.Attributes.Namespace, model.EventDelete)
147
	}
148
	sd.handlers.NotifyServiceHandlers(nil, svc, model.EventDelete)
149
	sd.mutex.Unlock()
150
}
151

152
// AddInstance adds an in-memory instance and notifies the XDS updater
153
func (sd *ServiceDiscovery) AddInstance(instance *model.ServiceInstance) {
154
	sd.mutex.Lock()
155
	defer sd.mutex.Unlock()
156
	service := instance.Service.Hostname
157
	svc := sd.services[service]
158
	if svc == nil {
159
		return
160
	}
161
	if instance.Endpoint.ServicePortName == "" {
162
		instance.Endpoint.ServicePortName = instance.ServicePort.Name
163
	}
164
	instance.Service = svc
165
	sd.ip2instance[instance.Endpoint.Address] = append(sd.ip2instance[instance.Endpoint.Address], instance)
166

167
	key := fmt.Sprintf("%s:%d", service, instance.ServicePort.Port)
168
	instanceList := sd.instancesByPortNum[key]
169
	sd.instancesByPortNum[key] = append(instanceList, instance)
170

171
	key = fmt.Sprintf("%s:%s", service, instance.ServicePort.Name)
172
	instanceList = sd.instancesByPortName[key]
173
	sd.instancesByPortName[key] = append(instanceList, instance)
174
	eps := make([]*model.IstioEndpoint, 0, len(sd.instancesByPortName[key]))
175
	for _, port := range svc.Ports {
176
		key := fmt.Sprintf("%s:%s", service, port.Name)
177
		for _, i := range sd.instancesByPortName[key] {
178
			eps = append(eps, i.Endpoint)
179
		}
180
	}
181
	if sd.XdsUpdater != nil {
182
		sd.XdsUpdater.EDSUpdate(sd.shardKey(), string(service), svc.Attributes.Namespace, eps)
183
	}
184
}
185

186
// AddEndpoint adds an endpoint to a service.
187
func (sd *ServiceDiscovery) AddEndpoint(service host.Name, servicePortName string, servicePort int, address string, port int) *model.ServiceInstance {
188
	instance := &model.ServiceInstance{
189
		Service: &model.Service{Hostname: service},
190
		Endpoint: &model.IstioEndpoint{
191
			Address:         address,
192
			ServicePortName: servicePortName,
193
			EndpointPort:    uint32(port),
194
		},
195
		ServicePort: &model.Port{
196
			Name:     servicePortName,
197
			Port:     servicePort,
198
			Protocol: protocol.HTTP,
199
		},
200
	}
201
	sd.AddInstance(instance)
202
	return instance
203
}
204

205
// SetEndpoints update the list of endpoints for a service, similar with K8S controller.
206
func (sd *ServiceDiscovery) SetEndpoints(service string, namespace string, endpoints []*model.IstioEndpoint) {
207
	sh := host.Name(service)
208

209
	sd.mutex.Lock()
210
	svc := sd.services[sh]
211
	if svc == nil {
212
		sd.mutex.Unlock()
213
		return
214
	}
215
	if svc.Attributes.Namespace != namespace {
216
		log.Errorf("Service namespace %q != namespace %q", svc.Attributes.Namespace, namespace)
217
	}
218

219
	// remove old entries
220
	for k, v := range sd.ip2instance {
221
		if len(v) > 0 && v[0].Service.Hostname == sh {
222
			delete(sd.ip2instance, k)
223
		}
224
	}
225
	for k, v := range sd.instancesByPortNum {
226
		if len(v) > 0 && v[0].Service.Hostname == sh {
227
			delete(sd.instancesByPortNum, k)
228
		}
229
	}
230
	for k, v := range sd.instancesByPortName {
231
		if len(v) > 0 && v[0].Service.Hostname == sh {
232
			delete(sd.instancesByPortName, k)
233
		}
234
	}
235

236
	for _, e := range endpoints {
237
		// servicePortName string, servicePort int, address string, port int
238
		p, _ := svc.Ports.Get(e.ServicePortName)
239

240
		instance := &model.ServiceInstance{
241
			Service: svc,
242
			ServicePort: &model.Port{
243
				Name:     e.ServicePortName,
244
				Port:     p.Port,
245
				Protocol: p.Protocol,
246
			},
247
			Endpoint: e,
248
		}
249
		sd.ip2instance[instance.Endpoint.Address] = []*model.ServiceInstance{instance}
250

251
		key := fmt.Sprintf("%s:%d", service, instance.ServicePort.Port)
252

253
		instanceList := sd.instancesByPortNum[key]
254
		sd.instancesByPortNum[key] = append(instanceList, instance)
255

256
		key = fmt.Sprintf("%s:%s", service, instance.ServicePort.Name)
257
		instanceList = sd.instancesByPortName[key]
258
		sd.instancesByPortName[key] = append(instanceList, instance)
259

260
	}
261
	if sd.XdsUpdater != nil {
262
		sd.XdsUpdater.EDSUpdate(sd.shardKey(), service, namespace, endpoints)
263
	}
264
	sd.mutex.Unlock()
265
}
266

267
// Services implements discovery interface
268
// Each call to Services() should return a list of new *model.Service
269
func (sd *ServiceDiscovery) Services() []*model.Service {
270
	sd.mutex.Lock()
271
	defer sd.mutex.Unlock()
272
	out := make([]*model.Service, 0, len(sd.services))
273
	for _, service := range sd.services {
274
		out = append(out, service)
275
	}
276
	return out
277
}
278

279
// GetService implements discovery interface
280
// Each call to GetService() should return a new *model.Service
281
func (sd *ServiceDiscovery) GetService(hostname host.Name) *model.Service {
282
	sd.mutex.Lock()
283
	defer sd.mutex.Unlock()
284
	return sd.services[hostname]
285
}
286

287
// GetProxyServiceTargets returns service instances associated with a node, resulting in
288
// 'in' services.
289
func (sd *ServiceDiscovery) GetProxyServiceTargets(node *model.Proxy) []model.ServiceTarget {
290
	sd.mutex.Lock()
291
	defer sd.mutex.Unlock()
292
	if sd.WantGetProxyServiceTargets != nil {
293
		return sd.WantGetProxyServiceTargets
294
	}
295
	out := make([]model.ServiceTarget, 0)
296
	for _, ip := range node.IPAddresses {
297
		si, found := sd.ip2instance[ip]
298
		if found {
299
			out = append(out, slices.Map(si, model.ServiceInstanceToTarget)...)
300
		}
301
	}
302
	return out
303
}
304

305
func (sd *ServiceDiscovery) GetProxyWorkloadLabels(proxy *model.Proxy) labels.Instance {
306
	sd.mutex.Lock()
307
	defer sd.mutex.Unlock()
308

309
	for _, ip := range proxy.IPAddresses {
310
		if l, found := sd.ip2workloadLabels[ip]; found {
311
			return l
312
		}
313
	}
314
	return nil
315
}
316

317
func (sd *ServiceDiscovery) AddGateways(gws ...model.NetworkGateway) {
318
	sd.networkGateways = append(sd.networkGateways, gws...)
319
	sd.NotifyGatewayHandlers()
320
}
321

322
func (sd *ServiceDiscovery) NetworkGateways() []model.NetworkGateway {
323
	return sd.networkGateways
324
}
325

326
func (sd *ServiceDiscovery) MCSServices() []model.MCSServiceInfo {
327
	return nil
328
}
329

330
// Memory does not support workload handlers; everything is done in terms of instances
331
func (sd *ServiceDiscovery) AppendWorkloadHandler(func(*model.WorkloadInstance, model.Event)) {}
332

333
// AppendServiceHandler appends a service handler to the controller
334
func (sd *ServiceDiscovery) AppendServiceHandler(f model.ServiceHandler) {
335
	sd.handlers.AppendServiceHandler(f)
336
}
337

338
// Run will run the controller
339
func (sd *ServiceDiscovery) Run(<-chan struct{}) {}
340

341
// HasSynced always returns true
342
func (sd *ServiceDiscovery) HasSynced() bool { return true }
343

344
func (sd *ServiceDiscovery) AddressInformation(requests sets.String) ([]model.AddressInfo, sets.String) {
345
	sd.mutex.Lock()
346
	defer sd.mutex.Unlock()
347
	if len(requests) == 0 {
348
		return maps.Values(sd.addresses), nil
349
	}
350

351
	var infos []model.AddressInfo
352
	removed := sets.String{}
353
	for req := range requests {
354
		if _, found := sd.addresses[req]; !found {
355
			removed.Insert(req)
356
		} else {
357
			infos = append(infos, sd.addresses[req])
358
		}
359
	}
360
	return infos, removed
361
}
362

363
func (sd *ServiceDiscovery) AdditionalPodSubscriptions(
364
	*model.Proxy,
365
	sets.String,
366
	sets.String,
367
) sets.String {
368
	return nil
369
}
370

371
func (sd *ServiceDiscovery) Policies(sets.Set[model.ConfigKey]) []model.WorkloadAuthorization {
372
	return nil
373
}
374

375
func (sd *ServiceDiscovery) ServicesForWaypoint(model.WaypointKey) []model.ServiceInfo {
376
	return nil
377
}
378

379
func (sd *ServiceDiscovery) Waypoint(string, string) []netip.Addr {
380
	return nil
381
}
382

383
func (sd *ServiceDiscovery) WorkloadsForWaypoint(model.WaypointKey) []model.WorkloadInfo {
384
	return nil
385
}
386

387
func (sd *ServiceDiscovery) AddWorkloadInfo(infos ...*model.WorkloadInfo) {
388
	sd.mutex.Lock()
389
	defer sd.mutex.Unlock()
390
	for _, info := range infos {
391
		sd.addresses[info.ResourceName()] = workloadToAddressInfo(info.Workload)
392
	}
393
}
394

395
func (sd *ServiceDiscovery) RemoveWorkloadInfo(info *model.WorkloadInfo) {
396
	sd.mutex.Lock()
397
	defer sd.mutex.Unlock()
398
	delete(sd.addresses, info.ResourceName())
399
}
400

401
func (sd *ServiceDiscovery) AddServiceInfo(infos ...*model.ServiceInfo) {
402
	sd.mutex.Lock()
403
	defer sd.mutex.Unlock()
404
	for _, info := range infos {
405
		sd.addresses[info.ResourceName()] = serviceToAddressInfo(info.Service)
406
	}
407
}
408

409
func (sd *ServiceDiscovery) RemoveServiceInfo(info *model.ServiceInfo) {
410
	sd.mutex.Lock()
411
	defer sd.mutex.Unlock()
412
	delete(sd.addresses, info.ResourceName())
413
}
414

415
func workloadToAddressInfo(w *workloadapi.Workload) model.AddressInfo {
416
	return model.AddressInfo{
417
		Address: &workloadapi.Address{
418
			Type: &workloadapi.Address_Workload{
419
				Workload: w,
420
			},
421
		},
422
	}
423
}
424

425
func serviceToAddressInfo(s *workloadapi.Service) model.AddressInfo {
426
	return model.AddressInfo{
427
		Address: &workloadapi.Address{
428
			Type: &workloadapi.Address_Service{
429
				Service: s,
430
			},
431
		},
432
	}
433
}
434

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.