// Source: inspektor-gadget
// 121 lines · 2.3 KB
1// Copyright 2019-2021 The Inspektor Gadget authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package stream
16
17import (
18"sync"
19)
20
// Sizing limits used by GadgetStream.
const (
	// HistorySize is the maximum number of published records retained for
	// replay to subscribers that join later.
	HistorySize = 100

	// SubChannelSize is the buffer capacity of every subscriber channel.
	SubChannelSize = 250
)
25
// Record is one item delivered to subscribers of a GadgetStream.
type Record struct {
	// Line is the published text. It is empty on records that only signal
	// lost events.
	Line string

	// EventLost is set on the marker record queued when a subscriber's
	// buffer is about to overflow and subsequent lines will be dropped.
	EventLost bool
}
30
31type GadgetStream struct {
32mu sync.RWMutex
33
34previousLines []Record
35
36// subs contains a list of subscribers
37subs map[chan Record]struct{}
38
39closed bool
40}
41
42func NewGadgetStream() *GadgetStream {
43return &GadgetStream{
44subs: make(map[chan Record]struct{}),
45}
46}
47
48func (g *GadgetStream) Subscribe() chan Record {
49g.mu.Lock()
50defer g.mu.Unlock()
51
52if g.closed {
53return nil
54}
55
56ch := make(chan Record, SubChannelSize)
57for _, l := range g.previousLines {
58ch <- l
59}
60g.subs[ch] = struct{}{}
61
62return ch
63}
64
65func (g *GadgetStream) Unsubscribe(ch chan Record) {
66g.mu.Lock()
67defer g.mu.Unlock()
68
69if g.closed {
70return
71}
72
73_, ok := g.subs[ch]
74if ok {
75delete(g.subs, ch)
76close(ch)
77}
78}
79
80func (g *GadgetStream) Publish(line string) {
81g.mu.Lock()
82defer g.mu.Unlock()
83
84if g.closed {
85return
86}
87
88newLine := Record{
89Line: line,
90}
91
92if len(g.previousLines) == HistorySize {
93// Force new array allocation to avoid an ever growing underlying array
94// TODO: check possible performance issue
95g.previousLines = append([]Record{}, g.previousLines[1:]...)
96}
97g.previousLines = append(g.previousLines, newLine)
98
99for ch := range g.subs {
100queuedCount := len(ch)
101switch {
102case queuedCount == cap(ch):
103// Channel full. There is nothing we can do.
104continue
105case queuedCount == cap(ch)-1:
106// Channel almost full. Last chance to signal the problem.
107ch <- Record{EventLost: true}
108case queuedCount < cap(ch)-1:
109ch <- newLine
110}
111}
112}
113
114func (g *GadgetStream) Close() {
115g.mu.Lock()
116defer g.mu.Unlock()
117for ch := range g.subs {
118close(ch)
119}
120g.closed = true
121}
122