go-tg-screenshot-bot
125 строк · 2.6 Кб
1package dbus2
3import (4"sync"5)
6
7// NewSequentialSignalHandler returns an instance of a new
8// signal handler that guarantees sequential processing of signals. It is a
9// guarantee of this signal handler that signals will be written to
10// channels in the order they are received on the DBus connection.
11func NewSequentialSignalHandler() SignalHandler {12return &sequentialSignalHandler{}13}
14
15type sequentialSignalHandler struct {16mu sync.RWMutex17closed bool18signals []*sequentialSignalChannelData19}
20
21func (sh *sequentialSignalHandler) DeliverSignal(intf, name string, signal *Signal) {22sh.mu.RLock()23defer sh.mu.RUnlock()24if sh.closed {25return26}27for _, scd := range sh.signals {28scd.deliver(signal)29}30}
31
32func (sh *sequentialSignalHandler) Terminate() {33sh.mu.Lock()34defer sh.mu.Unlock()35if sh.closed {36return37}38
39for _, scd := range sh.signals {40scd.close()41close(scd.ch)42}43sh.closed = true44sh.signals = nil45}
46
47func (sh *sequentialSignalHandler) AddSignal(ch chan<- *Signal) {48sh.mu.Lock()49defer sh.mu.Unlock()50if sh.closed {51return52}53sh.signals = append(sh.signals, newSequentialSignalChannelData(ch))54}
55
56func (sh *sequentialSignalHandler) RemoveSignal(ch chan<- *Signal) {57sh.mu.Lock()58defer sh.mu.Unlock()59if sh.closed {60return61}62for i := len(sh.signals) - 1; i >= 0; i-- {63if ch == sh.signals[i].ch {64sh.signals[i].close()65copy(sh.signals[i:], sh.signals[i+1:])66sh.signals[len(sh.signals)-1] = nil67sh.signals = sh.signals[:len(sh.signals)-1]68}69}70}
71
72type sequentialSignalChannelData struct {73ch chan<- *Signal74in chan *Signal75done chan struct{}76}
77
78func newSequentialSignalChannelData(ch chan<- *Signal) *sequentialSignalChannelData {79scd := &sequentialSignalChannelData{80ch: ch,81in: make(chan *Signal),82done: make(chan struct{}),83}84go scd.bufferSignals()85return scd86}
87
88func (scd *sequentialSignalChannelData) bufferSignals() {89defer close(scd.done)90
91// Ensure that signals are delivered to scd.ch in the same92// order they are received from scd.in.93var queue []*Signal94for {95if len(queue) == 0 {96signal, ok := <- scd.in97if !ok {98return99}100queue = append(queue, signal)101}102select {103case scd.ch <- queue[0]:104copy(queue, queue[1:])105queue[len(queue)-1] = nil106queue = queue[:len(queue)-1]107case signal, ok := <-scd.in:108if !ok {109return110}111queue = append(queue, signal)112}113}114}
115
116func (scd *sequentialSignalChannelData) deliver(signal *Signal) {117scd.in <- signal118}
119
120func (scd *sequentialSignalChannelData) close() {121close(scd.in)122// Ensure that bufferSignals() has exited and won't attempt123// any future sends on scd.ch124<-scd.done125}
126