// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package controller

import (
	"sync"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/types"

	"istio.io/istio/pilot/pkg/model"
	"istio.io/istio/pkg/config"
	"istio.io/istio/pkg/kube/kclient"
	"istio.io/istio/pkg/maps"
	"istio.io/istio/pkg/util/sets"
)

// PodCache is an eventually consistent pod cache
type PodCache struct {
	pods kclient.Client[*v1.Pod]

	sync.RWMutex
	// podsByIP maintains a stable pod IP to name key mapping,
	// which allows us to retrieve the latest status by pod IP.
	// This should only contain RUNNING or PENDING pods with an allocated IP.
	podsByIP map[string]sets.Set[types.NamespacedName]
	// IPByPods is a reverse map of podsByIP. This exists to allow us to prune stale entries in the
	// pod cache if a pod changes IP.
	IPByPods map[types.NamespacedName]string

	// needResync is a map of IP to endpoint namespace/name. It is used to requeue endpoint
	// events when the corresponding pod event arrives. This typically happens when the pod is
	// not yet available in the podCache at the time the endpoint event comes.
	needResync         map[string]sets.Set[types.NamespacedName]
	queueEndpointEvent func(types.NamespacedName)

	c *Controller
}
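
// For example (illustrative names): if pod "default/foo" currently holds IP 10.0.0.5, then
// podsByIP["10.0.0.5"] contains {default/foo} and IPByPods[default/foo] == "10.0.0.5".
// If foo later changes IP, the stale podsByIP entry for the old address is pruned via
// IPByPods (see update below).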

func newPodCache(c *Controller, pods kclient.Client[*v1.Pod], queueEndpointEvent func(types.NamespacedName)) *PodCache {
	out := &PodCache{
		pods:               pods,
		c:                  c,
		podsByIP:           make(map[string]sets.Set[types.NamespacedName]),
		IPByPods:           make(map[types.NamespacedName]string),
		needResync:         make(map[string]sets.Set[types.NamespacedName]),
		queueEndpointEvent: queueEndpointEvent,
	}

	return out
}

// Copied from kubernetes/kubernetes/pkg/controller/util/endpoint/controller_utils.go
//
// shouldPodBeInEndpoints returns true if a specified pod should be in an
// Endpoints or EndpointSlice resource. Terminating pods are not included.
func shouldPodBeInEndpoints(pod *v1.Pod) bool {
	// "Terminal" describes when a Pod is complete (in a succeeded or failed phase).
	// This is distinct from the "Terminating" condition, which represents when a Pod
	// is being terminated (metadata.deletionTimestamp is non-nil).
	if isPodPhaseTerminal(pod.Status.Phase) {
		return false
	}

	if len(pod.Status.PodIP) == 0 && len(pod.Status.PodIPs) == 0 {
		return false
	}

	if pod.DeletionTimestamp != nil {
		return false
	}

	return true
}
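
// For example, a pod from a completed Job (phase Succeeded) is filtered out here even if its
// status still carries an IP, and a pod marked for deletion (non-nil deletionTimestamp) is
// excluded even though it may briefly keep serving traffic.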

// isPodPhaseTerminal returns true if the pod's phase is terminal.
func isPodPhaseTerminal(phase v1.PodPhase) bool {
	return phase == v1.PodFailed || phase == v1.PodSucceeded
}

func IsPodRunning(pod *v1.Pod) bool {
	return pod.Status.Phase == v1.PodRunning
}

// IsPodReady is copied from kubernetes/pkg/api/v1/pod/utils.go
func IsPodReady(pod *v1.Pod) bool {
	return IsPodReadyConditionTrue(pod.Status)
}

// IsPodReadyConditionTrue returns true if a pod is ready; false otherwise.
func IsPodReadyConditionTrue(status v1.PodStatus) bool {
	condition := GetPodReadyCondition(status)
	return condition != nil && condition.Status == v1.ConditionTrue
}

// GetPodReadyCondition extracts the pod's Ready condition, or nil if it is not present.
func GetPodReadyCondition(status v1.PodStatus) *v1.PodCondition {
	_, condition := GetPodCondition(&status, v1.PodReady)
	return condition
}

// GetPodCondition extracts the provided condition from the given status.
// Returns -1 and nil if the status is nil or the condition is not present.
func GetPodCondition(status *v1.PodStatus, conditionType v1.PodConditionType) (int, *v1.PodCondition) {
	if status == nil {
		return -1, nil
	}
	return GetPodConditionFromList(status.Conditions, conditionType)
}

// GetPodConditionFromList extracts the provided condition from the given list of conditions and
// returns the index of the condition and the condition. Returns -1 and nil if the condition is not present.
func GetPodConditionFromList(conditions []v1.PodCondition, conditionType v1.PodConditionType) (int, *v1.PodCondition) {
	if conditions == nil {
		return -1, nil
	}
	for i := range conditions {
		if conditions[i].Type == conditionType {
			return i, &conditions[i]
		}
	}
	return -1, nil
}

func (pc *PodCache) labelFilter(old, cur *v1.Pod) bool {
	// If labels updated, trigger proxy push
	if cur.Status.PodIP != "" && !maps.Equal(old.Labels, cur.Labels) {
		pc.proxyUpdates(cur.Status.PodIP)
	}

	// always continue calling pc.onEvent
	return false
}

// onEvent updates the IP-based index (pc.podsByIP).
func (pc *PodCache) onEvent(_, pod *v1.Pod, ev model.Event) error {
	ip := pod.Status.PodIP
	// PodIP will be empty when the pod is just created, before the IP is assigned
	// via UpdateStatus.
	if len(ip) == 0 {
		return nil
	}

	key := config.NamespacedName(pod)
	switch ev {
	case model.EventAdd:
		if shouldPodBeInEndpoints(pod) && IsPodReady(pod) {
			pc.update(ip, key)
		} else {
			return nil
		}
	case model.EventUpdate:
		if !shouldPodBeInEndpoints(pod) || !IsPodReady(pod) {
			// delete only if this pod was in the cache
			if !pc.deleteIP(ip, key) {
				return nil
			}
			ev = model.EventDelete
		} else if shouldPodBeInEndpoints(pod) && IsPodReady(pod) {
			pc.update(ip, key)
		} else {
			return nil
		}
	case model.EventDelete:
		// delete only if this pod was in the cache;
		// in most cases it has already been deleted in `UPDATE` with `DeletionTimestamp` set.
		if !pc.deleteIP(ip, key) {
			return nil
		}
	}
	pc.notifyWorkloadHandlers(pod, ev)
	return nil
}
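
// In summary: an Add for a ready, endpoint-eligible pod populates the index; an Update either
// refreshes the index or, when the pod stops being endpoint-eligible, is rewritten into a
// synthetic Delete so downstream workload handlers see a removal; a Delete is only propagated
// if the pod was still present in the cache.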

// notifyWorkloadHandlers fires workloadInstance handlers for the pod
func (pc *PodCache) notifyWorkloadHandlers(pod *v1.Pod, ev model.Event) {
	// if no workload handlers are registered, skip building the WorkloadInstance
	if len(pc.c.handlers.GetWorkloadHandlers()) == 0 {
		return
	}
	// fire instance handlers for the workload
	ep := NewEndpointBuilder(pc.c, pod).buildIstioEndpoint(pod.Status.PodIP, 0, "", model.AlwaysDiscoverable, model.Healthy)
	workloadInstance := &model.WorkloadInstance{
		Name:      pod.Name,
		Namespace: pod.Namespace,
		Kind:      model.PodKind,
		Endpoint:  ep,
		PortMap:   getPortMap(pod),
	}
	pc.c.handlers.NotifyWorkloadHandlers(workloadInstance, ev)
}

// getPortMap returns a map of named TCP container ports to their port numbers.
func getPortMap(pod *v1.Pod) map[string]uint32 {
	pmap := map[string]uint32{}
	for _, c := range pod.Spec.Containers {
		for _, port := range c.Ports {
			if port.Name == "" || port.Protocol != v1.ProtocolTCP {
				continue
			}
			// First port wins, per Kubernetes (https://github.com/kubernetes/kubernetes/issues/54213)
			if _, f := pmap[port.Name]; !f {
				pmap[port.Name] = uint32(port.ContainerPort)
			}
		}
	}
	return pmap
}
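
// For example (illustrative values), containers declaring
// {Name: "http", ContainerPort: 8080, Protocol: TCP} and {Name: "http", ContainerPort: 9090, Protocol: TCP}
// yield map["http"] = 8080: unnamed and non-TCP ports are skipped, and the first named port wins.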

// deleteIP returns true if the pod and ip are really deleted.
func (pc *PodCache) deleteIP(ip string, podKey types.NamespacedName) bool {
	pc.Lock()
	defer pc.Unlock()
	if pc.podsByIP[ip].Contains(podKey) {
		sets.DeleteCleanupLast(pc.podsByIP, ip, podKey)
		delete(pc.IPByPods, podKey)
		return true
	}
	return false
}

func (pc *PodCache) update(ip string, key types.NamespacedName) {
	pc.Lock()
	// if the pod has been cached, return
	if pc.podsByIP[ip].Contains(key) {
		pc.Unlock()
		return
	}
	if current, f := pc.IPByPods[key]; f {
		// The pod already exists, but with another IP address. We need to clean that up.
		sets.DeleteCleanupLast(pc.podsByIP, current, key)
	}
	sets.InsertOrNew(pc.podsByIP, ip, key)
	pc.IPByPods[key] = ip

	if endpointsToUpdate, f := pc.needResync[ip]; f {
		delete(pc.needResync, ip)
		for epKey := range endpointsToUpdate {
			pc.queueEndpointEvent(epKey)
		}
		endpointsPendingPodUpdate.Record(float64(len(pc.needResync)))
	}
	pc.Unlock()

	pc.proxyUpdates(ip)
}
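
// The needResync drain above is the second half of the handshake started in
// queueEndpointEventOnPodArrival below: endpoint events that arrived before this pod was known
// are replayed once the pod's IP shows up in the cache.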

// queueEndpointEventOnPodArrival registers this endpoint and queues the endpoint event
// when the corresponding pod arrives.
func (pc *PodCache) queueEndpointEventOnPodArrival(key types.NamespacedName, ip string) {
	pc.Lock()
	defer pc.Unlock()
	sets.InsertOrNew(pc.needResync, ip, key)
	endpointsPendingPodUpdate.Record(float64(len(pc.needResync)))
}

// endpointDeleted cleans up the endpoint from the resync endpoint list.
func (pc *PodCache) endpointDeleted(key types.NamespacedName, ip string) {
	pc.Lock()
	defer pc.Unlock()
	sets.DeleteCleanupLast(pc.needResync, ip, key)
	endpointsPendingPodUpdate.Record(float64(len(pc.needResync)))
}

// proxyUpdates notifies the XDS updater, if one is configured, that the proxy at the given IP needs an update.
func (pc *PodCache) proxyUpdates(ip string) {
	if pc.c != nil && pc.c.opts.XDSUpdater != nil {
		pc.c.opts.XDSUpdater.ProxyUpdate(pc.c.Cluster(), ip)
	}
}

func (pc *PodCache) getPodKeys(addr string) []types.NamespacedName {
	pc.RLock()
	defer pc.RUnlock()
	return pc.podsByIP[addr].UnsortedList()
}

// getPodsByIP returns the pods with the given IP address, or nil if none are found.
func (pc *PodCache) getPodsByIP(addr string) []*v1.Pod {
	keys := pc.getPodKeys(addr)
	if keys == nil {
		return nil
	}
	res := make([]*v1.Pod, 0, len(keys))
	for _, key := range keys {
		p := pc.getPodByKey(key)
		// Subtle race condition: getPodKeys is our cache over pods, while getPodByKey hits the informer cache.
		// If these are out of sync, p may be nil (pod was deleted).
		if p != nil {
			res = append(res, p)
		}
	}
	return res
}

// getPodByKey returns the pod by key
func (pc *PodCache) getPodByKey(key types.NamespacedName) *v1.Pod {
	return pc.pods.Get(key.Name, key.Namespace)
}

// getPodByProxy returns the pod of the proxy
func (pc *PodCache) getPodByProxy(proxy *model.Proxy) *v1.Pod {
	var pod *v1.Pod
	key := podKeyByProxy(proxy)
	if key.Name != "" {
		pod = pc.getPodByKey(key)
		if pod != nil {
			return pod
		}
	}

	// We only need to fetch the pod via the first IP; even when a proxy reports multiple IPs,
	// they all belong to the same pod.
	proxyIP := proxy.IPAddresses[0]
	// fall back to the IP index, just in case the proxy ID is badly formatted
	pods := pc.getPodsByIP(proxyIP)
	switch len(pods) {
	case 0:
		return nil
	case 1:
		return pods[0]
	default:
		// This should only happen with hostNetwork pods, which cannot be proxy clients...
		log.Errorf("unexpected: found multiple pods for proxy %v (%v)", proxy.ID, proxyIP)
		// Try to handle it gracefully
		for _, p := range pods {
			// At least filter out wrong namespaces...
			if proxy.ConfigNamespace != p.Namespace {
				continue
			}
			return p
		}
		return nil
	}
}
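
// Note: podKeyByProxy (defined elsewhere in this package) derives the pod key from the proxy ID,
// which for sidecar proxies is conventionally "<pod-name>.<namespace>"; the IP-based lookup above
// is the fallback used when that parsing yields no usable key.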
