kuma

Форк
0
/
eventbus.go 
99 строк · 2.1 Кб
1
package events
2

3
import (
4
	"sync"
5

6
	"github.com/prometheus/client_golang/prometheus"
7

8
	"github.com/kumahq/kuma/pkg/core"
9
	core_metrics "github.com/kumahq/kuma/pkg/metrics"
10
)
11

12
var log = core.Log.WithName("eventbus")
13

14
type subscriber struct {
15
	ch         chan Event
16
	predicates []Predicate
17
}
18

19
func NewEventBus(bufferSize uint, metrics core_metrics.Metrics) (EventBus, error) {
20
	metric := prometheus.NewCounter(prometheus.CounterOpts{
21
		Name: "events_dropped",
22
		Help: "Number of dropped events in event bus due to full channels",
23
	})
24
	if err := metrics.Register(metric); err != nil {
25
		return nil, err
26
	}
27
	return &eventBus{
28
		subscribers: map[string]subscriber{},
29
		bufferSize:  bufferSize,
30
		metric:      metric,
31
	}, nil
32
}
33

34
type eventBus struct {
35
	mtx         sync.RWMutex
36
	subscribers map[string]subscriber
37
	bufferSize  uint
38
	metric      prometheus.Counter
39
}
40

41
// Subscribe subscribes to a stream of events given Predicates
42
// Predicate should not block on I/O, otherwise the whole event bus can block.
43
// All predicates must pass for the event to enqueued.
44
func (b *eventBus) Subscribe(predicates ...Predicate) Listener {
45
	id := core.NewUUID()
46
	b.mtx.Lock()
47
	defer b.mtx.Unlock()
48

49
	events := make(chan Event, b.bufferSize)
50
	b.subscribers[id] = subscriber{
51
		ch:         events,
52
		predicates: predicates,
53
	}
54
	return &reader{
55
		events: events,
56
		close: func() {
57
			b.mtx.Lock()
58
			defer b.mtx.Unlock()
59
			delete(b.subscribers, id)
60
		},
61
	}
62
}
63

64
func (b *eventBus) Send(event Event) {
65
	b.mtx.RLock()
66
	defer b.mtx.RUnlock()
67
	for _, sub := range b.subscribers {
68
		matched := true
69
		for _, predicate := range sub.predicates {
70
			if !predicate(event) {
71
				matched = false
72
			}
73
		}
74
		if matched {
75
			select {
76
			case sub.ch <- event:
77
			default:
78
				b.metric.Inc()
79
				log.Info("[WARNING] event is not sent because the channel is full. Ignoring event. Consider increasing buffer size using KUMA_EVENT_BUS_BUFFER_SIZE",
80
					"bufferSize", b.bufferSize,
81
					"event", event,
82
				)
83
			}
84
		}
85
	}
86
}
87

88
type reader struct {
89
	events chan Event
90
	close  func()
91
}
92

93
func (k *reader) Recv() <-chan Event {
94
	return k.events
95
}
96

97
func (k *reader) Close() {
98
	k.close()
99
}
100

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.