inspektor-gadget
537 строк · 16.5 Кб
1// Copyright 2019-2023 The Inspektor Gadget authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// Package containercollection provides the ContainerCollection struct to keep
16// track of the set of running containers and primitives to query that set with
17// various criteria.
18//
19// It is used by the Gadget Tracer Manager to keep track of containers part of
20// Kubernetes pods and by IG Manager to keep track of containers on a
21// Linux system.
22package containercollection
23
24import (
25"sync"
26"time"
27
28log "github.com/sirupsen/logrus"
29metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
30
31eventtypes "github.com/inspektor-gadget/inspektor-gadget/pkg/types"
32)
33
// ContainerCollection holds a set of containers. It can be embedded as an
// anonymous struct to help other structs implement the ContainerResolver
// interface. For this reason, some methods are namespaced with 'Container' to
// make this clear.
type ContainerCollection struct {
	// mu serializes the updates of the namespace lookup maps performed by
	// AddContainer() and RemoveContainer(). It is also held by Close().
	mu sync.Mutex

	// Keys: container ID (container.Runtime.ContainerID)
	// Values: *Container
	containers sync.Map

	// Keys: mount namespace inode ID (uint64)
	// Values: *Container
	containersByMntNs sync.Map

	// Keys: network namespace inode ID (uint64)
	// Values: []*Container. The stored slice is never modified in place: a
	// new slice is stored each time a container is added or removed.
	containersByNetNs sync.Map

	// Saves containers for "cacheDelay" to be able to enrich events after the container is
	// removed. This is enabled by using WithTracerCollection().
	// Keys: container ID, Values: *Container (same layout as containers).
	// It is nil when the cache is not enabled.
	cachedContainers *sync.Map
	cacheDelay       time.Duration

	// pubsub is used to publish container additions and removals to
	// subscribers. It is nil when no pubsub has been configured.
	pubsub *GadgetPubSub

	// containerEnrichers are functions that automatically add metadata
	// upon AddContainer. The functions return true on success or false if
	// the container is meant to be dropped.
	containerEnrichers []func(container *Container) (ok bool)

	// initialContainers is used during the initialization process to
	// gather initial containers and then call the enrichers
	initialContainers []*Container

	// nodeName is written to the K8s.Node field of events by EnrichNode(),
	// EnrichByMntNs() and EnrichByNetNs().
	nodeName string

	// initialized tells if Initialize() has been called.
	initialized bool

	// closed tells if Close() has been called.
	closed bool
	// done is created by Initialize() and closed by Close().
	done chan struct{}

	// functions to be called on Close()
	cleanUpFuncs []func()

	// disableContainerRuntimeWarnings is used to disable warnings about container runtimes.
	disableContainerRuntimeWarnings bool
}
86
// ContainerCollectionOption is an option passed to Initialize, following the
// functional option code pattern. Each option receives the collection being
// initialized; a non-nil error makes Initialize stop and return that error.
type ContainerCollectionOption func(*ContainerCollection) error
90
91// Initialize initializes a ContainerCollection. It is
92// useful when ContainerCollection is embedded as an anonymous struct because
93// we don't use a contructor in that case.
94func (cc *ContainerCollection) Initialize(options ...ContainerCollectionOption) error {
95cc.done = make(chan struct{})
96
97if cc.initialized {
98panic("Initialize already called")
99}
100
101// Call functional options. This might fetch initial containers.
102for _, o := range options {
103err := o(cc)
104if err != nil {
105return err
106}
107}
108
109// Consume initial containers that might have been fetched by
110// functional options. This is done after all functional options have
111// been called, so that cc.containerEnrichers is fully set up.
112for _, container := range cc.initialContainers {
113cc.AddContainer(container)
114}
115cc.initialContainers = nil
116
117cc.initialized = true
118return nil
119}
120
121// GetContainer looks up a container by the container id and return it if
122// found, or return nil if not found.
123func (cc *ContainerCollection) GetContainer(id string) *Container {
124v, ok := cc.containers.Load(id)
125if !ok {
126return nil
127}
128container := v.(*Container)
129return container
130}
131
132// RemoveContainer removes a container from the collection, but only after
133// notifying all the subscribers.
134func (cc *ContainerCollection) RemoveContainer(id string) {
135v, loaded := cc.containers.Load(id)
136if !loaded {
137return
138}
139
140container := v.(*Container)
141
142if cc.pubsub != nil {
143cc.pubsub.Publish(EventTypeRemoveContainer, container)
144}
145
146// Save the container in the cache as enrichers might need the container some time after it
147// has been removed.
148if cc.cachedContainers != nil {
149container.deletionTimestamp = time.Now()
150cc.cachedContainers.Store(id, v)
151}
152
153// Remove the container from the collection after publishing the event as
154// subscribers might need to use the different collection's lookups during
155// the notification handler, and they expect the container to still be
156// present.
157cc.containers.Delete(id)
158
159// Make this operation atomic, as RemoveContainer() could be called concurrently, which could result in
160// dirty map contents
161cc.mu.Lock()
162defer cc.mu.Unlock()
163
164// Remove from MntNs lookup
165mntNsContainer, ok := cc.containersByMntNs.Load(container.Mntns)
166if !ok || mntNsContainer.(*Container).Runtime.ContainerID != container.Runtime.ContainerID {
167log.Warn("container not found or mismatch in mntns lookup map")
168return
169} else {
170cc.containersByMntNs.Delete(container.Mntns)
171}
172
173// Remove from NetNs lookup; arrays should be immutable, so recreate them
174netNsContainers, ok := cc.containersByNetNs.Load(container.Netns)
175if !ok {
176log.Warn("container netns not found in netns lookup map")
177return
178}
179
180found := false
181netNsContainersArr := netNsContainers.([]*Container)
182newNetNsContainers := make([]*Container, 0, len(netNsContainersArr)-1)
183for _, netNsContainer := range netNsContainersArr {
184if netNsContainer.Runtime.ContainerID == container.Runtime.ContainerID {
185found = true
186continue
187}
188newNetNsContainers = append(newNetNsContainers, netNsContainer)
189}
190if !found {
191log.Warn("container not found in netns lookup array")
192}
193
194if len(newNetNsContainers) > 0 {
195cc.containersByNetNs.Store(container.Netns, newNetNsContainers)
196} else {
197// clean up empty entries
198cc.containersByNetNs.Delete(container.Netns)
199}
200}
201
202// AddContainer adds a container to the collection.
203func (cc *ContainerCollection) AddContainer(container *Container) {
204for _, enricher := range cc.containerEnrichers {
205ok := enricher(container)
206// Enrichers can decide to drop a container
207if !ok {
208container.close()
209return
210}
211}
212
213_, loaded := cc.containers.LoadOrStore(container.Runtime.ContainerID, container)
214if loaded {
215return
216}
217cc.mu.Lock()
218cc.containersByMntNs.Store(container.Mntns, container)
219arr, ok := cc.containersByNetNs.Load(container.Netns)
220var newContainerArr []*Container
221if ok {
222newContainerArr = append(newContainerArr, arr.([]*Container)...)
223}
224newContainerArr = append(newContainerArr, container)
225cc.containersByNetNs.Store(container.Netns, newContainerArr)
226cc.mu.Unlock()
227
228if cc.pubsub != nil {
229cc.pubsub.Publish(EventTypeAddContainer, container)
230}
231}
232
233// LookupMntnsByContainer returns the mount namespace inode of the container
234// specified in arguments or zero if not found
235func (cc *ContainerCollection) LookupMntnsByContainer(namespace, pod, container string) (mntns uint64) {
236cc.containers.Range(func(key, value interface{}) bool {
237c := value.(*Container)
238if namespace == c.K8s.Namespace && pod == c.K8s.PodName && container == c.K8s.ContainerName {
239mntns = c.Mntns
240// container found, stop iterating
241return false
242}
243return true
244})
245return
246}
247
248func lookupContainerByMntns(m *sync.Map, mntnsid uint64) *Container {
249var container *Container
250
251m.Range(func(key, value interface{}) bool {
252c := value.(*Container)
253if c.Mntns == mntnsid {
254container = c
255// container found, stop iterating
256return false
257}
258return true
259})
260return container
261}
262
263// LookupContainerByMntns returns a container by its mount namespace
264// inode id. If not found nil is returned.
265func (cc *ContainerCollection) LookupContainerByMntns(mntnsid uint64) *Container {
266container, ok := cc.containersByMntNs.Load(mntnsid)
267if !ok {
268return nil
269}
270return container.(*Container)
271}
272
273// LookupContainersByNetns returns a slice of containers that run in a given
274// network namespace. Or an empty slice if there are no containers running in
275// that network namespace.
276func (cc *ContainerCollection) LookupContainersByNetns(netnsid uint64) []*Container {
277containers, ok := cc.containersByNetNs.Load(netnsid)
278if !ok {
279return nil
280}
281return containers.([]*Container)
282}
283
284func lookupContainersByNetns(m *sync.Map, netnsid uint64) (containers []*Container) {
285m.Range(func(key, value interface{}) bool {
286c := value.(*Container)
287if c.Netns == netnsid {
288containers = append(containers, c)
289}
290return true
291})
292return containers
293}
294
295// LookupMntnsByPod returns the mount namespace inodes of all containers
296// belonging to the pod specified in arguments, indexed by the name of the
297// containers or an empty map if not found
298func (cc *ContainerCollection) LookupMntnsByPod(namespace, pod string) map[string]uint64 {
299ret := make(map[string]uint64)
300cc.containers.Range(func(key, value interface{}) bool {
301c := value.(*Container)
302if namespace == c.K8s.Namespace && pod == c.K8s.PodName {
303ret[c.K8s.ContainerName] = c.Mntns
304}
305return true
306})
307return ret
308}
309
310// LookupPIDByContainer returns the PID of the container
311// specified in arguments or zero if not found
312func (cc *ContainerCollection) LookupPIDByContainer(namespace, pod, container string) (pid uint32) {
313cc.containers.Range(func(key, value interface{}) bool {
314c := value.(*Container)
315if namespace == c.K8s.Namespace && pod == c.K8s.PodName && container == c.K8s.ContainerName {
316pid = c.Pid
317// container found, stop iterating
318return false
319}
320return true
321})
322return
323}
324
325// LookupPIDByPod returns the PID of all containers belonging to
326// the pod specified in arguments, indexed by the name of the
327// containers or an empty map if not found
328func (cc *ContainerCollection) LookupPIDByPod(namespace, pod string) map[string]uint32 {
329ret := make(map[string]uint32)
330cc.containers.Range(func(key, value interface{}) bool {
331c := value.(*Container)
332if namespace == c.K8s.Namespace && pod == c.K8s.PodName {
333ret[c.K8s.ContainerName] = c.Pid
334}
335return true
336})
337return ret
338}
339
340// LookupOwnerReferenceByMntns returns a pointer to the owner reference of the
341// container identified by the mount namespace, or nil if not found
342func (cc *ContainerCollection) LookupOwnerReferenceByMntns(mntns uint64) *metav1.OwnerReference {
343var ownerRef *metav1.OwnerReference
344var err error
345cc.containers.Range(func(key, value interface{}) bool {
346c := value.(*Container)
347if mntns == c.Mntns {
348ownerRef, err = c.GetOwnerReference()
349if err != nil {
350log.Warnf("Failed to get owner reference of %s/%s/%s: %s",
351c.K8s.Namespace, c.K8s.PodName, c.K8s.ContainerName, err)
352}
353// container found, stop iterating
354return false
355}
356return true
357})
358return ownerRef
359}
360
361// GetContainersBySelector returns a slice of containers that match
362// the selector or an empty slice if there are not matches
363func (cc *ContainerCollection) GetContainersBySelector(
364containerSelector *ContainerSelector,
365) []*Container {
366selectedContainers := []*Container{}
367cc.containers.Range(func(key, value interface{}) bool {
368c := value.(*Container)
369if ContainerSelectorMatches(containerSelector, c) {
370selectedContainers = append(selectedContainers, c)
371}
372return true
373})
374return selectedContainers
375}
376
377// ContainerLen returns how many containers are stored in the collection.
378func (cc *ContainerCollection) ContainerLen() (count int) {
379cc.containers.Range(func(key, value interface{}) bool {
380count++
381return true
382})
383return
384}
385
386// ContainerRange iterates over the containers of the collection and calls the
387// callback function for each of them.
388func (cc *ContainerCollection) ContainerRange(f func(*Container)) {
389cc.containers.Range(func(key, value interface{}) bool {
390c := value.(*Container)
391f(c)
392return true
393})
394}
395
396// ContainerRangeWithSelector iterates over the containers of the collection
397// and calls the callback function for each of those that matches the container
398// selector.
399func (cc *ContainerCollection) ContainerRangeWithSelector(
400containerSelector *ContainerSelector,
401f func(*Container),
402) {
403cc.containers.Range(func(key, value interface{}) bool {
404c := value.(*Container)
405if ContainerSelectorMatches(containerSelector, c) {
406f(c)
407}
408return true
409})
410}
411
// EnrichNode sets the Kubernetes node of the event to the node name stored in
// the collection. No other field of the event is modified.
func (cc *ContainerCollection) EnrichNode(event *eventtypes.CommonData) {
	event.K8s.Node = cc.nodeName
}
415
416func (cc *ContainerCollection) EnrichByMntNs(event *eventtypes.CommonData, mountnsid uint64) {
417event.K8s.Node = cc.nodeName
418
419container := cc.LookupContainerByMntns(mountnsid)
420if container == nil && cc.cachedContainers != nil {
421container = lookupContainerByMntns(cc.cachedContainers, mountnsid)
422}
423
424if container != nil {
425event.K8s.ContainerName = container.K8s.ContainerName
426event.K8s.PodName = container.K8s.PodName
427event.K8s.PodLabels = container.K8s.PodLabels
428event.K8s.Namespace = container.K8s.Namespace
429
430event.Runtime.RuntimeName = container.Runtime.RuntimeName
431event.Runtime.ContainerName = container.Runtime.ContainerName
432event.Runtime.ContainerID = container.Runtime.ContainerID
433event.Runtime.ContainerImageName = container.Runtime.ContainerImageName
434event.Runtime.ContainerImageDigest = container.Runtime.ContainerImageDigest
435}
436}
437
438func (cc *ContainerCollection) EnrichByNetNs(event *eventtypes.CommonData, netnsid uint64) {
439event.K8s.Node = cc.nodeName
440
441containers := cc.LookupContainersByNetns(netnsid)
442if len(containers) == 0 && cc.cachedContainers != nil {
443containers = lookupContainersByNetns(cc.cachedContainers, netnsid)
444}
445if len(containers) == 0 {
446return
447}
448if containers[0].HostNetwork {
449event.K8s.HostNetwork = true
450return
451}
452
453if len(containers) == 1 {
454event.K8s.ContainerName = containers[0].K8s.ContainerName
455event.K8s.PodName = containers[0].K8s.PodName
456event.K8s.PodLabels = containers[0].K8s.PodLabels
457event.K8s.Namespace = containers[0].K8s.Namespace
458
459event.Runtime.RuntimeName = containers[0].Runtime.RuntimeName
460event.Runtime.ContainerName = containers[0].Runtime.ContainerName
461event.Runtime.ContainerID = containers[0].Runtime.ContainerID
462event.Runtime.ContainerImageName = containers[0].Runtime.ContainerImageName
463event.Runtime.ContainerImageDigest = containers[0].Runtime.ContainerImageDigest
464return
465}
466if containers[0].K8s.PodName != "" && containers[0].K8s.Namespace != "" {
467// Kubernetes containers within the same pod.
468event.K8s.PodName = containers[0].K8s.PodName
469event.K8s.PodLabels = containers[0].K8s.PodLabels
470event.K8s.Namespace = containers[0].K8s.Namespace
471
472// All containers in the same pod share the same container runtime
473event.Runtime.RuntimeName = containers[0].Runtime.RuntimeName
474}
475// else {
476// TODO: Non-Kubernetes containers sharing the same network namespace.
477// What should we do here?
478// }
479}
480
481// Subscribe returns the list of existing containers and registers a callback
482// for notifications about additions and deletions of containers
483func (cc *ContainerCollection) Subscribe(key interface{}, selector ContainerSelector, f FuncNotify) []*Container {
484if cc.pubsub == nil {
485panic("ContainerCollection's pubsub uninitialized")
486}
487ret := []*Container{}
488cc.pubsub.Subscribe(key, func(event PubSubEvent) {
489if ContainerSelectorMatches(&selector, event.Container) {
490f(event)
491}
492}, func() {
493// Fetch the list of containers inside pubsub.Subscribe() to
494// guarantee that no new container event will be published at
495// the same time.
496cc.ContainerRangeWithSelector(&selector, func(c *Container) {
497ret = append(ret, c)
498})
499})
500return ret
501}
502
503// Unsubscribe undoes a previous call to Subscribe
504func (cc *ContainerCollection) Unsubscribe(key interface{}) {
505if cc.pubsub == nil {
506panic("ContainerCollection's pubsub uninitialized")
507}
508cc.pubsub.Unsubscribe(key)
509}
510
511func (cc *ContainerCollection) Close() {
512cc.mu.Lock()
513defer cc.mu.Unlock()
514
515close(cc.done)
516
517if !cc.initialized || cc.closed {
518panic("ContainerCollection is not initialized or has been closed")
519}
520
521// TODO: it's not clear if we want/can allow to re-initialize
522// this instance yet, so we don't set cc.initialized = false.
523cc.closed = true
524
525for _, f := range cc.cleanUpFuncs {
526f()
527}
528
529// Similar to RemoveContainer() on all containers but without publishing
530// events.
531cc.containers.Range(func(key, value interface{}) bool {
532c := value.(*Container)
533c.close()
534cc.containers.Delete(c)
535return true
536})
537}
538