inspektor-gadget

Форк
0
121 строка · 2.3 Кб
1
// Copyright 2019-2021 The Inspektor Gadget authors
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
package stream
16

17
import (
18
	"sync"
19
)
20

21
const (
22
	HistorySize    = 100
23
	SubChannelSize = 250
24
)
25

26
type Record struct {
27
	Line      string
28
	EventLost bool
29
}
30

31
type GadgetStream struct {
32
	mu sync.RWMutex
33

34
	previousLines []Record
35

36
	// subs contains a list of subscribers
37
	subs map[chan Record]struct{}
38

39
	closed bool
40
}
41

42
func NewGadgetStream() *GadgetStream {
43
	return &GadgetStream{
44
		subs: make(map[chan Record]struct{}),
45
	}
46
}
47

48
func (g *GadgetStream) Subscribe() chan Record {
49
	g.mu.Lock()
50
	defer g.mu.Unlock()
51

52
	if g.closed {
53
		return nil
54
	}
55

56
	ch := make(chan Record, SubChannelSize)
57
	for _, l := range g.previousLines {
58
		ch <- l
59
	}
60
	g.subs[ch] = struct{}{}
61

62
	return ch
63
}
64

65
func (g *GadgetStream) Unsubscribe(ch chan Record) {
66
	g.mu.Lock()
67
	defer g.mu.Unlock()
68

69
	if g.closed {
70
		return
71
	}
72

73
	_, ok := g.subs[ch]
74
	if ok {
75
		delete(g.subs, ch)
76
		close(ch)
77
	}
78
}
79

80
func (g *GadgetStream) Publish(line string) {
81
	g.mu.Lock()
82
	defer g.mu.Unlock()
83

84
	if g.closed {
85
		return
86
	}
87

88
	newLine := Record{
89
		Line: line,
90
	}
91

92
	if len(g.previousLines) == HistorySize {
93
		// Force new array allocation to avoid an ever growing underlying array
94
		// TODO: check possible performance issue
95
		g.previousLines = append([]Record{}, g.previousLines[1:]...)
96
	}
97
	g.previousLines = append(g.previousLines, newLine)
98

99
	for ch := range g.subs {
100
		queuedCount := len(ch)
101
		switch {
102
		case queuedCount == cap(ch):
103
			// Channel full. There is nothing we can do.
104
			continue
105
		case queuedCount == cap(ch)-1:
106
			// Channel almost full. Last chance to signal the problem.
107
			ch <- Record{EventLost: true}
108
		case queuedCount < cap(ch)-1:
109
			ch <- newLine
110
		}
111
	}
112
}
113

114
func (g *GadgetStream) Close() {
115
	g.mu.Lock()
116
	defer g.mu.Unlock()
117
	for ch := range g.subs {
118
		close(ch)
119
	}
120
	g.closed = true
121
}
122

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.