go-tg-screenshot-bot

Форк
0
125 строк · 2.6 Кб
1
package dbus
2

3
import (
4
	"sync"
5
)
6

7
// NewSequentialSignalHandler returns an instance of a new
8
// signal handler that guarantees sequential processing of signals. It is a
9
// guarantee of this signal handler that signals will be written to
10
// channels in the order they are received on the DBus connection.
11
func NewSequentialSignalHandler() SignalHandler {
12
	return &sequentialSignalHandler{}
13
}
14

15
type sequentialSignalHandler struct {
16
	mu      sync.RWMutex
17
	closed  bool
18
	signals []*sequentialSignalChannelData
19
}
20

21
func (sh *sequentialSignalHandler) DeliverSignal(intf, name string, signal *Signal) {
22
	sh.mu.RLock()
23
	defer sh.mu.RUnlock()
24
	if sh.closed {
25
		return
26
	}
27
	for _, scd := range sh.signals {
28
		scd.deliver(signal)
29
	}
30
}
31

32
func (sh *sequentialSignalHandler) Terminate() {
33
	sh.mu.Lock()
34
	defer sh.mu.Unlock()
35
	if sh.closed {
36
		return
37
	}
38

39
	for _, scd := range sh.signals {
40
		scd.close()
41
		close(scd.ch)
42
	}
43
	sh.closed = true
44
	sh.signals = nil
45
}
46

47
func (sh *sequentialSignalHandler) AddSignal(ch chan<- *Signal) {
48
	sh.mu.Lock()
49
	defer sh.mu.Unlock()
50
	if sh.closed {
51
		return
52
	}
53
	sh.signals = append(sh.signals, newSequentialSignalChannelData(ch))
54
}
55

56
func (sh *sequentialSignalHandler) RemoveSignal(ch chan<- *Signal) {
57
	sh.mu.Lock()
58
	defer sh.mu.Unlock()
59
	if sh.closed {
60
		return
61
	}
62
	for i := len(sh.signals) - 1; i >= 0; i-- {
63
		if ch == sh.signals[i].ch {
64
			sh.signals[i].close()
65
			copy(sh.signals[i:], sh.signals[i+1:])
66
			sh.signals[len(sh.signals)-1] = nil
67
			sh.signals = sh.signals[:len(sh.signals)-1]
68
		}
69
	}
70
}
71

72
type sequentialSignalChannelData struct {
73
	ch   chan<- *Signal
74
	in   chan *Signal
75
	done chan struct{}
76
}
77

78
func newSequentialSignalChannelData(ch chan<- *Signal) *sequentialSignalChannelData {
79
	scd := &sequentialSignalChannelData{
80
		ch:   ch,
81
		in:   make(chan *Signal),
82
		done: make(chan struct{}),
83
	}
84
	go scd.bufferSignals()
85
	return scd
86
}
87

88
func (scd *sequentialSignalChannelData) bufferSignals() {
89
	defer close(scd.done)
90

91
	// Ensure that signals are delivered to scd.ch in the same
92
	// order they are received from scd.in.
93
	var queue []*Signal
94
	for {
95
		if len(queue) == 0 {
96
			signal, ok := <- scd.in
97
			if !ok {
98
				return
99
			}
100
			queue = append(queue, signal)
101
		}
102
		select {
103
		case scd.ch <- queue[0]:
104
			copy(queue, queue[1:])
105
			queue[len(queue)-1] = nil
106
			queue = queue[:len(queue)-1]
107
		case signal, ok := <-scd.in:
108
			if !ok {
109
				return
110
			}
111
			queue = append(queue, signal)
112
		}
113
	}
114
}
115

116
func (scd *sequentialSignalChannelData) deliver(signal *Signal) {
117
	scd.in <- signal
118
}
119

120
func (scd *sequentialSignalChannelData) close() {
121
	close(scd.in)
122
	// Ensure that bufferSignals() has exited and won't attempt
123
	// any future sends on scd.ch
124
	<-scd.done
125
}
126

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.