podman

Форк
0
463 строки · 13.0 Кб
1
// Copyright 2014 go-dockerclient authors. All rights reserved.
2
// Use of this source code is governed by a BSD-style
3
// license that can be found in the LICENSE file.
4

5
package docker
6

7
import (
8
	"encoding/json"
9
	"errors"
10
	"io"
11
	"math"
12
	"net"
13
	"net/http"
14
	"net/http/httputil"
15
	"strconv"
16
	"sync"
17
	"sync/atomic"
18
	"time"
19
)
20

21
// EventsOptions holds the parameters used to filter the event stream.
//
// See https://docs.docker.com/engine/api/v1.41/#operation/SystemEvents for more details.
type EventsOptions struct {
	// Since limits the stream to events created since this timestamp;
	// new events are streamed afterwards.
	Since string

	// Until stops the stream once events up to this timestamp were shown.
	Until string

	// Filters narrows down the events that are returned. For example:
	//
	//  map[string][]string{"type": {"container"}, "event": {"start", "die"}}
	//
	// returns events emitted when a container was started and when it was
	// stopped or killed.
	//
	// Available filters:
	//
	//  config=<string> config name or ID
	//  container=<string> container name or ID
	//  daemon=<string> daemon name or ID
	//  event=<string> event type
	//  image=<string> image name or ID
	//  label=<string> image or container label
	//  network=<string> network name or ID
	//  node=<string> node ID
	//  plugin=<string> plugin name or ID
	//  scope=<string> local or swarm
	//  secret=<string> secret name or ID
	//  service=<string> service name or ID
	//  type=<string> container, image, volume, network, daemon, plugin, node, service, secret or config
	//  volume=<string> volume name
	Filters map[string][]string
}
51

52
// APIEvents represents events coming from the Docker API
53
// The fields in the Docker API changed in API version 1.22, and
54
// events for more than images and containers are now fired off.
55
// To maintain forward and backward compatibility, go-dockerclient
56
// replicates the event in both the new and old format as faithfully as possible.
57
//
58
// For events that only exist in 1.22 in later, `Status` is filled in as
59
// `"Type:Action"` instead of just `Action` to allow for older clients to
60
// differentiate and not break if they rely on the pre-1.22 Status types.
61
//
62
// The transformEvent method can be consulted for more information about how
63
// events are translated from new/old API formats
64
type APIEvents struct {
65
	// New API Fields in 1.22
66
	Action string   `json:"action,omitempty"`
67
	Type   string   `json:"type,omitempty"`
68
	Actor  APIActor `json:"actor,omitempty"`
69

70
	// Old API fields for < 1.22
71
	Status string `json:"status,omitempty"`
72
	ID     string `json:"id,omitempty"`
73
	From   string `json:"from,omitempty"`
74

75
	// Fields in both
76
	Time     int64 `json:"time,omitempty"`
77
	TimeNano int64 `json:"timeNano,omitempty"`
78
}
79

80
// APIActor describes the object an event refers to: its identifier and a
// set of free-form attributes attached to it.
type APIActor struct {
	// ID identifies the actor.
	ID string `json:"id,omitempty"`

	// Attributes holds additional key/value data about the actor.
	Attributes map[string]string `json:"attributes,omitempty"`
}
85

86
type eventMonitoringState struct {
87
	// `sync/atomic` expects the first word in an allocated struct to be 64-bit
88
	// aligned on both ARM and x86-32. See https://goo.gl/zW7dgq for more details.
89
	lastSeen int64
90
	sync.RWMutex
91
	sync.WaitGroup
92
	enabled   bool
93
	C         chan *APIEvents
94
	errC      chan error
95
	listeners []chan<- *APIEvents
96
	closeConn func()
97
}
98

99
const (
	// maxMonitorConnRetries bounds how many times connectWithRetry retries
	// after the first connection attempt failed.
	maxMonitorConnRetries = 5

	// retryInitialWaitTime is the base of the exponential back-off used
	// between connection attempts, in milliseconds.
	retryInitialWaitTime = 10.0
)
103

104
var (
105
	// ErrNoListeners is the error returned when no listeners are available
106
	// to receive an event.
107
	ErrNoListeners = errors.New("no listeners present to receive event")
108

109
	// ErrListenerAlreadyExists is the error returned when the listerner already
110
	// exists.
111
	ErrListenerAlreadyExists = errors.New("listener already exists for docker events")
112

113
	// ErrTLSNotSupported is the error returned when the client does not support
114
	// TLS (this applies to the Windows named pipe client).
115
	ErrTLSNotSupported = errors.New("tls not supported by this client")
116

117
	// EOFEvent is sent when the event listener receives an EOF error.
118
	EOFEvent = &APIEvents{
119
		Type:   "EOF",
120
		Status: "EOF",
121
	}
122
)
123

