1
// Copyright Istio Authors
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
7
// http://www.apache.org/licenses/LICENSE-2.0
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
18
"istio.io/istio/pilot/pkg/model"
19
config2 "istio.io/istio/pkg/config"
20
"istio.io/istio/pkg/log"
24
// BufferSize specifies the default buffer size of the event channel.
28
// Handler is the callback type registered with Monitor.AppendEventHandler. It
// receives two Config values and the model.Event being handled.
// NOTE(review): the two configs are presumably the previous and the current
// object, in that order (applyHandlers takes old, config, event) — confirm.
29
type Handler func(config2.Config, config2.Config, model.Event)
31
// Monitor dispatches config change events to the handlers registered for each
// config kind.
32
type Monitor interface {
34
// AppendEventHandler registers a Handler for the given GroupVersionKind.
AppendEventHandler(config2.GroupVersionKind, Handler)
35
// ScheduleProcessEvent submits a ConfigEvent to the monitor for processing.
ScheduleProcessEvent(ConfigEvent)
38
// ConfigEvent defines the event to be processed. processConfigEvent reads its
// config, old and event fields and hands them to applyHandlers.
39
type ConfigEvent struct {
45
// configStoreMonitor is the Monitor implementation returned by the
// constructors in this file.
type configStoreMonitor struct {
46
// store supplies the schemas used to seed the handlers map.
store model.ConfigStore
47
// handlers maps each config kind to the handlers registered for it.
handlers map[config2.GroupVersionKind][]Handler
48
// eventCh carries events sent by ScheduleProcessEvent and received by Run.
eventCh chan ConfigEvent
49
// If enabled, events will be handled synchronously
53
// NewMonitor returns a new Monitor implementation for store whose event
// channel uses the default BufferSize. It passes sync=false to
// newBufferedMonitor.
54
func NewMonitor(store model.ConfigStore) Monitor {
55
return newBufferedMonitor(store, BufferSize, false)
58
// NewSyncMonitor returns a new Monitor implementation which will process
// events synchronously. It passes sync=true to newBufferedMonitor and still
// uses the default BufferSize for the event channel.
59
func NewSyncMonitor(store model.ConfigStore) Monitor {
60
return newBufferedMonitor(store, BufferSize, true)
63
// newBufferedMonitor returns a new Monitor implementation with the specified
// event buffer size. NewMonitor calls it with sync=false and NewSyncMonitor
// with sync=true.
64
func newBufferedMonitor(store model.ConfigStore, bufferSize int, sync bool) Monitor {
65
handlers := make(map[config2.GroupVersionKind][]Handler)
67
// Seed an empty handler list for every schema the store reports, so that the
// existence check in processConfigEvent recognizes those kinds.
for _, s := range store.Schemas().All() {
68
handlers[s.GroupVersionKind()] = make([]Handler, 0)
71
return &configStoreMonitor{
74
// bufferSize is the capacity of the event channel.
eventCh: make(chan ConfigEvent, bufferSize),
79
// ScheduleProcessEvent hands configEvent to the monitor, either by calling
// processConfigEvent directly or by sending it on eventCh.
// NOTE(review): the choice between the two paths is presumably made by the
// synchronous-handling flag — confirm against the full source.
func (m *configStoreMonitor) ScheduleProcessEvent(configEvent ConfigEvent) {
81
// Direct path: the event is processed on the caller's goroutine.
m.processConfigEvent(configEvent)
83
// Queued path: eventCh is buffered, so this send blocks once the buffer is full.
m.eventCh <- configEvent
87
// Run receives events from eventCh and passes each one to processConfigEvent.
// NOTE(review): presumably loops until stop is closed or eventCh is closed
// (the ok result of the receive) — confirm against the full source.
func (m *configStoreMonitor) Run(stop <-chan struct{}) {
96
case ce, ok := <-m.eventCh:
98
m.processConfigEvent(ce)
104
// processConfigEvent dispatches ce to the handlers registered for its config
// kind. A kind with no entry in the handlers map triggers a warning log.
// NOTE(review): presumably the event is dropped after the warning — confirm.
func (m *configStoreMonitor) processConfigEvent(ce ConfigEvent) {
105
// Kinds are present if seeded by newBufferedMonitor or added via AppendEventHandler.
if _, exists := m.handlers[ce.config.GroupVersionKind]; !exists {
106
log.Warnf("Config GroupVersionKind %s does not exist in config store", ce.config.GroupVersionKind)
109
m.applyHandlers(ce.old, ce.config, ce.event)
112
// AppendEventHandler adds h to the end of the handler list for typ. If typ has
// no entry yet, the append creates one, so processConfigEvent will then treat
// that kind as known.
// NOTE(review): no locking is visible around the handlers map; presumably
// handlers are registered before events are processed — confirm.
func (m *configStoreMonitor) AppendEventHandler(typ config2.GroupVersionKind, h Handler) {
113
m.handlers[typ] = append(m.handlers[typ], h)
116
func (m *configStoreMonitor) applyHandlers(old config2.Config, config config2.Config, e model.Event) {
117
for _, f := range m.handlers[config.GroupVersionKind] {