// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package controller

import (
	"sync"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/types"

	"istio.io/istio/pilot/pkg/model"
	"istio.io/istio/pkg/config"
	"istio.io/istio/pkg/kube/kclient"
	"istio.io/istio/pkg/maps"
	"istio.io/istio/pkg/util/sets"
)

// PodCache is an eventually consistent pod cache
type PodCache struct {
	pods kclient.Client[*v1.Pod]

	sync.RWMutex
	// podsByIP maintains a stable pod IP to name key mapping.
	// This allows us to retrieve the latest status by pod IP.
	// It should only contain RUNNING or PENDING pods with an allocated IP.
	podsByIP map[string]sets.Set[types.NamespacedName]
	// IPByPods is a reverse map of podsByIP. This exists to allow us to prune stale entries in the
	// pod cache if a pod changes IP.
	IPByPods map[types.NamespacedName]string

	// needResync is a map of IP to endpoint namespace/name. It is used to requeue endpoint
	// events when the corresponding pod event arrives. This typically happens when the pod is
	// not yet available in the pod cache at the time the endpoint event is processed.
	needResync         map[string]sets.Set[types.NamespacedName]
	queueEndpointEvent func(types.NamespacedName)

	c *Controller
}
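
// For example, once a running, ready pod "default/foo" with IP 10.0.0.5 has been observed,
// the indexes hold:
//
//	podsByIP["10.0.0.5"] = {"default/foo"}
//	IPByPods["default/foo"] = "10.0.0.5"
//
// podsByIP holds a set of keys rather than a single key because one IP may map to several
// pods (for example hostNetwork pods sharing the node IP; see getPodByProxy below).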

func newPodCache(c *Controller, pods kclient.Client[*v1.Pod], queueEndpointEvent func(types.NamespacedName)) *PodCache {
	out := &PodCache{
		pods:               pods,
		c:                  c,
		podsByIP:           make(map[string]sets.Set[types.NamespacedName]),
		IPByPods:           make(map[types.NamespacedName]string),
		needResync:         make(map[string]sets.Set[types.NamespacedName]),
		queueEndpointEvent: queueEndpointEvent,
	}

	return out
}

// Copied from kubernetes/kubernetes/pkg/controller/util/endpoint/controller_utils.go
//
// shouldPodBeInEndpoints returns true if a specified pod should be in an
// Endpoints or EndpointSlice resource. Terminating pods are not included.
func shouldPodBeInEndpoints(pod *v1.Pod) bool {
	// "Terminal" describes when a Pod is complete (in a succeeded or failed phase).
	// This is distinct from the "Terminating" condition which represents when a Pod
	// is being terminated (metadata.deletionTimestamp is non-nil).
	if isPodPhaseTerminal(pod.Status.Phase) {
		return false
	}

	if len(pod.Status.PodIP) == 0 && len(pod.Status.PodIPs) == 0 {
		return false
	}

	if pod.DeletionTimestamp != nil {
		return false
	}

	return true
}
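
// For example, a completed Job pod (phase Succeeded) or a pod whose deletion has already
// started (DeletionTimestamp set) is excluded, while a Pending pod that already has an IP
// allocated is still considered a candidate endpoint.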

// isPodPhaseTerminal returns true if the pod's phase is terminal.
func isPodPhaseTerminal(phase v1.PodPhase) bool {
	return phase == v1.PodFailed || phase == v1.PodSucceeded
}

func IsPodRunning(pod *v1.Pod) bool {
	return pod.Status.Phase == v1.PodRunning
}

// IsPodReady is copied from kubernetes/pkg/api/v1/pod/utils.go
func IsPodReady(pod *v1.Pod) bool {
	return IsPodReadyConditionTrue(pod.Status)
}

// IsPodReadyConditionTrue returns true if a pod is ready; false otherwise.
func IsPodReadyConditionTrue(status v1.PodStatus) bool {
	condition := GetPodReadyCondition(status)
	return condition != nil && condition.Status == v1.ConditionTrue
}

func GetPodReadyCondition(status v1.PodStatus) *v1.PodCondition {
	_, condition := GetPodCondition(&status, v1.PodReady)
	return condition
}

func GetPodCondition(status *v1.PodStatus, conditionType v1.PodConditionType) (int, *v1.PodCondition) {
	if status == nil {
		return -1, nil
	}
	return GetPodConditionFromList(status.Conditions, conditionType)
}

// GetPodConditionFromList extracts the provided condition from the given list of conditions and
// returns the index of the condition and the condition itself. Returns -1 and nil if the condition is not present.
func GetPodConditionFromList(conditions []v1.PodCondition, conditionType v1.PodConditionType) (int, *v1.PodCondition) {
	if conditions == nil {
		return -1, nil
	}
	for i := range conditions {
		if conditions[i].Type == conditionType {
			return i, &conditions[i]
		}
	}
	return -1, nil
}

func (pc *PodCache) labelFilter(old, cur *v1.Pod) bool {
	// If labels updated, trigger proxy push
	if cur.Status.PodIP != "" && !maps.Equal(old.Labels, cur.Labels) {
		pc.proxyUpdates(cur.Status.PodIP)
	}

	// always continue calling pc.onEvent
	return false
}

// onEvent updates the IP-based index (pc.podsByIP).
func (pc *PodCache) onEvent(_, pod *v1.Pod, ev model.Event) error {
	ip := pod.Status.PodIP
	// PodIP will be empty when a pod is just created, before the IP is assigned
	// via UpdateStatus.
	if len(ip) == 0 {
		return nil
	}

	key := config.NamespacedName(pod)
	switch ev {
	case model.EventAdd:
		if shouldPodBeInEndpoints(pod) && IsPodReady(pod) {
			pc.update(ip, key)
		} else {
			return nil
		}
	case model.EventUpdate:
		if !shouldPodBeInEndpoints(pod) || !IsPodReady(pod) {
			// delete only if this pod was in the cache
			if !pc.deleteIP(ip, key) {
				return nil
			}
			ev = model.EventDelete
		} else if shouldPodBeInEndpoints(pod) && IsPodReady(pod) {
			pc.update(ip, key)
		} else {
			return nil
		}
	case model.EventDelete:
		// delete only if this pod was in the cache;
		// in most cases it has already been deleted in `UPDATE` with `DeletionTimestamp` set.
		if !pc.deleteIP(ip, key) {
			return nil
		}
	}
	pc.notifyWorkloadHandlers(pod, ev)
	return nil
}
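
// In short: an Add for a ready, endpoint-eligible pod populates the index; an Update keeps
// the index current while the pod stays eligible, and is downgraded to a Delete as soon as
// the pod stops being eligible (terminal phase, deletion started, or readiness lost); a raw
// Delete is only propagated if the pod was still in the cache, so workload handlers observe
// at most one delete per pod.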

// notifyWorkloadHandlers fires workloadInstance handlers for the pod
func (pc *PodCache) notifyWorkloadHandlers(pod *v1.Pod, ev model.Event) {
	// if no workload handler is registered, skip building the WorkloadInstance
	if len(pc.c.handlers.GetWorkloadHandlers()) == 0 {
		return
	}
	// fire instance handlers for the workload
	ep := NewEndpointBuilder(pc.c, pod).buildIstioEndpoint(pod.Status.PodIP, 0, "", model.AlwaysDiscoverable, model.Healthy)
	workloadInstance := &model.WorkloadInstance{
		Name:      pod.Name,
		Namespace: pod.Namespace,
		Kind:      model.PodKind,
		Endpoint:  ep,
		PortMap:   getPortMap(pod),
	}
	pc.c.handlers.NotifyWorkloadHandlers(workloadInstance, ev)
}
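
// The IstioEndpoint above is built from the pod IP alone, with port 0 and an empty service
// port name, i.e. without reference to any particular service; the accompanying PortMap of
// named container ports presumably lets consumers of the WorkloadInstance resolve named
// target ports against their own service definitions later.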

func getPortMap(pod *v1.Pod) map[string]uint32 {
	pmap := map[string]uint32{}
	for _, c := range pod.Spec.Containers {
		for _, port := range c.Ports {
			if port.Name == "" || port.Protocol != v1.ProtocolTCP {
				continue
			}
			// First port wins, per Kubernetes (https://github.com/kubernetes/kubernetes/issues/54213)
			if _, f := pmap[port.Name]; !f {
				pmap[port.Name] = uint32(port.ContainerPort)
			}
		}
	}
	return pmap
}
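
// For example, a pod whose containers declare
//
//	ports:
//	- name: http
//	  containerPort: 8080
//	- containerPort: 9090
//
// yields map[string]uint32{"http": 8080}: unnamed and non-TCP ports are skipped, and when
// two containers declare the same port name only the first declaration is kept.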

// deleteIP removes the pod key from the IP index and returns true only if the pod and IP
// were actually present in the cache.
func (pc *PodCache) deleteIP(ip string, podKey types.NamespacedName) bool {
	pc.Lock()
	defer pc.Unlock()
	if pc.podsByIP[ip].Contains(podKey) {
		sets.DeleteCleanupLast(pc.podsByIP, ip, podKey)
		delete(pc.IPByPods, podKey)
		return true
	}
	return false
}

func (pc *PodCache) update(ip string, key types.NamespacedName) {
	pc.Lock()
	// if the pod has already been cached, return
	if pc.podsByIP[ip].Contains(key) {
		pc.Unlock()
		return
	}
	if current, f := pc.IPByPods[key]; f {
		// The pod already exists, but with another IP address. We need to clean up the stale entry.
		sets.DeleteCleanupLast(pc.podsByIP, current, key)
	}
	sets.InsertOrNew(pc.podsByIP, ip, key)
	pc.IPByPods[key] = ip

	if endpointsToUpdate, f := pc.needResync[ip]; f {
		delete(pc.needResync, ip)
		for epKey := range endpointsToUpdate {
			pc.queueEndpointEvent(epKey)
		}
		endpointsPendingPodUpdate.Record(float64(len(pc.needResync)))
	}
	pc.Unlock()

	pc.proxyUpdates(ip)
}

// queueEndpointEventOnPodArrival registers this endpoint and queues an endpoint event
// when the corresponding pod arrives.
func (pc *PodCache) queueEndpointEventOnPodArrival(key types.NamespacedName, ip string) {
	pc.Lock()
	defer pc.Unlock()
	sets.InsertOrNew(pc.needResync, ip, key)
	endpointsPendingPodUpdate.Record(float64(len(pc.needResync)))
}

// endpointDeleted removes the endpoint from the resync endpoint list.
func (pc *PodCache) endpointDeleted(key types.NamespacedName, ip string) {
	pc.Lock()
	defer pc.Unlock()
	sets.DeleteCleanupLast(pc.needResync, ip, key)
	endpointsPendingPodUpdate.Record(float64(len(pc.needResync)))
}
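
// Together, queueEndpointEventOnPodArrival, update, and endpointDeleted implement the resync
// handshake described on needResync: when an endpoint event references an IP whose pod has not
// been seen yet, the endpoint key is parked under that IP; once update() observes the pod, every
// parked key is re-queued via queueEndpointEvent; if the endpoint disappears first,
// endpointDeleted drops the parked entry instead.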

func (pc *PodCache) proxyUpdates(ip string) {
	if pc.c != nil && pc.c.opts.XDSUpdater != nil {
		pc.c.opts.XDSUpdater.ProxyUpdate(pc.c.Cluster(), ip)
	}
}

func (pc *PodCache) getPodKeys(addr string) []types.NamespacedName {
	pc.RLock()
	defer pc.RUnlock()
	return pc.podsByIP[addr].UnsortedList()
}

// getPodsByIP returns the pods with the given IP address, or nil if none are found.
func (pc *PodCache) getPodsByIP(addr string) []*v1.Pod {
	keys := pc.getPodKeys(addr)
	if keys == nil {
		return nil
	}
	res := make([]*v1.Pod, 0, len(keys))
	for _, key := range keys {
		p := pc.getPodByKey(key)
		// Subtle race condition: getPodKeys is our own cache over pods, while getPodByKey hits the informer cache.
		// If these are out of sync, p may be nil (the pod was deleted).
		if p != nil {
			res = append(res, p)
		}
	}
	return res
}
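
// Note that several pods can legitimately share one IP (for example hostNetwork pods using the
// node IP), which is why a slice is returned; callers such as getPodByProxy are expected to
// disambiguate, for instance by namespace.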

// getPodByKey returns the pod by key
func (pc *PodCache) getPodByKey(key types.NamespacedName) *v1.Pod {
	return pc.pods.Get(key.Name, key.Namespace)
}

// getPodByProxy returns the pod for the given proxy
func (pc *PodCache) getPodByProxy(proxy *model.Proxy) *v1.Pod {
	var pod *v1.Pod
	key := podKeyByProxy(proxy)
	if key.Name != "" {
		pod = pc.getPodByKey(key)
		if pod != nil {
			return pod
		}
	}

	// We only need to look up the pod by its first IP: even when a pod has multiple IPs,
	// they all belong to the same pod.
	proxyIP := proxy.IPAddresses[0]
	// just in case the proxy ID is badly formatted
	pods := pc.getPodsByIP(proxyIP)
	switch len(pods) {
	case 0:
		return nil
	case 1:
		return pods[0]
	default:
		// This should only happen with hostNetwork pods, which cannot be proxy clients...
		log.Errorf("unexpected: found multiple pods for proxy %v (%v)", proxy.ID, proxyIP)
		// Try to handle it gracefully
		for _, p := range pods {
			// At least filter out wrong namespaces...
			if proxy.ConfigNamespace != p.Namespace {
				continue
			}
			return p
		}
		return nil
	}
}