124
// AddEventListener adds a new listener to container events in the Docker API.
125
//
126
// The parameter is a channel through which events will be sent.
127
func (c *Client) AddEventListener(listener chan<- *APIEvents) error {
128
	return c.AddEventListenerWithOptions(EventsOptions{}, listener)
129
}
130

131
// AddEventListener adds a new listener to container events in the Docker API.
132
// See https://docs.docker.com/engine/api/v1.41/#operation/SystemEvents for more details.
133
//
134
// The listener parameter is a channel through which events will be sent.
135
func (c *Client) AddEventListenerWithOptions(options EventsOptions, listener chan<- *APIEvents) error {
136
	var err error
137
	if !c.eventMonitor.isEnabled() {
138
		err = c.eventMonitor.enableEventMonitoring(c, options)
139
		if err != nil {
140
			return err
141
		}
142
	}
143
	return c.eventMonitor.addListener(listener)
144
}
145

146
// RemoveEventListener removes a listener from the monitor.
147
func (c *Client) RemoveEventListener(listener chan *APIEvents) error {
148
	err := c.eventMonitor.removeListener(listener)
149
	if err != nil {
150
		return err
151
	}
152
	if c.eventMonitor.listernersCount() == 0 {
153
		c.eventMonitor.disableEventMonitoring()
154
	}
155
	return nil
156
}
157

158
// addListener registers listener and accounts for it in the wait group.
// It returns ErrListenerAlreadyExists when listener is already registered.
func (eventState *eventMonitoringState) addListener(listener chan<- *APIEvents) error {
	eventState.Lock()
	defer eventState.Unlock()

	alreadyRegistered := listenerExists(listener, &eventState.listeners)
	if alreadyRegistered {
		return ErrListenerAlreadyExists
	}

	eventState.listeners = append(eventState.listeners, listener)
	eventState.Add(1)
	return nil
}
168

169
// removeListener unregisters listener and releases its slot in the wait
// group. Removing a listener that is not registered is a no-op. The returned
// error is always nil.
func (eventState *eventMonitoringState) removeListener(listener chan<- *APIEvents) error {
	eventState.Lock()
	defer eventState.Unlock()

	var (
		remaining []chan<- *APIEvents
		found     bool
	)
	for _, registered := range eventState.listeners {
		if registered == listener {
			found = true
			continue
		}
		remaining = append(remaining, registered)
	}
	if found {
		eventState.listeners = remaining
		eventState.Add(-1)
	}
	return nil
}
184

185
// closeListeners closes every registered listener channel, releases its wait
// group slot and clears the listener list. It does no locking of its own.
func (eventState *eventMonitoringState) closeListeners() {
	for i := range eventState.listeners {
		close(eventState.listeners[i])
		eventState.Done()
	}
	eventState.listeners = nil
}
192

193
// listernersCount returns the number of currently registered listeners.
func (eventState *eventMonitoringState) listernersCount() int {
	eventState.RLock()
	count := len(eventState.listeners)
	eventState.RUnlock()
	return count
}
198

199
// listenerExists reports whether the channel a is one of the channels in the
// slice that list points to.
func listenerExists(a chan<- *APIEvents, list *[]chan<- *APIEvents) bool {
	candidates := *list
	for i := 0; i < len(candidates); i++ {
		if candidates[i] == a {
			return true
		}
	}
	return false
}
207

208
// enableEventMonitoring switches event monitoring on: it resets lastSeen,
// creates fresh event and error channels and starts the monitorEvents
// goroutine. It does nothing when monitoring is already enabled. The
// returned error is always nil.
func (eventState *eventMonitoringState) enableEventMonitoring(c *Client, opts EventsOptions) error {
	eventState.Lock()
	defer eventState.Unlock()

	if eventState.enabled {
		return nil
	}

	eventState.enabled = true
	atomic.StoreInt64(&eventState.lastSeen, 0)
	eventState.C = make(chan *APIEvents, 100)
	eventState.errC = make(chan error, 1)
	go eventState.monitorEvents(c, opts)
	return nil
}
220

221
// disableEventMonitoring closes all listeners, waits for the wait group to
// drain and, if monitoring was enabled, marks it disabled, closes the event
// and error channels and invokes closeConn when one is set.
func (eventState *eventMonitoringState) disableEventMonitoring() {
	eventState.Lock()
	defer eventState.Unlock()

	eventState.closeListeners()
	eventState.Wait()

	if !eventState.enabled {
		return
	}

	eventState.enabled = false
	close(eventState.C)
	close(eventState.errC)

	if stop := eventState.closeConn; stop != nil {
		stop()
		eventState.closeConn = nil
	}
}
240

