podman
463 строки · 13.0 Кб
1// Copyright 2014 go-dockerclient authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5package docker
6
7import (
8"encoding/json"
9"errors"
10"io"
11"math"
12"net"
13"net/http"
14"net/http/httputil"
15"strconv"
16"sync"
17"sync/atomic"
18"time"
19)
20
// EventsOptions holds the parameters used to filter the events stream.
// See https://docs.docker.com/engine/api/v1.41/#operation/SystemEvents for more details.
type EventsOptions struct {
	// Show events created since this timestamp then stream new events.
	Since string

	// Show events created until this timestamp then stop streaming.
	Until string

	// Filter for events. For example:
	//  map[string][]string{"type": {"container"}, "event": {"start", "die"}}
	// will return events when a container was started and stopped or killed.
	//
	// Available filters:
	//  config=<string> config name or ID
	//  container=<string> container name or ID
	//  daemon=<string> daemon name or ID
	//  event=<string> event type
	//  image=<string> image name or ID
	//  label=<string> image or container label
	//  network=<string> network name or ID
	//  node=<string> node ID
	//  plugin=<string> plugin name or ID
	//  scope=<string> local or swarm
	//  secret=<string> secret name or ID
	//  service=<string> service name or ID
	//  type=<string> container, image, volume, network, daemon, plugin, node, service, secret or config
	//  volume=<string> volume name
	Filters map[string][]string
}
51
// APIEvents represents events coming from the Docker API.
// The fields in the Docker API changed in API version 1.22, and
// events for more than images and containers are now fired off.
// To maintain forward and backward compatibility, go-dockerclient
// replicates the event in both the new and old format as faithfully as possible.
//
// For events that only exist in 1.22 or later, `Status` is filled in as
// `"Type:Action"` instead of just `Action` to allow for older clients to
// differentiate and not break if they rely on the pre-1.22 Status types.
//
// The transformEvent function can be consulted for more information about how
// events are translated between the new and old API formats.
type APIEvents struct {
	// New API Fields in 1.22
	Action string   `json:"action,omitempty"`
	Type   string   `json:"type,omitempty"`
	Actor  APIActor `json:"actor,omitempty"`

	// Old API fields for < 1.22
	Status string `json:"status,omitempty"`
	ID     string `json:"id,omitempty"`
	From   string `json:"from,omitempty"`

	// Fields in both
	Time     int64 `json:"time,omitempty"`
	TimeNano int64 `json:"timeNano,omitempty"`
}
79
// APIActor represents an actor that accomplishes something for an event.
type APIActor struct {
	// ID identifies the actor; transformEvent copies the legacy event ID
	// here for pre-1.22 events.
	ID string `json:"id,omitempty"`
	// Attributes carries free-form key/value details about the actor;
	// transformEvent reads and writes the "image" key.
	Attributes map[string]string `json:"attributes,omitempty"`
}
85
// eventMonitoringState is the shared state of the event monitor: the event
// and error channels fed by the connection reader, the registered listeners,
// and the bookkeeping needed to start and stop monitoring.
type eventMonitoringState struct {
	// `sync/atomic` expects the first word in an allocated struct to be 64-bit
	// aligned on both ARM and x86-32. See https://goo.gl/zW7dgq for more details.
	//
	// lastSeen holds the largest APIEvents.Time observed so far; it is passed
	// to eventHijack when (re)connecting. Do not move it from the first slot.
	lastSeen int64
	// The embedded RWMutex guards the fields below.
	sync.RWMutex
	// The embedded WaitGroup is incremented once per registered listener and
	// once per in-flight sendEvent call; disableEventMonitoring waits on it.
	sync.WaitGroup
	// enabled reports whether monitoring is currently switched on.
	enabled bool
	// C carries decoded events from the connection reader to monitorEvents.
	C chan *APIEvents
	// errC carries errors to monitorEvents.
	errC chan error
	// listeners are the channels events are fanned out to.
	listeners []chan<- *APIEvents
	// closeConn asks the current connection reader to stop; it is set by
	// connectWithRetry and invoked by disableEventMonitoring.
	closeConn func()
}
98
const (
	// maxMonitorConnRetries is the number of additional connection attempts
	// connectWithRetry makes after the first one fails.
	maxMonitorConnRetries = 5
	// retryInitialWaitTime is the wait before the first retry, in
	// milliseconds; connectWithRetry doubles it on every further retry.
	retryInitialWaitTime = 10.
)
103
var (
	// ErrNoListeners is the error returned when no listeners are available
	// to receive an event.
	ErrNoListeners = errors.New("no listeners present to receive event")

	// ErrListenerAlreadyExists is the error returned when the listener already
	// exists.
	ErrListenerAlreadyExists = errors.New("listener already exists for docker events")

	// ErrTLSNotSupported is the error returned when the client does not support
	// TLS (this applies to the Windows named pipe client).
	ErrTLSNotSupported = errors.New("tls not supported by this client")

	// EOFEvent is sent when the event listener receives an EOF error.
	// It is compared by pointer identity, so it must be passed around as-is.
	EOFEvent = &APIEvents{
		Type:   "EOF",
		Status: "EOF",
	}
)
123
124// AddEventListener adds a new listener to container events in the Docker API.
125//
126// The parameter is a channel through which events will be sent.
127func (c *Client) AddEventListener(listener chan<- *APIEvents) error {
128return c.AddEventListenerWithOptions(EventsOptions{}, listener)
129}
130
131// AddEventListener adds a new listener to container events in the Docker API.
132// See https://docs.docker.com/engine/api/v1.41/#operation/SystemEvents for more details.
133//
134// The listener parameter is a channel through which events will be sent.
135func (c *Client) AddEventListenerWithOptions(options EventsOptions, listener chan<- *APIEvents) error {
136var err error
137if !c.eventMonitor.isEnabled() {
138err = c.eventMonitor.enableEventMonitoring(c, options)
139if err != nil {
140return err
141}
142}
143return c.eventMonitor.addListener(listener)
144}
145
146// RemoveEventListener removes a listener from the monitor.
147func (c *Client) RemoveEventListener(listener chan *APIEvents) error {
148err := c.eventMonitor.removeListener(listener)
149if err != nil {
150return err
151}
152if c.eventMonitor.listernersCount() == 0 {
153c.eventMonitor.disableEventMonitoring()
154}
155return nil
156}
157
158func (eventState *eventMonitoringState) addListener(listener chan<- *APIEvents) error {
159eventState.Lock()
160defer eventState.Unlock()
161if listenerExists(listener, &eventState.listeners) {
162return ErrListenerAlreadyExists
163}
164eventState.Add(1)
165eventState.listeners = append(eventState.listeners, listener)
166return nil
167}
168
169func (eventState *eventMonitoringState) removeListener(listener chan<- *APIEvents) error {
170eventState.Lock()
171defer eventState.Unlock()
172if listenerExists(listener, &eventState.listeners) {
173var newListeners []chan<- *APIEvents
174for _, l := range eventState.listeners {
175if l != listener {
176newListeners = append(newListeners, l)
177}
178}
179eventState.listeners = newListeners
180eventState.Add(-1)
181}
182return nil
183}
184
185func (eventState *eventMonitoringState) closeListeners() {
186for _, l := range eventState.listeners {
187close(l)
188eventState.Add(-1)
189}
190eventState.listeners = nil
191}
192
193func (eventState *eventMonitoringState) listernersCount() int {
194eventState.RLock()
195defer eventState.RUnlock()
196return len(eventState.listeners)
197}
198
199func listenerExists(a chan<- *APIEvents, list *[]chan<- *APIEvents) bool {
200for _, b := range *list {
201if b == a {
202return true
203}
204}
205return false
206}
207
208func (eventState *eventMonitoringState) enableEventMonitoring(c *Client, opts EventsOptions) error {
209eventState.Lock()
210defer eventState.Unlock()
211if !eventState.enabled {
212eventState.enabled = true
213atomic.StoreInt64(&eventState.lastSeen, 0)
214eventState.C = make(chan *APIEvents, 100)
215eventState.errC = make(chan error, 1)
216go eventState.monitorEvents(c, opts)
217}
218return nil
219}
220
221func (eventState *eventMonitoringState) disableEventMonitoring() {
222eventState.Lock()
223defer eventState.Unlock()
224
225eventState.closeListeners()
226
227eventState.Wait()
228
229if eventState.enabled {
230eventState.enabled = false
231close(eventState.C)
232close(eventState.errC)
233
234if eventState.closeConn != nil {
235eventState.closeConn()
236eventState.closeConn = nil
237}
238}
239}
240
241func (eventState *eventMonitoringState) monitorEvents(c *Client, opts EventsOptions) {
242const (
243noListenersTimeout = 5 * time.Second
244noListenersInterval = 10 * time.Millisecond
245noListenersMaxTries = noListenersTimeout / noListenersInterval
246)
247
248var err error
249for i := time.Duration(0); i < noListenersMaxTries && eventState.noListeners(); i++ {
250time.Sleep(10 * time.Millisecond)
251}
252
253if eventState.noListeners() {
254// terminate if no listener is available after 5 seconds.
255// Prevents goroutine leak when RemoveEventListener is called
256// right after AddEventListener.
257eventState.disableEventMonitoring()
258return
259}
260
261if err = eventState.connectWithRetry(c, opts); err != nil {
262// terminate if connect failed
263eventState.disableEventMonitoring()
264return
265}
266for eventState.isEnabled() {
267timeout := time.After(100 * time.Millisecond)
268select {
269case ev, ok := <-eventState.C:
270if !ok {
271return
272}
273if ev == EOFEvent {
274eventState.disableEventMonitoring()
275return
276}
277eventState.updateLastSeen(ev)
278eventState.sendEvent(ev)
279case err = <-eventState.errC:
280if errors.Is(err, ErrNoListeners) {
281eventState.disableEventMonitoring()
282return
283} else if err != nil {
284defer func() { go eventState.monitorEvents(c, opts) }()
285return
286}
287case <-timeout:
288continue
289}
290}
291}
292
293func (eventState *eventMonitoringState) connectWithRetry(c *Client, opts EventsOptions) error {
294var retries int
295eventState.RLock()
296eventChan := eventState.C
297errChan := eventState.errC
298eventState.RUnlock()
299closeConn, err := c.eventHijack(opts, atomic.LoadInt64(&eventState.lastSeen), eventChan, errChan)
300for ; err != nil && retries < maxMonitorConnRetries; retries++ {
301waitTime := int64(retryInitialWaitTime * math.Pow(2, float64(retries)))
302time.Sleep(time.Duration(waitTime) * time.Millisecond)
303eventState.RLock()
304eventChan = eventState.C
305errChan = eventState.errC
306eventState.RUnlock()
307closeConn, err = c.eventHijack(opts, atomic.LoadInt64(&eventState.lastSeen), eventChan, errChan)
308}
309eventState.Lock()
310defer eventState.Unlock()
311eventState.closeConn = closeConn
312return err
313}
314
315func (eventState *eventMonitoringState) noListeners() bool {
316eventState.RLock()
317defer eventState.RUnlock()
318return len(eventState.listeners) == 0
319}
320
321func (eventState *eventMonitoringState) isEnabled() bool {
322eventState.RLock()
323defer eventState.RUnlock()
324return eventState.enabled
325}
326
327func (eventState *eventMonitoringState) sendEvent(event *APIEvents) {
328eventState.RLock()
329defer eventState.RUnlock()
330eventState.Add(1)
331defer eventState.Done()
332if eventState.enabled {
333if len(eventState.listeners) == 0 {
334eventState.errC <- ErrNoListeners
335return
336}
337
338for _, listener := range eventState.listeners {
339select {
340case listener <- event:
341default:
342}
343}
344}
345}
346
347func (eventState *eventMonitoringState) updateLastSeen(e *APIEvents) {
348eventState.Lock()
349defer eventState.Unlock()
350if atomic.LoadInt64(&eventState.lastSeen) < e.Time {
351atomic.StoreInt64(&eventState.lastSeen, e.Time)
352}
353}
354
355func (c *Client) eventHijack(opts EventsOptions, startTime int64, eventChan chan *APIEvents, errChan chan error) (closeConn func(), err error) {
356// on reconnect override initial Since with last event seen time
357if startTime != 0 {
358opts.Since = strconv.FormatInt(startTime, 10)
359}
360uri := "/events?" + queryString(opts)
361protocol := c.endpointURL.Scheme
362address := c.endpointURL.Path
363if protocol != "unix" && protocol != "npipe" {
364protocol = "tcp"
365address = c.endpointURL.Host
366}
367var dial net.Conn
368if c.TLSConfig == nil {
369dial, err = c.Dialer.Dial(protocol, address)
370} else {
371netDialer, ok := c.Dialer.(*net.Dialer)
372if !ok {
373return nil, ErrTLSNotSupported
374}
375dial, err = tlsDialWithDialer(netDialer, protocol, address, c.TLSConfig)
376}
377if err != nil {
378return nil, err
379}
380//lint:ignore SA1019 the alternative doesn't quite work, so keep using the deprecated thing.
381conn := httputil.NewClientConn(dial, nil)
382req, err := http.NewRequest(http.MethodGet, uri, nil)
383if err != nil {
384return nil, err
385}
386res, err := conn.Do(req)
387if err != nil {
388return nil, err
389}
390
391keepRunning := int32(1)
392//lint:ignore SA1019 the alternative doesn't quite work, so keep using the deprecated thing.
393go func(res *http.Response, conn *httputil.ClientConn) {
394defer conn.Close()
395defer res.Body.Close()
396decoder := json.NewDecoder(res.Body)
397for atomic.LoadInt32(&keepRunning) == 1 {
398var event APIEvents
399if err := decoder.Decode(&event); err != nil {
400if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
401c.eventMonitor.RLock()
402if c.eventMonitor.enabled && c.eventMonitor.C == eventChan {
403// Signal that we're exiting.
404eventChan <- EOFEvent
405}
406c.eventMonitor.RUnlock()
407break
408}
409errChan <- err
410}
411if event.Time == 0 {
412continue
413}
414transformEvent(&event)
415c.eventMonitor.RLock()
416if c.eventMonitor.enabled && c.eventMonitor.C == eventChan {
417eventChan <- &event
418}
419c.eventMonitor.RUnlock()
420}
421}(res, conn)
422return func() {
423atomic.StoreInt32(&keepRunning, 0)
424}, nil
425}
426
427// transformEvent takes an event and determines what version it is from
428// then populates both versions of the event
429func transformEvent(event *APIEvents) {
430// if event version is <= 1.21 there will be no Action and no Type
431if event.Action == "" && event.Type == "" {
432event.Action = event.Status
433event.Actor.ID = event.ID
434event.Actor.Attributes = map[string]string{}
435switch event.Status {
436case "delete", "import", "pull", "push", "tag", "untag":
437event.Type = "image"
438default:
439event.Type = "container"
440if event.From != "" {
441event.Actor.Attributes["image"] = event.From
442}
443}
444} else {
445if event.Status == "" {
446if event.Type == "image" || event.Type == "container" {
447event.Status = event.Action
448} else {
449// Because just the Status has been overloaded with different Types
450// if an event is not for an image or a container, we prepend the type
451// to avoid problems for people relying on actions being only for
452// images and containers
453event.Status = event.Type + ":" + event.Action
454}
455}
456if event.ID == "" {
457event.ID = event.Actor.ID
458}
459if event.From == "" {
460event.From = event.Actor.Attributes["image"]
461}
462}
463}
464