6
"github.com/prometheus/client_golang/prometheus"
8
"github.com/kumahq/kuma/pkg/core"
9
core_metrics "github.com/kumahq/kuma/pkg/metrics"
12
var log = core.Log.WithName("eventbus")
14
type subscriber struct {
16
predicates []Predicate
19
func NewEventBus(bufferSize uint, metrics core_metrics.Metrics) (EventBus, error) {
20
metric := prometheus.NewCounter(prometheus.CounterOpts{
21
Name: "events_dropped",
22
Help: "Number of dropped events in event bus due to full channels",
24
if err := metrics.Register(metric); err != nil {
28
subscribers: map[string]subscriber{},
29
bufferSize: bufferSize,
36
subscribers map[string]subscriber
38
metric prometheus.Counter
41
// Subscribe subscribes to a stream of events given Predicates
42
// Predicate should not block on I/O, otherwise the whole event bus can block.
43
// All predicates must pass for the event to enqueued.
44
func (b *eventBus) Subscribe(predicates ...Predicate) Listener {
49
events := make(chan Event, b.bufferSize)
50
b.subscribers[id] = subscriber{
52
predicates: predicates,
59
delete(b.subscribers, id)
64
func (b *eventBus) Send(event Event) {
67
for _, sub := range b.subscribers {
69
for _, predicate := range sub.predicates {
70
if !predicate(event) {
79
log.Info("[WARNING] event is not sent because the channel is full. Ignoring event. Consider increasing buffer size using KUMA_EVENT_BUS_BUFFER_SIZE",
80
"bufferSize", b.bufferSize,
93
func (k *reader) Recv() <-chan Event {
97
func (k *reader) Close() {