podman

Форк
0
157 строк · 4.8 Кб
1
// Copyright 2020 The go-libvirt Authors.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//   http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
package event
16

17
import (
18
	"context"
19
)
20

21
// emptyEvent is a placeholder Event value. process() pairs it with a nil send
// channel whenever the queue is empty, so it is never delivered to a client;
// it exists only so that the select statement there always has a value to
// name.
type emptyEvent struct{}

// GetCallbackID reports callback ID 0 for the placeholder event.
func (emptyEvent) GetCallbackID() int32 {
	const noCallback int32 = 0
	return noCallback
}
27

28
// Stream is an unbounded buffered event channel. The implementation
29
// consists of a pair of unbuffered channels and a goroutine to manage them.
30
// Client behavior will not cause incoming events to block.
31
type Stream struct {
32
	// Program specifies the source of the events - libvirt or QEMU.
33
	Program uint32
34

35
	// CallbackID is returned by the event registration call.
36
	CallbackID int32
37

38
	// manage unbounded channel behavior.
39
	queue   []Event
40
	qlen    chan (chan int)
41
	in, out chan Event
42

43
	// terminates processing
44
	shutdown context.CancelFunc
45
}
46

47
// NewStream configures a new Event Stream. Incoming events are appended to a
48
// queue, which is then relayed to the listening client. Client behavior will
49
// not cause incoming events to block. It is the responsibility of the caller
50
// to terminate the Stream via Shutdown() when no longer in use.
51
func NewStream(program uint32, cbID int32) *Stream {
52
	s := &Stream{
53
		Program:    program,
54
		CallbackID: cbID,
55
		in:         make(chan Event),
56
		out:        make(chan Event),
57
		qlen:       make(chan (chan int)),
58
	}
59

60
	// Start the processing loop, which will return a routine we can use to
61
	// shut the queue down later.
62
	s.shutdown = s.start()
63

64
	return s
65
}
66

67
// Len will return the current count of events in the internal queue for a
68
// stream. It does this by sending a message to the stream's process() loop,
69
// which will then write the current length to the channel contained in that
70
// message.
71
func (s *Stream) Len() int {
72
	// Send a request to the process() loop to get the current length of the
73
	// queue
74
	ch := make(chan int)
75
	s.qlen <- ch
76
	return <-ch
77
}
78

79
// Recv returns the next available event from the Stream's queue.
80
func (s *Stream) Recv() chan Event {
81
	return s.out
82
}
83

84
// Push appends a new event to the queue.
85
func (s *Stream) Push(e Event) {
86
	s.in <- e
87
}
88

89
// Shutdown gracefully terminates Stream processing, releasing all internal
90
// resources. Events which have not yet been received by the client will be
91
// dropped. Subsequent calls to Shutdown() are idempotent.
92
func (s *Stream) Shutdown() {
93
	if s.shutdown != nil {
94
		s.shutdown()
95
	}
96
}
97

98
// start starts the event processing loop, which will continue to run until
99
// terminated by the returned context.CancelFunc.
100
func (s *Stream) start() context.CancelFunc {
101
	ctx, cancel := context.WithCancel(context.Background())
102

103
	go s.process(ctx)
104

105
	return cancel
106
}
107

108
// process manages an Stream's lifecycle until canceled by the provided context.
109
// Incoming events are appended to a queue which is then relayed to the
110
// listening client. New events pushed onto the queue will not block if the
111
// client is not actively polling for them; the stream will buffer them
112
// internally.
113
func (s *Stream) process(ctx context.Context) {
114
	// Close the output channel so that clients know this stream is finished.
115
	// We don't close s.in to avoid creating a race with the stream's Push()
116
	// function.
117
	defer close(s.out)
118

119
	// This function is used to retrieve the next event from the queue, to be
120
	// sent to the client. If there are no more events to send, it returns a nil
121
	// channel and a zero-value event.
122
	nextEvent := func() (chan Event, Event) {
123
		sendCh := chan Event(nil)
124
		next := Event(emptyEvent{})
125
		if len(s.queue) > 0 {
126
			sendCh = s.out
127
			next = s.queue[0]
128
		}
129
		return sendCh, next
130
	}
131

132
	// The select statement in this loop relies on the fact that a send to a nil
133
	// channel will block forever. If we have no entries in the queue, the
134
	// sendCh variable will be nil, so the clause that attempts to send an event
135
	// to the client will never complete. Clients will never receive an
136
	// emptyEvent.
137
	for {
138
		sendCh, nextEvt := nextEvent()
139

140
		select {
141
		// new event received, append to queue
142
		case e := <-s.in:
143
			s.queue = append(s.queue, e)
144

145
		case lenCh := <-s.qlen:
146
			lenCh <- len(s.queue)
147

148
		// client received an event, pop from queue
149
		case sendCh <- nextEvt:
150
			s.queue = s.queue[1:]
151

152
		// shutdown requested
153
		case <-ctx.Done():
154
			return
155
		}
156
	}
157
}
158

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.