inspektor-gadget

Форк
0
/
container-collection.go 
537 строк · 16.5 Кб
1
// Copyright 2019-2023 The Inspektor Gadget authors
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
// Package containercollection provides the ContainerCollection struct to keep
16
// track of the set of running containers and primitives to query that set with
17
// various criteria.
18
//
19
// It is used by the Gadget Tracer Manager to keep track of containers that are part of
20
// Kubernetes pods and by IG Manager to keep track of containers on a
21
// Linux system.
22
package containercollection
23

24
import (
25
	"sync"
26
	"time"
27

28
	log "github.com/sirupsen/logrus"
29
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
30

31
	eventtypes "github.com/inspektor-gadget/inspektor-gadget/pkg/types"
32
)
33

34
// ContainerCollection holds a set of containers. It can be embedded as an
// anonymous struct to help other structs implement the ContainerResolver
// interface. For this reason, some methods are namespaced with 'Container' to
// make this clear.
type ContainerCollection struct {
	// mu serializes updates of the containersByMntNs and containersByNetNs
	// lookup maps in AddContainer() and RemoveContainer(); it is also held
	// for the whole duration of Close().
	mu sync.Mutex

	// Keys:   container ID        string
	// Values: container           *Container
	containers sync.Map

	// Keys:   mount namespace ID  uint64 (Container.Mntns)
	// Values: container           *Container
	containersByMntNs sync.Map

	// Keys:   net namespace ID    uint64 (Container.Netns)
	// Values: containers          []*Container
	// The stored slices are never modified in place: AddContainer() and
	// RemoveContainer() build a new slice and store it instead.
	containersByNetNs sync.Map

	// Saves containers for "cacheDelay" to be able to enrich events after the container is
	// removed. This is enabled by using WithTracerCollection().
	// Entries are keyed by container ID and hold *Container values; when the
	// map is nil, RemoveContainer() does not cache anything.
	cachedContainers *sync.Map
	cacheDelay       time.Duration

	// pubsub notifies subscribers of container events. It may be nil, in
	// which case no event is published and Subscribe()/Unsubscribe() panic.
	pubsub *GadgetPubSub

	// containerEnrichers are functions that automatically add metadata
	// upon AddContainer. The functions return true on success or false if
	// the container is meant to be dropped.
	containerEnrichers []func(container *Container) (ok bool)

	// initialContainers is used during the initialization process to
	// gather initial containers and then call the enrichers
	initialContainers []*Container

	// nodeName is written to event.K8s.Node by the Enrich*() functions.
	nodeName string

	// initialized tells if Initialize() has been called.
	initialized bool

	// closed tells if Close() has been called.
	closed bool
	// done is created by Initialize() and closed by Close().
	done chan struct{}

	// functions to be called on Close()
	cleanUpFuncs []func()

	// disableContainerRuntimeWarnings is used to disable warnings about container runtimes.
	disableContainerRuntimeWarnings bool
}
86

87
// ContainerCollectionOption is a functional option passed to Initialize. Each
// option receives the collection being initialized; if it returns a non-nil
// error, Initialize stops and returns that error.
type ContainerCollectionOption func(*ContainerCollection) error
90

91
// Initialize initializes a ContainerCollection. It is
92
// useful when ContainerCollection is embedded as an anonymous struct because
93
// we don't use a contructor in that case.
94
func (cc *ContainerCollection) Initialize(options ...ContainerCollectionOption) error {
95
	cc.done = make(chan struct{})
96

97
	if cc.initialized {
98
		panic("Initialize already called")
99
	}
100

101
	// Call functional options. This might fetch initial containers.
102
	for _, o := range options {
103
		err := o(cc)
104
		if err != nil {
105
			return err
106
		}
107
	}
108

109
	// Consume initial containers that might have been fetched by
110
	// functional options. This is done after all functional options have
111
	// been called, so that cc.containerEnrichers is fully set up.
112
	for _, container := range cc.initialContainers {
113
		cc.AddContainer(container)
114
	}
115
	cc.initialContainers = nil
116

117
	cc.initialized = true
118
	return nil
119
}
120

121
// GetContainer looks up a container by the container id and return it if
122
// found, or return nil if not found.
123
func (cc *ContainerCollection) GetContainer(id string) *Container {
124
	v, ok := cc.containers.Load(id)
125
	if !ok {
126
		return nil
127
	}
128
	container := v.(*Container)
129
	return container
130
}
131

