podman
157 строк · 4.8 Кб
1// Copyright 2020 The go-libvirt Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package event
16
17import (
18"context"
19)
20
// emptyEvent is a placeholder Event value. It exists only so that process()
// has a well-typed value to name in its select statement while the queue is
// empty; it is never delivered to clients. See the comments in process() for
// details.
type emptyEvent struct{}

// GetCallbackID always reports callback ID 0 for the placeholder event.
func (emptyEvent) GetCallbackID() int32 {
	var id int32
	return id
}
27
28// Stream is an unbounded buffered event channel. The implementation
29// consists of a pair of unbuffered channels and a goroutine to manage them.
30// Client behavior will not cause incoming events to block.
31type Stream struct {
32// Program specifies the source of the events - libvirt or QEMU.
33Program uint32
34
35// CallbackID is returned by the event registration call.
36CallbackID int32
37
38// manage unbounded channel behavior.
39queue []Event
40qlen chan (chan int)
41in, out chan Event
42
43// terminates processing
44shutdown context.CancelFunc
45}
46
47// NewStream configures a new Event Stream. Incoming events are appended to a
48// queue, which is then relayed to the listening client. Client behavior will
49// not cause incoming events to block. It is the responsibility of the caller
50// to terminate the Stream via Shutdown() when no longer in use.
51func NewStream(program uint32, cbID int32) *Stream {
52s := &Stream{
53Program: program,
54CallbackID: cbID,
55in: make(chan Event),
56out: make(chan Event),
57qlen: make(chan (chan int)),
58}
59
60// Start the processing loop, which will return a routine we can use to
61// shut the queue down later.
62s.shutdown = s.start()
63
64return s
65}
66
67// Len will return the current count of events in the internal queue for a
68// stream. It does this by sending a message to the stream's process() loop,
69// which will then write the current length to the channel contained in that
70// message.
71func (s *Stream) Len() int {
72// Send a request to the process() loop to get the current length of the
73// queue
74ch := make(chan int)
75s.qlen <- ch
76return <-ch
77}
78
79// Recv returns the next available event from the Stream's queue.
80func (s *Stream) Recv() chan Event {
81return s.out
82}
83
84// Push appends a new event to the queue.
85func (s *Stream) Push(e Event) {
86s.in <- e
87}
88
89// Shutdown gracefully terminates Stream processing, releasing all internal
90// resources. Events which have not yet been received by the client will be
91// dropped. Subsequent calls to Shutdown() are idempotent.
92func (s *Stream) Shutdown() {
93if s.shutdown != nil {
94s.shutdown()
95}
96}
97
98// start starts the event processing loop, which will continue to run until
99// terminated by the returned context.CancelFunc.
100func (s *Stream) start() context.CancelFunc {
101ctx, cancel := context.WithCancel(context.Background())
102
103go s.process(ctx)
104
105return cancel
106}
107
108// process manages an Stream's lifecycle until canceled by the provided context.
109// Incoming events are appended to a queue which is then relayed to the
110// listening client. New events pushed onto the queue will not block if the
111// client is not actively polling for them; the stream will buffer them
112// internally.
113func (s *Stream) process(ctx context.Context) {
114// Close the output channel so that clients know this stream is finished.
115// We don't close s.in to avoid creating a race with the stream's Push()
116// function.
117defer close(s.out)
118
119// This function is used to retrieve the next event from the queue, to be
120// sent to the client. If there are no more events to send, it returns a nil
121// channel and a zero-value event.
122nextEvent := func() (chan Event, Event) {
123sendCh := chan Event(nil)
124next := Event(emptyEvent{})
125if len(s.queue) > 0 {
126sendCh = s.out
127next = s.queue[0]
128}
129return sendCh, next
130}
131
132// The select statement in this loop relies on the fact that a send to a nil
133// channel will block forever. If we have no entries in the queue, the
134// sendCh variable will be nil, so the clause that attempts to send an event
135// to the client will never complete. Clients will never receive an
136// emptyEvent.
137for {
138sendCh, nextEvt := nextEvent()
139
140select {
141// new event received, append to queue
142case e := <-s.in:
143s.queue = append(s.queue, e)
144
145case lenCh := <-s.qlen:
146lenCh <- len(s.queue)
147
148// client received an event, pop from queue
149case sendCh <- nextEvt:
150s.queue = s.queue[1:]
151
152// shutdown requested
153case <-ctx.Done():
154return
155}
156}
157}
158