241
func (eventState *eventMonitoringState) monitorEvents(c *Client, opts EventsOptions) {
242
	const (
243
		noListenersTimeout  = 5 * time.Second
244
		noListenersInterval = 10 * time.Millisecond
245
		noListenersMaxTries = noListenersTimeout / noListenersInterval
246
	)
247

248
	var err error
249
	for i := time.Duration(0); i < noListenersMaxTries && eventState.noListeners(); i++ {
250
		time.Sleep(10 * time.Millisecond)
251
	}
252

253
	if eventState.noListeners() {
254
		// terminate if no listener is available after 5 seconds.
255
		// Prevents goroutine leak when RemoveEventListener is called
256
		// right after AddEventListener.
257
		eventState.disableEventMonitoring()
258
		return
259
	}
260

261
	if err = eventState.connectWithRetry(c, opts); err != nil {
262
		// terminate if connect failed
263
		eventState.disableEventMonitoring()
264
		return
265
	}
266
	for eventState.isEnabled() {
267
		timeout := time.After(100 * time.Millisecond)
268
		select {
269
		case ev, ok := <-eventState.C:
270
			if !ok {
271
				return
272
			}
273
			if ev == EOFEvent {
274
				eventState.disableEventMonitoring()
275
				return
276
			}
277
			eventState.updateLastSeen(ev)
278
			eventState.sendEvent(ev)
279
		case err = <-eventState.errC:
280
			if errors.Is(err, ErrNoListeners) {
281
				eventState.disableEventMonitoring()
282
				return
283
			} else if err != nil {
284
				defer func() { go eventState.monitorEvents(c, opts) }()
285
				return
286
			}
287
		case <-timeout:
288
			continue
289
		}
290
	}
291
}
292

293
// connectWithRetry opens the events connection through c.eventHijack. After
// a failed attempt it retries up to maxMonitorConnRetries times, sleeping
// retryInitialWaitTime * 2^n milliseconds before retry n (starting at 0).
// The closeConn function of the last attempt is stored in the state, and the
// error of the last attempt is returned.
func (eventState *eventMonitoringState) connectWithRetry(c *Client, opts EventsOptions) error {
	var (
		closeConn func()
		err       error
	)
	for attempt := 0; ; attempt++ {
		// Re-read the channels on every attempt, under the read lock.
		eventState.RLock()
		events, errs := eventState.C, eventState.errC
		eventState.RUnlock()

		closeConn, err = c.eventHijack(opts, atomic.LoadInt64(&eventState.lastSeen), events, errs)
		if err == nil || attempt == maxMonitorConnRetries {
			break
		}

		backoff := int64(retryInitialWaitTime * math.Pow(2, float64(attempt)))
		time.Sleep(time.Duration(backoff) * time.Millisecond)
	}

	eventState.Lock()
	defer eventState.Unlock()
	eventState.closeConn = closeConn
	return err
}
314

315
// noListeners reports whether no listener is currently registered.
func (eventState *eventMonitoringState) noListeners() bool {
	eventState.RLock()
	empty := len(eventState.listeners) == 0
	eventState.RUnlock()
	return empty
}
320

321
// isEnabled reports whether event monitoring is currently enabled.
func (eventState *eventMonitoringState) isEnabled() bool {
	eventState.RLock()
	enabled := eventState.enabled
	eventState.RUnlock()
	return enabled
}
326

327
func (eventState *eventMonitoringState) sendEvent(event *APIEvents) {
328
	eventState.RLock()
329
	defer eventState.RUnlock()
330
	eventState.Add(1)
331
	defer eventState.Done()
332
	if eventState.enabled {
333
		if len(eventState.listeners) == 0 {
334
			eventState.errC <- ErrNoListeners
335
			return
336
		}
337

338
		for _, listener := range eventState.listeners {
339
			select {
340
			case listener <- event:
341
			default:
342
			}
343
		}
344
	}
345
}
346

347
// updateLastSeen advances lastSeen to e.Time when e is newer than the most
// recent event recorded so far; older or equal timestamps are ignored.
func (eventState *eventMonitoringState) updateLastSeen(e *APIEvents) {
	eventState.Lock()
	defer eventState.Unlock()

	if seen := atomic.LoadInt64(&eventState.lastSeen); e.Time > seen {
		atomic.StoreInt64(&eventState.lastSeen, e.Time)
	}
}
354