132
// RemoveContainer removes a container from the collection, but only after
133
// notifying all the subscribers.
134
func (cc *ContainerCollection) RemoveContainer(id string) {
135
	v, loaded := cc.containers.Load(id)
136
	if !loaded {
137
		return
138
	}
139

140
	container := v.(*Container)
141

142
	if cc.pubsub != nil {
143
		cc.pubsub.Publish(EventTypeRemoveContainer, container)
144
	}
145

146
	// Save the container in the cache as enrichers might need the container some time after it
147
	// has been removed.
148
	if cc.cachedContainers != nil {
149
		container.deletionTimestamp = time.Now()
150
		cc.cachedContainers.Store(id, v)
151
	}
152

153
	// Remove the container from the collection after publishing the event as
154
	// subscribers might need to use the different collection's lookups during
155
	// the notification handler, and they expect the container to still be
156
	// present.
157
	cc.containers.Delete(id)
158

159
	// Make this operation atomic, as RemoveContainer() could be called concurrently, which could result in
160
	// dirty map contents
161
	cc.mu.Lock()
162
	defer cc.mu.Unlock()
163

164
	// Remove from MntNs lookup
165
	mntNsContainer, ok := cc.containersByMntNs.Load(container.Mntns)
166
	if !ok || mntNsContainer.(*Container).Runtime.ContainerID != container.Runtime.ContainerID {
167
		log.Warn("container not found or mismatch in mntns lookup map")
168
		return
169
	} else {
170
		cc.containersByMntNs.Delete(container.Mntns)
171
	}
172

173
	// Remove from NetNs lookup; arrays should be immutable, so recreate them
174
	netNsContainers, ok := cc.containersByNetNs.Load(container.Netns)
175
	if !ok {
176
		log.Warn("container netns not found in netns lookup map")
177
		return
178
	}
179

180
	found := false
181
	netNsContainersArr := netNsContainers.([]*Container)
182
	newNetNsContainers := make([]*Container, 0, len(netNsContainersArr)-1)
183
	for _, netNsContainer := range netNsContainersArr {
184
		if netNsContainer.Runtime.ContainerID == container.Runtime.ContainerID {
185
			found = true
186
			continue
187
		}
188
		newNetNsContainers = append(newNetNsContainers, netNsContainer)
189
	}
190
	if !found {
191
		log.Warn("container not found in netns lookup array")
192
	}
193

194
	if len(newNetNsContainers) > 0 {
195
		cc.containersByNetNs.Store(container.Netns, newNetNsContainers)
196
	} else {
197
		// clean up empty entries
198
		cc.containersByNetNs.Delete(container.Netns)
199
	}
200
}
201

202
// AddContainer adds a container to the collection.
203
func (cc *ContainerCollection) AddContainer(container *Container) {
204
	for _, enricher := range cc.containerEnrichers {
205
		ok := enricher(container)
206
		// Enrichers can decide to drop a container
207
		if !ok {
208
			container.close()
209
			return
210
		}
211
	}
212

213
	_, loaded := cc.containers.LoadOrStore(container.Runtime.ContainerID, container)
214
	if loaded {
215
		return
216
	}
217
	cc.mu.Lock()
218
	cc.containersByMntNs.Store(container.Mntns, container)
219
	arr, ok := cc.containersByNetNs.Load(container.Netns)
220
	var newContainerArr []*Container
221
	if ok {
222
		newContainerArr = append(newContainerArr, arr.([]*Container)...)
223
	}
224
	newContainerArr = append(newContainerArr, container)
225
	cc.containersByNetNs.Store(container.Netns, newContainerArr)
226
	cc.mu.Unlock()
227

228
	if cc.pubsub != nil {
229
		cc.pubsub.Publish(EventTypeAddContainer, container)
230
	}
231
}
232

233
// LookupMntnsByContainer returns the mount namespace inode of the container
234
// specified in arguments or zero if not found
235
func (cc *ContainerCollection) LookupMntnsByContainer(namespace, pod, container string) (mntns uint64) {
236
	cc.containers.Range(func(key, value interface{}) bool {
237
		c := value.(*Container)
238
		if namespace == c.K8s.Namespace && pod == c.K8s.PodName && container == c.K8s.ContainerName {
239
			mntns = c.Mntns
240
			// container found, stop iterating
241
			return false
242
		}
243
		return true
244
	})
245
	return
246
}
247

