istio

Форк
0
/
worker.go 
265 строк · 5.2 Кб
1
// Copyright Istio Authors
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
package filewatcher
16

17
import (
18
	"bufio"
19
	"bytes"
20
	"crypto/sha256"
21
	"fmt"
22
	"io"
23
	"os"
24
	"sync"
25

26
	"github.com/fsnotify/fsnotify"
27
)
28

29
type worker struct {
30
	mu sync.RWMutex
31

32
	// watcher is an fsnotify watcher that watches the parent
33
	// dir of watchedFiles.
34
	dirWatcher *fsnotify.Watcher
35

36
	// The worker maintains a map of channels keyed by watched file path.
37
	// The worker watches parent path of given path,
38
	// and filters out events of given path, then redirect
39
	// to the result channel.
40
	// Note that for symlink files, the content in received events
41
	// do not have to be related to the file itself.
42
	watchedFiles map[string]*fileTracker
43

44
	// tracker lifecycle
45
	retireTrackerCh chan *fileTracker
46

47
	// tells the worker to exit
48
	terminateCh chan bool
49
}
50

51
type fileTracker struct {
52
	events chan fsnotify.Event
53
	errors chan error
54

55
	// Hash sum to indicate if a file has been updated.
56
	hash []byte
57
}
58

59
func newWorker(path string, funcs *patchTable) (*worker, error) {
60
	dirWatcher, err := funcs.newWatcher()
61
	if err != nil {
62
		return nil, err
63
	}
64

65
	if err = funcs.addWatcherPath(dirWatcher, path); err != nil {
66
		_ = dirWatcher.Close()
67
		return nil, err
68
	}
69

70
	wk := &worker{
71
		dirWatcher:      dirWatcher,
72
		watchedFiles:    make(map[string]*fileTracker),
73
		retireTrackerCh: make(chan *fileTracker),
74
		terminateCh:     make(chan bool),
75
	}
76

77
	go wk.listen()
78

79
	return wk, nil
80
}
81

82
// listen is the body of the worker goroutine. It runs the dispatch loop
// until termination is requested and then releases everything the worker
// owns: the fsnotify watcher, trackers that were handed over for retirement
// but not yet processed, and finally every tracker still registered.
func (wk *worker) listen() {
	wk.loop()

	// Nothing useful can be done with a close error during shutdown.
	_ = wk.dirWatcher.Close()

	// drain any retiring trackers that may be pending
	wk.drainRetiringTrackers()

	// Clean up the rest. watchedFiles and the trackers' channel fields are
	// also touched by addPath, removePath, eventChannel and errorChannel on
	// other goroutines, so the map must not be iterated (nor the channel
	// fields cleared) without holding the write lock. retireTracker never
	// blocks, so holding the lock here cannot deadlock.
	wk.mu.Lock()
	defer wk.mu.Unlock()
	for _, ft := range wk.watchedFiles {
		retireTracker(ft)
	}
}
95

96
func (wk *worker) loop() {
97
	for {
98
		select {
99
		case event := <-wk.dirWatcher.Events:
100
			// work on a copy of the watchedFiles map, so that we don't interfere
101
			// with the caller's use of the map
102
			for path, ft := range wk.getTrackers() {
103
				if ft.events == nil {
104
					// tracker has been retired, skip it
105
					continue
106
				}
107

108
				sum := getHashSum(path)
109
				if !bytes.Equal(sum, ft.hash) {
110
					ft.hash = sum
111

112
					select {
113
					case ft.events <- event:
114
						// nothing to do
115

116
					case ft := <-wk.retireTrackerCh:
117
						retireTracker(ft)
118

119
					case <-wk.terminateCh:
120
						return
121
					}
122
				}
123
			}
124

125
		case err := <-wk.dirWatcher.Errors:
126
			for _, ft := range wk.getTrackers() {
127
				if ft.errors == nil {
128
					// tracker has been retired, skip it
129
					continue
130
				}
131

132
				select {
133
				case ft.errors <- err:
134
					// nothing to do
135

136
				case ft := <-wk.retireTrackerCh:
137
					retireTracker(ft)
138

139
				case <-wk.terminateCh:
140
					return
141
				}
142
			}
143

144
		case ft := <-wk.retireTrackerCh:
145
			retireTracker(ft)
146

147
		case <-wk.terminateCh:
148
			return
149
		}
150
	}
151
}
152