355
// eventHijack opens a dedicated connection to the daemon, issues a GET on
// the /events endpoint and starts a goroutine that decodes the JSON event
// stream.
//
// Decoded events with a non-zero Time are passed through transformEvent and
// sent on eventChan, as long as monitoring is enabled and eventChan is still
// the monitor's current channel. When the stream ends (io.EOF or
// io.ErrUnexpectedEOF), EOFEvent is sent under the same condition. Any other
// decoding error is sent on errChan. In both cases the goroutine then exits
// and closes the response body and the connection.
//
// A non-zero startTime overrides opts.Since, so that a reconnect resumes
// from the last event seen.
//
// The returned closeConn function asks the reader goroutine to stop; the
// request is honoured the next time the goroutine checks the flag, i.e.
// after the pending Decode call returns. On error, closeConn is nil and no
// connection is left open.
func (c *Client) eventHijack(opts EventsOptions, startTime int64, eventChan chan *APIEvents, errChan chan error) (closeConn func(), err error) {
	// on reconnect override initial Since with last event seen time
	if startTime != 0 {
		opts.Since = strconv.FormatInt(startTime, 10)
	}
	uri := "/events?" + queryString(opts)
	protocol := c.endpointURL.Scheme
	address := c.endpointURL.Path
	if protocol != "unix" && protocol != "npipe" {
		protocol = "tcp"
		address = c.endpointURL.Host
	}
	var dial net.Conn
	if c.TLSConfig == nil {
		dial, err = c.Dialer.Dial(protocol, address)
	} else {
		netDialer, ok := c.Dialer.(*net.Dialer)
		if !ok {
			return nil, ErrTLSNotSupported
		}
		dial, err = tlsDialWithDialer(netDialer, protocol, address, c.TLSConfig)
	}
	if err != nil {
		return nil, err
	}
	//lint:ignore SA1019 the alternative doesn't quite work, so keep using the deprecated thing.
	conn := httputil.NewClientConn(dial, nil)
	req, err := http.NewRequest(http.MethodGet, uri, nil)
	if err != nil {
		// Do not leak the connection that was just dialed.
		conn.Close()
		return nil, err
	}
	res, err := conn.Do(req)
	if err != nil {
		// Do may hand back a response together with an error; release
		// it and the connection, since the caller gets neither.
		if res != nil && res.Body != nil {
			res.Body.Close()
		}
		conn.Close()
		return nil, err
	}

	keepRunning := int32(1)
	//lint:ignore SA1019 the alternative doesn't quite work, so keep using the deprecated thing.
	go func(res *http.Response, conn *httputil.ClientConn) {
		defer conn.Close()
		defer res.Body.Close()
		decoder := json.NewDecoder(res.Body)
		for atomic.LoadInt32(&keepRunning) == 1 {
			var event APIEvents
			if err := decoder.Decode(&event); err != nil {
				if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
					c.eventMonitor.RLock()
					if c.eventMonitor.enabled && c.eventMonitor.C == eventChan {
						// Signal that we're exiting.
						eventChan <- EOFEvent
					}
					c.eventMonitor.RUnlock()
					return
				}
				// Report the error once and stop. The monitor reacts to
				// it by opening a new connection, so reading on would
				// only re-send the error and orphan this goroutine.
				errChan <- err
				return
			}
			if event.Time == 0 {
				continue
			}
			transformEvent(&event)
			c.eventMonitor.RLock()
			if c.eventMonitor.enabled && c.eventMonitor.C == eventChan {
				eventChan <- &event
			}
			c.eventMonitor.RUnlock()
		}
	}(res, conn)
	return func() {
		atomic.StoreInt32(&keepRunning, 0)
	}, nil
}
426

427
// transformEvent takes an event and determines what version it is from
428
// then populates both versions of the event
429
func transformEvent(event *APIEvents) {
430
	// if event version is <= 1.21 there will be no Action and no Type
431
	if event.Action == "" && event.Type == "" {
432
		event.Action = event.Status
433
		event.Actor.ID = event.ID
434
		event.Actor.Attributes = map[string]string{}
435
		switch event.Status {
436
		case "delete", "import", "pull", "push", "tag", "untag":
437
			event.Type = "image"
438
		default:
439
			event.Type = "container"
440
			if event.From != "" {
441
				event.Actor.Attributes["image"] = event.From
442
			}
443
		}
444
	} else {
445
		if event.Status == "" {
446
			if event.Type == "image" || event.Type == "container" {
447
				event.Status = event.Action
448
			} else {
449
				// Because just the Status has been overloaded with different Types
450
				// if an event is not for an image or a container, we prepend the type
451
				// to avoid problems for people relying on actions being only for
452
				// images and containers
453
				event.Status = event.Type + ":" + event.Action
454
			}
455
		}
456
		if event.ID == "" {
457
			event.ID = event.Actor.ID
458
		}
459
		if event.From == "" {
460
			event.From = event.Actor.Attributes["image"]
461
		}
462
	}
463
}
464

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.