248
// lookupContainerByMntns iterates over a map whose values are *Container and
// returns the first one found whose mount namespace inode equals mntnsid, or
// nil if there is none.
func lookupContainerByMntns(m *sync.Map, mntnsid uint64) *Container {
	var match *Container

	m.Range(func(_, entry interface{}) bool {
		if candidate := entry.(*Container); candidate.Mntns == mntnsid {
			match = candidate
		}
		// Keep iterating only while nothing has been found.
		return match == nil
	})
	return match
}
262

263
// LookupContainerByMntns returns a container by its mount namespace
264
// inode id. If not found nil is returned.
265
func (cc *ContainerCollection) LookupContainerByMntns(mntnsid uint64) *Container {
266
	container, ok := cc.containersByMntNs.Load(mntnsid)
267
	if !ok {
268
		return nil
269
	}
270
	return container.(*Container)
271
}
272

273
// LookupContainersByNetns returns a slice of containers that run in a given
274
// network namespace. Or an empty slice if there are no containers running in
275
// that network namespace.
276
func (cc *ContainerCollection) LookupContainersByNetns(netnsid uint64) []*Container {
277
	containers, ok := cc.containersByNetNs.Load(netnsid)
278
	if !ok {
279
		return nil
280
	}
281
	return containers.([]*Container)
282
}
283

284
// lookupContainersByNetns iterates over a map whose values are *Container and
// collects every container whose network namespace inode equals netnsid. The
// result is nil when nothing matches.
func lookupContainersByNetns(m *sync.Map, netnsid uint64) (containers []*Container) {
	m.Range(func(_, entry interface{}) bool {
		candidate := entry.(*Container)
		if candidate.Netns != netnsid {
			return true
		}
		containers = append(containers, candidate)
		return true
	})
	return containers
}
294

295
// LookupMntnsByPod returns the mount namespace inodes of all containers
296
// belonging to the pod specified in arguments, indexed by the name of the
297
// containers or an empty map if not found
298
func (cc *ContainerCollection) LookupMntnsByPod(namespace, pod string) map[string]uint64 {
299
	ret := make(map[string]uint64)
300
	cc.containers.Range(func(key, value interface{}) bool {
301
		c := value.(*Container)
302
		if namespace == c.K8s.Namespace && pod == c.K8s.PodName {
303
			ret[c.K8s.ContainerName] = c.Mntns
304
		}
305
		return true
306
	})
307
	return ret
308
}
309

310
// LookupPIDByContainer returns the PID of the container
311
// specified in arguments or zero if not found
312
func (cc *ContainerCollection) LookupPIDByContainer(namespace, pod, container string) (pid uint32) {
313
	cc.containers.Range(func(key, value interface{}) bool {
314
		c := value.(*Container)
315
		if namespace == c.K8s.Namespace && pod == c.K8s.PodName && container == c.K8s.ContainerName {
316
			pid = c.Pid
317
			// container found, stop iterating
318
			return false
319
		}
320
		return true
321
	})
322
	return
323
}
324

325
// LookupPIDByPod returns the PID of all containers belonging to
326
// the pod specified in arguments, indexed by the name of the
327
// containers or an empty map if not found
328
func (cc *ContainerCollection) LookupPIDByPod(namespace, pod string) map[string]uint32 {
329
	ret := make(map[string]uint32)
330
	cc.containers.Range(func(key, value interface{}) bool {
331
		c := value.(*Container)
332
		if namespace == c.K8s.Namespace && pod == c.K8s.PodName {
333
			ret[c.K8s.ContainerName] = c.Pid
334
		}
335
		return true
336
	})
337
	return ret
338
}
339

