4
. "github.com/onsi/ginkgo/v2"
5
. "github.com/onsi/gomega"
7
"github.com/kumahq/kuma/pkg/events"
8
core_metrics "github.com/kumahq/kuma/pkg/metrics"
9
test_metrics "github.com/kumahq/kuma/pkg/test/metrics"
12
var _ = Describe("EventBus", func() {
13
chHadEvent := func(ch <-chan events.Event) bool {
22
It("should not block on Send", func() {
24
metrics, err := core_metrics.NewMetrics("")
25
Expect(err).ToNot(HaveOccurred())
26
eventBus, err := events.NewEventBus(1, metrics)
27
Expect(err).ToNot(HaveOccurred())
28
listener := eventBus.Subscribe()
29
event1 := events.ResourceChangedEvent{TenantID: "1"}
30
event2 := events.ResourceChangedEvent{TenantID: "2"}
37
event := <-listener.Recv()
38
Expect(event).To(Equal(event1))
40
// and second event was ignored because buffer was full
41
Expect(chHadEvent(listener.Recv())).To(BeFalse())
42
Expect(test_metrics.FindMetric(metrics, "events_dropped").Counter.GetValue()).To(Equal(1.0))
45
It("should only send events matched predicate", func() {
47
metrics, err := core_metrics.NewMetrics("")
48
Expect(err).ToNot(HaveOccurred())
49
eventBus, err := events.NewEventBus(10, metrics)
50
Expect(err).ToNot(HaveOccurred())
51
listener := eventBus.Subscribe(func(event events.Event) bool {
52
return event.(events.ResourceChangedEvent).TenantID == "1"
54
event1 := events.ResourceChangedEvent{TenantID: "1"}
55
event2 := events.ResourceChangedEvent{TenantID: "2"}
62
event := <-listener.Recv()
63
Expect(event).To(Equal(event1))
65
// and second event was ignored, because it did not match predicate
66
Expect(chHadEvent(listener.Recv())).To(BeFalse())
67
Expect(test_metrics.FindMetric(metrics, "events_dropped").Counter.GetValue()).To(Equal(0.0))