istio
265 строк · 5.2 Кб
1// Copyright Istio Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package filewatcher
16
17import (
18"bufio"
19"bytes"
20"crypto/sha256"
21"fmt"
22"io"
23"os"
24"sync"
25
26"github.com/fsnotify/fsnotify"
27)
28
29type worker struct {
30mu sync.RWMutex
31
32// watcher is an fsnotify watcher that watches the parent
33// dir of watchedFiles.
34dirWatcher *fsnotify.Watcher
35
36// The worker maintains a map of channels keyed by watched file path.
37// The worker watches parent path of given path,
38// and filters out events of given path, then redirect
39// to the result channel.
40// Note that for symlink files, the content in received events
41// do not have to be related to the file itself.
42watchedFiles map[string]*fileTracker
43
44// tracker lifecycle
45retireTrackerCh chan *fileTracker
46
47// tells the worker to exit
48terminateCh chan bool
49}
50
51type fileTracker struct {
52events chan fsnotify.Event
53errors chan error
54
55// Hash sum to indicate if a file has been updated.
56hash []byte
57}
58
59func newWorker(path string, funcs *patchTable) (*worker, error) {
60dirWatcher, err := funcs.newWatcher()
61if err != nil {
62return nil, err
63}
64
65if err = funcs.addWatcherPath(dirWatcher, path); err != nil {
66_ = dirWatcher.Close()
67return nil, err
68}
69
70wk := &worker{
71dirWatcher: dirWatcher,
72watchedFiles: make(map[string]*fileTracker),
73retireTrackerCh: make(chan *fileTracker),
74terminateCh: make(chan bool),
75}
76
77go wk.listen()
78
79return wk, nil
80}
81
82func (wk *worker) listen() {
83wk.loop()
84
85_ = wk.dirWatcher.Close()
86
87// drain any retiring trackers that may be pending
88wk.drainRetiringTrackers()
89
90// clean up the rest
91for _, ft := range wk.watchedFiles {
92retireTracker(ft)
93}
94}
95
96func (wk *worker) loop() {
97for {
98select {
99case event := <-wk.dirWatcher.Events:
100// work on a copy of the watchedFiles map, so that we don't interfere
101// with the caller's use of the map
102for path, ft := range wk.getTrackers() {
103if ft.events == nil {
104// tracker has been retired, skip it
105continue
106}
107
108sum := getHashSum(path)
109if !bytes.Equal(sum, ft.hash) {
110ft.hash = sum
111
112select {
113case ft.events <- event:
114// nothing to do
115
116case ft := <-wk.retireTrackerCh:
117retireTracker(ft)
118
119case <-wk.terminateCh:
120return
121}
122}
123}
124
125case err := <-wk.dirWatcher.Errors:
126for _, ft := range wk.getTrackers() {
127if ft.errors == nil {
128// tracker has been retired, skip it
129continue
130}
131
132select {
133case ft.errors <- err:
134// nothing to do
135
136case ft := <-wk.retireTrackerCh:
137retireTracker(ft)
138
139case <-wk.terminateCh:
140return
141}
142}
143
144case ft := <-wk.retireTrackerCh:
145retireTracker(ft)
146
147case <-wk.terminateCh:
148return
149}
150}
151}
152
153// used only by the worker goroutine
154func (wk *worker) drainRetiringTrackers() {
155// cleanup any trackers that were in the process
156// of being retired, but didn't get processed due
157// to termination
158for {
159select {
160case ft := <-wk.retireTrackerCh:
161retireTracker(ft)
162default:
163return
164}
165}
166}
167
168// make a local copy of the set of trackers to avoid contention with callers
169// used only by the worker goroutine
170func (wk *worker) getTrackers() map[string]*fileTracker {
171wk.mu.RLock()
172
173result := make(map[string]*fileTracker, len(wk.watchedFiles))
174for k, v := range wk.watchedFiles {
175result[k] = v
176}
177
178wk.mu.RUnlock()
179return result
180}
181
182// used only by the worker goroutine
183func retireTracker(ft *fileTracker) {
184close(ft.events)
185close(ft.errors)
186ft.events = nil
187ft.errors = nil
188}
189
190func (wk *worker) terminate() {
191wk.terminateCh <- true
192}
193
194func (wk *worker) addPath(path string) error {
195wk.mu.Lock()
196
197ft := wk.watchedFiles[path]
198if ft != nil {
199wk.mu.Unlock()
200return fmt.Errorf("path %s is already being watched", path)
201}
202
203ft = &fileTracker{
204events: make(chan fsnotify.Event),
205errors: make(chan error),
206hash: getHashSum(path),
207}
208
209wk.watchedFiles[path] = ft
210wk.mu.Unlock()
211
212return nil
213}
214
215func (wk *worker) removePath(path string) error {
216wk.mu.Lock()
217
218ft := wk.watchedFiles[path]
219if ft == nil {
220wk.mu.Unlock()
221return fmt.Errorf("path %s not found", path)
222}
223
224delete(wk.watchedFiles, path)
225wk.mu.Unlock()
226
227wk.retireTrackerCh <- ft
228return nil
229}
230
231func (wk *worker) eventChannel(path string) chan fsnotify.Event {
232wk.mu.RLock()
233defer wk.mu.RUnlock()
234
235if ft := wk.watchedFiles[path]; ft != nil {
236return ft.events
237}
238
239return nil
240}
241
242func (wk *worker) errorChannel(path string) chan error {
243wk.mu.RLock()
244defer wk.mu.RUnlock()
245
246if ft := wk.watchedFiles[path]; ft != nil {
247return ft.errors
248}
249
250return nil
251}
252
253// gets the hash of the given file, or nil if there's a problem
254func getHashSum(file string) []byte {
255f, err := os.Open(file)
256if err != nil {
257return nil
258}
259defer f.Close()
260r := bufio.NewReader(f)
261
262h := sha256.New()
263_, _ = io.Copy(h, r)
264return h.Sum(nil)
265}
266