340
// LookupOwnerReferenceByMntns returns a pointer to the owner reference of the
341
// container identified by the mount namespace, or nil if not found
342
func (cc *ContainerCollection) LookupOwnerReferenceByMntns(mntns uint64) *metav1.OwnerReference {
343
	var ownerRef *metav1.OwnerReference
344
	var err error
345
	cc.containers.Range(func(key, value interface{}) bool {
346
		c := value.(*Container)
347
		if mntns == c.Mntns {
348
			ownerRef, err = c.GetOwnerReference()
349
			if err != nil {
350
				log.Warnf("Failed to get owner reference of %s/%s/%s: %s",
351
					c.K8s.Namespace, c.K8s.PodName, c.K8s.ContainerName, err)
352
			}
353
			// container found, stop iterating
354
			return false
355
		}
356
		return true
357
	})
358
	return ownerRef
359
}
360

361
// GetContainersBySelector returns a slice of containers that match
362
// the selector or an empty slice if there are not matches
363
func (cc *ContainerCollection) GetContainersBySelector(
364
	containerSelector *ContainerSelector,
365
) []*Container {
366
	selectedContainers := []*Container{}
367
	cc.containers.Range(func(key, value interface{}) bool {
368
		c := value.(*Container)
369
		if ContainerSelectorMatches(containerSelector, c) {
370
			selectedContainers = append(selectedContainers, c)
371
		}
372
		return true
373
	})
374
	return selectedContainers
375
}
376

377
// ContainerLen returns how many containers are stored in the collection.
378
func (cc *ContainerCollection) ContainerLen() (count int) {
379
	cc.containers.Range(func(key, value interface{}) bool {
380
		count++
381
		return true
382
	})
383
	return
384
}
385

386
// ContainerRange iterates over the containers of the collection and calls the
387
// callback function for each of them.
388
func (cc *ContainerCollection) ContainerRange(f func(*Container)) {
389
	cc.containers.Range(func(key, value interface{}) bool {
390
		c := value.(*Container)
391
		f(c)
392
		return true
393
	})
394
}
395

396
// ContainerRangeWithSelector iterates over the containers of the collection
397
// and calls the callback function for each of those that matches the container
398
// selector.
399
func (cc *ContainerCollection) ContainerRangeWithSelector(
400
	containerSelector *ContainerSelector,
401
	f func(*Container),
402
) {
403
	cc.containers.Range(func(key, value interface{}) bool {
404
		c := value.(*Container)
405
		if ContainerSelectorMatches(containerSelector, c) {
406
			f(c)
407
		}
408
		return true
409
	})
410
}
411

412
// EnrichNode sets the Kubernetes node of the event to the node name
// configured in the collection.
func (cc *ContainerCollection) EnrichNode(event *eventtypes.CommonData) {
	event.K8s.Node = cc.nodeName
}
415

416
// EnrichByMntNs sets the node name on the event and, when a container with
// the given mount namespace inode is known, copies its Kubernetes and runtime
// metadata into the event.
//
// The container is looked up in the collection first and, failing that, among
// the cached (already removed) containers when that cache is enabled. The
// event is left with only the node name set if no container is found.
func (cc *ContainerCollection) EnrichByMntNs(event *eventtypes.CommonData, mountnsid uint64) {
	event.K8s.Node = cc.nodeName

	source := cc.LookupContainerByMntns(mountnsid)
	if source == nil && cc.cachedContainers != nil {
		source = lookupContainerByMntns(cc.cachedContainers, mountnsid)
	}
	if source == nil {
		return
	}

	// Kubernetes metadata
	event.K8s.Namespace = source.K8s.Namespace
	event.K8s.PodName = source.K8s.PodName
	event.K8s.PodLabels = source.K8s.PodLabels
	event.K8s.ContainerName = source.K8s.ContainerName

	// Container runtime metadata
	event.Runtime.RuntimeName = source.Runtime.RuntimeName
	event.Runtime.ContainerID = source.Runtime.ContainerID
	event.Runtime.ContainerName = source.Runtime.ContainerName
	event.Runtime.ContainerImageName = source.Runtime.ContainerImageName
	event.Runtime.ContainerImageDigest = source.Runtime.ContainerImageDigest
}
437