153
// used only by the worker goroutine
154
func (wk *worker) drainRetiringTrackers() {
155
	// cleanup any trackers that were in the process
156
	// of being retired, but didn't get processed due
157
	// to termination
158
	for {
159
		select {
160
		case ft := <-wk.retireTrackerCh:
161
			retireTracker(ft)
162
		default:
163
			return
164
		}
165
	}
166
}
167

168
// make a local copy of the set of trackers to avoid contention with callers
169
// used only by the worker goroutine
170
func (wk *worker) getTrackers() map[string]*fileTracker {
171
	wk.mu.RLock()
172

173
	result := make(map[string]*fileTracker, len(wk.watchedFiles))
174
	for k, v := range wk.watchedFiles {
175
		result[k] = v
176
	}
177

178
	wk.mu.RUnlock()
179
	return result
180
}
181

182
// used only by the worker goroutine
183
func retireTracker(ft *fileTracker) {
184
	close(ft.events)
185
	close(ft.errors)
186
	ft.events = nil
187
	ft.errors = nil
188
}
189

190
// terminate asks the worker goroutine to exit by sending on terminateCh.
// The send blocks until the worker goroutine receives the value.
func (wk *worker) terminate() {
	const stop = true
	wk.terminateCh <- stop
}
193

194
func (wk *worker) addPath(path string) error {
195
	wk.mu.Lock()
196

197
	ft := wk.watchedFiles[path]
198
	if ft != nil {
199
		wk.mu.Unlock()
200
		return fmt.Errorf("path %s is already being watched", path)
201
	}
202

203
	ft = &fileTracker{
204
		events: make(chan fsnotify.Event),
205
		errors: make(chan error),
206
		hash:   getHashSum(path),
207
	}
208

209
	wk.watchedFiles[path] = ft
210
	wk.mu.Unlock()
211

212
	return nil
213
}
214

215
// removePath unregisters path and hands its tracker to the worker goroutine
// for retirement. It returns an error if path is not registered.
func (wk *worker) removePath(path string) error {
	// detach removes the tracker from the map under the write lock and
	// returns it, or nil when path is unknown.
	detach := func() *fileTracker {
		wk.mu.Lock()
		defer wk.mu.Unlock()

		tracker := wk.watchedFiles[path]
		if tracker != nil {
			delete(wk.watchedFiles, path)
		}
		return tracker
	}

	tracker := detach()
	if tracker == nil {
		return fmt.Errorf("path %s not found", path)
	}

	// The hand-over happens after the lock is released: the send blocks
	// until the worker goroutine receives it.
	wk.retireTrackerCh <- tracker
	return nil
}
230

231
// eventChannel returns the event channel of the tracker registered for
// path, or nil if path is not being watched.
func (wk *worker) eventChannel(path string) chan fsnotify.Event {
	wk.mu.RLock()
	defer wk.mu.RUnlock()

	tracker, ok := wk.watchedFiles[path]
	if !ok || tracker == nil {
		return nil
	}
	return tracker.events
}
241

242
// errorChannel returns the error channel of the tracker registered for
// path, or nil if path is not being watched.
func (wk *worker) errorChannel(path string) chan error {
	wk.mu.RLock()
	defer wk.mu.RUnlock()

	tracker, ok := wk.watchedFiles[path]
	if !ok || tracker == nil {
		return nil
	}
	return tracker.errors
}
252

253
// getHashSum returns the SHA-256 sum of the content of file, or nil if
// there's a problem opening or reading it.
//
// A read failure (for example when file is a directory) yields nil rather
// than the hash of whatever was read before the error, so that a partial
// read is never mistaken for the file's real content.
func getHashSum(file string) []byte {
	f, err := os.Open(file)
	if err != nil {
		return nil
	}
	// The file is only read, so a Close error carries no information here.
	defer f.Close()

	h := sha256.New()
	if _, err := io.Copy(h, bufio.NewReader(f)); err != nil {
		return nil
	}
	return h.Sum(nil)
}
266

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.