438
// EnrichByNetNs sets the node name on the event and enriches it with the
// metadata of the containers running in the given network namespace.
//
// Containers are looked up in the collection first and, if none is found,
// among the cached (already removed) containers when that cache is enabled.
// Depending on what is found:
//   - no container: only the node name is set;
//   - the first container uses the host network: only HostNetwork is set;
//   - exactly one container: its Kubernetes and runtime metadata are copied;
//   - several containers whose first one has a pod name and a namespace: only
//     the pod-level metadata and the runtime name are copied.
func (cc *ContainerCollection) EnrichByNetNs(event *eventtypes.CommonData, netnsid uint64) {
	event.K8s.Node = cc.nodeName

	matches := cc.LookupContainersByNetns(netnsid)
	if len(matches) == 0 && cc.cachedContainers != nil {
		matches = lookupContainersByNetns(cc.cachedContainers, netnsid)
	}
	if len(matches) == 0 {
		return
	}

	first := matches[0]
	switch {
	case first.HostNetwork:
		event.K8s.HostNetwork = true

	case len(matches) == 1:
		event.K8s.ContainerName = first.K8s.ContainerName
		event.K8s.PodName = first.K8s.PodName
		event.K8s.PodLabels = first.K8s.PodLabels
		event.K8s.Namespace = first.K8s.Namespace

		event.Runtime.RuntimeName = first.Runtime.RuntimeName
		event.Runtime.ContainerName = first.Runtime.ContainerName
		event.Runtime.ContainerID = first.Runtime.ContainerID
		event.Runtime.ContainerImageName = first.Runtime.ContainerImageName
		event.Runtime.ContainerImageDigest = first.Runtime.ContainerImageDigest

	case first.K8s.PodName != "" && first.K8s.Namespace != "":
		// Kubernetes containers within the same pod.
		event.K8s.PodName = first.K8s.PodName
		event.K8s.PodLabels = first.K8s.PodLabels
		event.K8s.Namespace = first.K8s.Namespace

		// All containers in the same pod share the same container runtime
		event.Runtime.RuntimeName = first.Runtime.RuntimeName

		// default:
		// 	TODO: Non-Kubernetes containers sharing the same network namespace.
		// 	What should we do here?
	}
}
480

481
// Subscribe returns the list of existing containers and registers a callback
482
// for notifications about additions and deletions of containers
483
func (cc *ContainerCollection) Subscribe(key interface{}, selector ContainerSelector, f FuncNotify) []*Container {
484
	if cc.pubsub == nil {
485
		panic("ContainerCollection's pubsub uninitialized")
486
	}
487
	ret := []*Container{}
488
	cc.pubsub.Subscribe(key, func(event PubSubEvent) {
489
		if ContainerSelectorMatches(&selector, event.Container) {
490
			f(event)
491
		}
492
	}, func() {
493
		// Fetch the list of containers inside pubsub.Subscribe() to
494
		// guarantee that no new container event will be published at
495
		// the same time.
496
		cc.ContainerRangeWithSelector(&selector, func(c *Container) {
497
			ret = append(ret, c)
498
		})
499
	})
500
	return ret
501
}
502

503
// Unsubscribe undoes a previous call to Subscribe
504
func (cc *ContainerCollection) Unsubscribe(key interface{}) {
505
	if cc.pubsub == nil {
506
		panic("ContainerCollection's pubsub uninitialized")
507
	}
508
	cc.pubsub.Unsubscribe(key)
509
}
510

511
// Close shuts the collection down: it closes the done channel, runs the
// registered clean-up functions, then closes every container and removes it
// from the collection without publishing any event.
//
// Close panics if the collection has not been initialized or has already
// been closed.
func (cc *ContainerCollection) Close() {
	cc.mu.Lock()
	defer cc.mu.Unlock()

	// Validate the state before touching cc.done: closing a nil channel
	// (never initialized) or an already closed one (closed twice) would
	// panic with an unrelated runtime message instead of this one.
	if !cc.initialized || cc.closed {
		panic("ContainerCollection is not initialized or has been closed")
	}

	// TODO: it's not clear if we want/can allow to re-initialize
	// this instance yet, so we don't set cc.initialized = false.
	cc.closed = true

	close(cc.done)

	for _, f := range cc.cleanUpFuncs {
		f()
	}

	// Similar to RemoveContainer() on all containers but without publishing
	// events.
	cc.containers.Range(func(key, value interface{}) bool {
		c := value.(*Container)
		c.close()
		// The map is keyed by container ID, so the entry has to be deleted
		// by its key; deleting by the *Container value matches nothing.
		cc.containers.Delete(key)
		return true
	})
}
538

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.