From 8088e53d983ed5d381f24b87a8f630c4d5359f34 Mon Sep 17 00:00:00 2001 From: Adrian Serrano Date: Thu, 19 Apr 2018 21:14:41 +0200 Subject: [PATCH] Auditbeat recursive file watches for Windows (#6893) * Add support for case-insensitive text search in system-tests Allow system tests to search for text in the beat logfile using case-insensitive search. This is necessary to match paths in case-insensitive file systems, where the path logged may have different capitalisation than the one used in the system test. * Use custom version of fsnotify with recursion support * Added system-test for the file_integrity module * Auditbeat: Fix deadlock in fsnotify under Windows Under Windows, directories to be watched need to be installed after the event consumer loop is started. Otherwise there's a chance of a potential deadlock, as fsnotify under Windows uses a single goroutine to deliver events and install watches. If the channel it uses to deliver events is full, it will block and won't be able to install further watches. * Auditbeat: Use OS-support for recursive watches This patch enables OS-supported recursive watching in fsnotify if it is available. Currently only supported under Windows via our custom fsnotify fork. Fixes #6864 --- CHANGELOG.asciidoc | 1 + NOTICE.txt | 2 +- .../file_integrity/eventreader_fsnotify.go | 12 +- .../module/file_integrity/monitor/monitor.go | 4 +- auditbeat/tests/system/auditbeat.py | 39 ++++ .../tests/system/config/auditbeat.yml.j2 | 6 +- auditbeat/tests/system/test_file_integrity.py | 189 ++++++++++++++++++ libbeat/tests/system/beat/beat.py | 15 +- .../fsnotify/fsevents/example/main.go | 101 ++++++++++ vendor/github.com/fsnotify/fsnotify/AUTHORS | 6 + .../github.com/fsnotify/fsnotify/CHANGELOG.md | 10 + vendor/github.com/fsnotify/fsnotify/fen.go | 6 + .../github.com/fsnotify/fsnotify/inotify.go | 6 + vendor/github.com/fsnotify/fsnotify/kqueue.go | 68 +++++-- .../github.com/fsnotify/fsnotify/windows.go | 28 ++- vendor/vendor.json | 7 +- 16 files changed, 452 insertions(+), 48 deletions(-) create mode 100644 auditbeat/tests/system/test_file_integrity.py create mode 100644 vendor/github.com/fsnotify/fsevents/example/main.go diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 06622b8452e..4a5947f0c7b 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -57,6 +57,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di *Auditbeat* - Add hex decoding for the name field in audit path records. {pull}6687[6687] +- Fixed a deadlock in the file_integrity module under Windows. {issue}6864[6864] *Filebeat* diff --git a/NOTICE.txt b/NOTICE.txt index 5b9ebb0123d..a797bedc33a 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -543,7 +543,7 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -------------------------------------------------------------------- Dependency: github.com/fsnotify/fsnotify -Revision: 4da3e2cfbabc9f751898f250b49f2439785783a1 +Revision: c9bbe1f46f1da9904baf3916a4ba4aec7f1e9000 License type (autodetected): BSD-3-Clause ./vendor/github.com/fsnotify/fsnotify/LICENSE: -------------------------------------------------------------------- diff --git a/auditbeat/module/file_integrity/eventreader_fsnotify.go b/auditbeat/module/file_integrity/eventreader_fsnotify.go index cac541626a8..731e370a97b 100644 --- a/auditbeat/module/file_integrity/eventreader_fsnotify.go +++ b/auditbeat/module/file_integrity/eventreader_fsnotify.go @@ -36,6 +36,14 @@ func NewEventReader(c Config) (EventProducer, error) { } func (r *reader) Start(done <-chan struct{}) (<-chan Event, error) { + if err := r.watcher.Start(); err != nil { + return nil, errors.Wrap(err, "unable to start watcher") + } + go r.consumeEvents(done) + + // Windows implementation of fsnotify needs to have the watched paths + // installed after the event consumer is started, to avoid a potential + // deadlock. Do it on all platforms for simplicity. for _, p := range r.config.Paths { if err := r.watcher.Add(p); err != nil { if err == syscall.EMFILE { @@ -48,10 +56,6 @@ func (r *reader) Start(done <-chan struct{}) (<-chan Event, error) { } } - if err := r.watcher.Start(); err != nil { - return nil, errors.Wrap(err, "unable to start watcher") - } - go r.consumeEvents(done) r.log.Infow("Started fsnotify watcher", "file_path", r.config.Paths, "recursive", r.config.Recursive) diff --git a/auditbeat/module/file_integrity/monitor/monitor.go b/auditbeat/module/file_integrity/monitor/monitor.go index 560527e240e..40b77fa906b 100644 --- a/auditbeat/module/file_integrity/monitor/monitor.go +++ b/auditbeat/module/file_integrity/monitor/monitor.go @@ -21,7 +21,9 @@ func New(recursive bool) (Watcher, error) { if err != nil { return nil, err } - if recursive { + // Use our simulated recursive watches unless the fsnotify implementation + // supports OS-provided recursive watches + if recursive && fsnotify.SetRecursive() != nil { return newRecursiveWatcher(fsnotify), nil } return (*nonRecursiveWatcher)(fsnotify), nil diff --git a/auditbeat/tests/system/auditbeat.py b/auditbeat/tests/system/auditbeat.py index 92011242d5e..1c0f4e816b8 100644 --- a/auditbeat/tests/system/auditbeat.py +++ b/auditbeat/tests/system/auditbeat.py @@ -1,8 +1,13 @@ import os +import shutil import sys +import tempfile sys.path.append(os.path.join(os.path.dirname(__file__), '../../../metricbeat/tests/system')) +if os.name == "nt": + import win32file + from metricbeat import BaseTest as MetricbeatTest @@ -13,3 +18,37 @@ def setUpClass(self): self.beat_path = os.path.abspath( os.path.join(os.path.dirname(__file__), "../../")) super(MetricbeatTest, self).setUpClass() + + def create_file(self, path, contents): + f = open(path, 'wb') + f.write(contents) + f.close() + + def check_event(self, event, expected): + for key in expected: + assert key in event, "key '{0}' not found in event".format(key) + assert event[key] == expected[key], \ + "key '{0}' has value '{1}', expected '{2}'".format(key, + event[key], + expected[key]) + + def temp_dir(self, prefix): + # os.path.realpath resolves any symlinks in path. Necessary for macOS + # where /var is a symlink to /private/var + p = os.path.realpath(tempfile.mkdtemp(prefix)) + if os.name == "nt": + # Under windows, get rid of any ~1 in path (short path) + p = str(win32file.GetLongPathName(p)) + return p + + +class PathCleanup: + def __init__(self, paths): + self.paths = paths + + def __enter__(self): + pass + + def __exit__(self, exc_type, exc_val, exc_tb): + for path in self.paths: + shutil.rmtree(path) diff --git a/auditbeat/tests/system/config/auditbeat.yml.j2 b/auditbeat/tests/system/config/auditbeat.yml.j2 index 3c4f9ac30ae..6068b99e730 100644 --- a/auditbeat/tests/system/config/auditbeat.yml.j2 +++ b/auditbeat/tests/system/config/auditbeat.yml.j2 @@ -18,8 +18,8 @@ auditbeat.modules: {%- endfor %} queue.mem: - events: 4096 - flush.min_events: 8 - flush.timeout: 0.1s + events: 4 + flush.min_events: 0 + flush.timeout: 0.01s {% include './tests/system/config/libbeat.yml.j2' %} diff --git a/auditbeat/tests/system/test_file_integrity.py b/auditbeat/tests/system/test_file_integrity.py new file mode 100644 index 00000000000..8688a5bbb2c --- /dev/null +++ b/auditbeat/tests/system/test_file_integrity.py @@ -0,0 +1,189 @@ +import sys +import os +import shutil +import time +import unittest +from auditbeat import * +from beat.beat import INTEGRATION_TESTS + + +# Escapes a path to match what's printed in the logs +def escape_path(path): + return path.replace('\\', '\\\\') + + +def has_file(objs, path, sha1hash): + found = False + for obj in objs: + if 'file.path' in obj and 'hash.sha1' in obj \ + and obj['file.path'].lower() == path.lower() and obj['hash.sha1'] == sha1hash: + found = True + break + assert found, "File '{0}' with sha1sum '{1}' not found".format(path, sha1hash) + + +def has_dir(objs, path): + found = False + for obj in objs: + if 'file.path' in obj and obj['file.path'].lower() == path.lower() and obj['file.type'] == "dir": + found = True + break + assert found, "Dir '{0}' not found".format(path) + + +def file_events(objs, path, expected): + evts = set() + for obj in objs: + if 'file.path' in obj and 'event.action' in obj and obj['file.path'].lower() == path.lower(): + if type(obj['event.action']) == list: + evts = evts.union(set(obj['event.action'])) + else: + evts.add(obj['event.action']) + for wanted in set(expected): + assert wanted in evts, "Event {0} for path '{1}' not found (got {2})".format( + wanted, path, evts) + + +def wrap_except(expr): + try: + return expr() + except IOError: + return False + + +class Test(BaseTest): + + def wait_output(self, min_events): + self.wait_until(lambda: wrap_except(lambda: len(self.read_output()) >= min_events)) + # wait for the number of lines in the file to stay constant for a second + prev_lines = -1 + while True: + num_lines = self.output_lines() + if prev_lines < num_lines: + prev_lines = num_lines + time.sleep(1) + else: + break + + def test_non_recursive(self): + """ + file_integrity monitors watched directories (non recursive). + """ + + dirs = [self.temp_dir("auditbeat_test"), + self.temp_dir("auditbeat_test")] + + with PathCleanup(dirs): + self.render_config_template( + modules=[{ + "name": "file_integrity", + "extras": { + "paths": dirs, + "scan_at_start": False + } + }], + ) + proc = self.start_beat() + + # wait until the directories to watch are printed in the logs + # this happens when the file_integrity module starts. + # Case must be ignored under windows as capitalisation of paths + # may differ + self.wait_log_contains(escape_path(dirs[0]), max_timeout=30, ignore_case=True) + + file1 = os.path.join(dirs[0], 'file.txt') + self.create_file(file1, "hello world!") + + file2 = os.path.join(dirs[1], 'file2.txt') + self.create_file(file2, "Foo bar") + + # wait until file1 is reported before deleting. Otherwise the hash + # might not be calculated + self.wait_log_contains("\"path\": \"{0}\"".format(escape_path(file1)), ignore_case=True) + + os.unlink(file1) + + subdir = os.path.join(dirs[0], "subdir") + os.mkdir(subdir) + file3 = os.path.join(subdir, "other_file.txt") + self.create_file(file3, "not reported.") + + self.wait_log_contains("\"deleted\"") + self.wait_log_contains("\"path\": \"{0}\"".format(escape_path(subdir)), ignore_case=True) + self.wait_output(3) + + proc.check_kill_and_wait() + self.assert_no_logged_warnings() + + # Ensure all Beater stages are used. + assert self.log_contains("Setup Beat: auditbeat") + assert self.log_contains("auditbeat start running") + assert self.log_contains("auditbeat stopped") + + objs = self.read_output() + + has_file(objs, file1, "430ce34d020724ed75a196dfc2ad67c77772d169") + has_file(objs, file2, "d23be250530a24be33069572db67995f21244c51") + has_dir(objs, subdir) + + file_events(objs, file1, ['created', 'deleted']) + file_events(objs, file2, ['created']) + + # assert file inside subdir is not reported + assert self.log_contains(file3) is False + + def test_recursive(self): + """ + file_integrity monitors watched directories (recursive). + """ + + dirs = [self.temp_dir("auditbeat_test")] + + with PathCleanup(dirs): + self.render_config_template( + modules=[{ + "name": "file_integrity", + "extras": { + "paths": dirs, + "scan_at_start": False, + "recursive": True + } + }], + ) + proc = self.start_beat() + + # wait until the directories to watch are printed in the logs + # this happens when the file_integrity module starts + self.wait_log_contains(escape_path(dirs[0]), max_timeout=30, ignore_case=True) + self.wait_log_contains("\"recursive\": true") + + subdir = os.path.join(dirs[0], "subdir") + os.mkdir(subdir) + file1 = os.path.join(subdir, "file.txt") + self.create_file(file1, "hello world!") + + subdir2 = os.path.join(subdir, "other") + os.mkdir(subdir2) + file2 = os.path.join(subdir2, "more.txt") + self.create_file(file2, "") + + self.wait_log_contains("\"path\": \"{0}\"".format(escape_path(file2)), ignore_case=True) + self.wait_output(4) + + proc.check_kill_and_wait() + self.assert_no_logged_warnings() + + # Ensure all Beater stages are used. + assert self.log_contains("Setup Beat: auditbeat") + assert self.log_contains("auditbeat start running") + assert self.log_contains("auditbeat stopped") + + objs = self.read_output() + + has_file(objs, file1, "430ce34d020724ed75a196dfc2ad67c77772d169") + has_file(objs, file2, "da39a3ee5e6b4b0d3255bfef95601890afd80709") + has_dir(objs, subdir) + has_dir(objs, subdir2) + + file_events(objs, file1, ['created']) + file_events(objs, file2, ['created']) diff --git a/libbeat/tests/system/beat/beat.py b/libbeat/tests/system/beat/beat.py index 552c6cac2a0..fe0ad480509 100644 --- a/libbeat/tests/system/beat/beat.py +++ b/libbeat/tests/system/beat/beat.py @@ -339,27 +339,30 @@ def get_log(self, logfile=None): def wait_log_contains(self, msg, logfile=None, max_timeout=10, poll_interval=0.1, - name="log_contains"): + name="log_contains", + ignore_case=False): self.wait_until( - cond=lambda: self.log_contains(msg, logfile), + cond=lambda: self.log_contains(msg, logfile, ignore_case=ignore_case), max_timeout=max_timeout, poll_interval=poll_interval, name=name) - def log_contains(self, msg, logfile=None): + def log_contains(self, msg, logfile=None, ignore_case=False): """ Returns true if the give logfile contains the given message. Note that the msg must be present in a single line. """ - return self.log_contains_count(msg, logfile) > 0 + return self.log_contains_count(msg, logfile, ignore_case=ignore_case) > 0 - def log_contains_count(self, msg, logfile=None): + def log_contains_count(self, msg, logfile=None, ignore_case=False): """ Returns the number of appearances of the given string in the log file """ counter = 0 + if ignore_case: + msg = msg.lower() # Init defaults if logfile is None: @@ -368,6 +371,8 @@ def log_contains_count(self, msg, logfile=None): try: with open(os.path.join(self.working_dir, logfile), "r") as f: for line in f: + if ignore_case: + line = line.lower() if line.find(msg) >= 0: counter = counter + 1 except IOError: diff --git a/vendor/github.com/fsnotify/fsevents/example/main.go b/vendor/github.com/fsnotify/fsevents/example/main.go new file mode 100644 index 00000000000..685880974fb --- /dev/null +++ b/vendor/github.com/fsnotify/fsevents/example/main.go @@ -0,0 +1,101 @@ +// +build darwin + +package main + +import ( + "bufio" + "io/ioutil" + "log" + "os" + "runtime" + "time" + + "github.com/fsnotify/fsevents" +) + +func main() { + path, err := ioutil.TempDir("", "fsexample") + if err != nil { + log.Fatalf("Failed to create TempDir: %v", err) + } + dev, err := fsevents.DeviceForPath(path) + if err != nil { + log.Fatalf("Failed to retrieve device for path: %v", err) + } + log.Print(dev) + log.Println(fsevents.EventIDForDeviceBeforeTime(dev, time.Now())) + + es := &fsevents.EventStream{ + Paths: []string{path}, + Latency: 500 * time.Millisecond, + Device: dev, + Flags: fsevents.FileEvents | fsevents.WatchRoot} + es.Start() + ec := es.Events + + log.Println("Device UUID", fsevents.GetDeviceUUID(dev)) + + go func() { + for msg := range ec { + for _, event := range msg { + logEvent(event) + } + } + }() + + in := bufio.NewReader(os.Stdin) + + if false { + log.Print("Started, press enter to GC") + in.ReadString('\n') + runtime.GC() + log.Print("GC'd, press enter to quit") + in.ReadString('\n') + } else { + log.Print("Started, press enter to stop") + in.ReadString('\n') + es.Stop() + + log.Print("Stopped, press enter to restart") + in.ReadString('\n') + es.Resume = true + es.Start() + + log.Print("Restarted, press enter to quit") + in.ReadString('\n') + es.Stop() + } +} + +var noteDescription = map[fsevents.EventFlags]string{ + fsevents.MustScanSubDirs: "MustScanSubdirs", + fsevents.UserDropped: "UserDropped", + fsevents.KernelDropped: "KernelDropped", + fsevents.EventIDsWrapped: "EventIDsWrapped", + fsevents.HistoryDone: "HistoryDone", + fsevents.RootChanged: "RootChanged", + fsevents.Mount: "Mount", + fsevents.Unmount: "Unmount", + + fsevents.ItemCreated: "Created", + fsevents.ItemRemoved: "Removed", + fsevents.ItemInodeMetaMod: "InodeMetaMod", + fsevents.ItemRenamed: "Renamed", + fsevents.ItemModified: "Modified", + fsevents.ItemFinderInfoMod: "FinderInfoMod", + fsevents.ItemChangeOwner: "ChangeOwner", + fsevents.ItemXattrMod: "XAttrMod", + fsevents.ItemIsFile: "IsFile", + fsevents.ItemIsDir: "IsDir", + fsevents.ItemIsSymlink: "IsSymLink", +} + +func logEvent(event fsevents.Event) { + note := "" + for bit, description := range noteDescription { + if event.Flags&bit == bit { + note += description + " " + } + } + log.Printf("EventID: %d Path: %s Flags: %s", event.ID, event.Path, note) +} diff --git a/vendor/github.com/fsnotify/fsnotify/AUTHORS b/vendor/github.com/fsnotify/fsnotify/AUTHORS index 0a5bf8f617a..5ab5d41c547 100644 --- a/vendor/github.com/fsnotify/fsnotify/AUTHORS +++ b/vendor/github.com/fsnotify/fsnotify/AUTHORS @@ -8,8 +8,10 @@ # Please keep the list sorted. +Aaron L Adrien Bustany Amit Krishnan +Anmol Sethi Bjørn Erik Pedersen Bruno Bigras Caleb Spare @@ -26,6 +28,7 @@ Kelvin Fo Ken-ichirou MATSUZAWA Matt Layher Nathan Youngman +Nickolai Zeldovich Patrick Paul Hammond Pawel Knap @@ -33,12 +36,15 @@ Pieter Droogendijk Pursuit92 Riku Voipio Rob Figueiredo +Rodrigo Chiossi Slawek Ligus Soge Zhang Tiffany Jernigan Tilak Sharma +Tom Payne Travis Cline Tudor Golubenco +Vahe Khachikyan Yukang bronze1man debrando diff --git a/vendor/github.com/fsnotify/fsnotify/CHANGELOG.md b/vendor/github.com/fsnotify/fsnotify/CHANGELOG.md index 8c732c1d85c..be4d7ea2c14 100644 --- a/vendor/github.com/fsnotify/fsnotify/CHANGELOG.md +++ b/vendor/github.com/fsnotify/fsnotify/CHANGELOG.md @@ -1,5 +1,15 @@ # Changelog +## v1.4.7 / 2018-01-09 + +* BSD/macOS: Fix possible deadlock on closing the watcher on kqueue (thanks @nhooyr and @glycerine) +* Tests: Fix missing verb on format string (thanks @rchiossi) +* Linux: Fix deadlock in Remove (thanks @aarondl) +* Linux: Watch.Add improvements (avoid race, fix consistency, reduce garbage) (thanks @twpayne) +* Docs: Moved FAQ into the README (thanks @vahe) +* Linux: Properly handle inotify's IN_Q_OVERFLOW event (thanks @zeldovich) +* Docs: replace references to OS X with macOS + ## v1.4.2 / 2016-10-10 * Linux: use InotifyInit1 with IN_CLOEXEC to stop leaking a file descriptor to a child process when using fork/exec [#178](https://github.com/fsnotify/fsnotify/pull/178) (thanks @pattyshack) diff --git a/vendor/github.com/fsnotify/fsnotify/fen.go b/vendor/github.com/fsnotify/fsnotify/fen.go index ced39cb881e..b5a41c8919b 100644 --- a/vendor/github.com/fsnotify/fsnotify/fen.go +++ b/vendor/github.com/fsnotify/fsnotify/fen.go @@ -35,3 +35,9 @@ func (w *Watcher) Add(name string) error { func (w *Watcher) Remove(name string) error { return nil } + +// SetRecursive enables watches to also monitor subdirectories. Currently +// only supported under Windows. +func (w *Watcher) SetRecursive() error { + return nil +} diff --git a/vendor/github.com/fsnotify/fsnotify/inotify.go b/vendor/github.com/fsnotify/fsnotify/inotify.go index d9fd1b88a05..98cb3739340 100644 --- a/vendor/github.com/fsnotify/fsnotify/inotify.go +++ b/vendor/github.com/fsnotify/fsnotify/inotify.go @@ -162,6 +162,12 @@ func (w *Watcher) Remove(name string) error { return nil } +// SetRecursive enables watches to also monitor subdirectories. Currently +// only supported under Windows. +func (w *Watcher) SetRecursive() error { + return errors.New("recursion not supported") +} + type watch struct { wd uint32 // Watch descriptor (as returned by the inotify_add_watch() syscall) flags uint32 // inotify flags of this watch (see inotify(7) for the list of valid flags) diff --git a/vendor/github.com/fsnotify/fsnotify/kqueue.go b/vendor/github.com/fsnotify/fsnotify/kqueue.go index c2b4acb18dd..fc0975dfcb6 100644 --- a/vendor/github.com/fsnotify/fsnotify/kqueue.go +++ b/vendor/github.com/fsnotify/fsnotify/kqueue.go @@ -22,7 +22,7 @@ import ( type Watcher struct { Events chan Event Errors chan error - done chan bool // Channel for sending a "quit message" to the reader goroutine + done chan struct{} // Channel for sending a "quit message" to the reader goroutine kq int // File descriptor (as returned by the kqueue() syscall). @@ -56,7 +56,7 @@ func NewWatcher() (*Watcher, error) { externalWatches: make(map[string]bool), Events: make(chan Event), Errors: make(chan error), - done: make(chan bool), + done: make(chan struct{}), } go w.readEvents() @@ -71,10 +71,8 @@ func (w *Watcher) Close() error { return nil } w.isClosed = true - w.mu.Unlock() // copy paths to remove while locked - w.mu.Lock() var pathsToRemove = make([]string, 0, len(w.watches)) for name := range w.watches { pathsToRemove = append(pathsToRemove, name) @@ -82,15 +80,12 @@ func (w *Watcher) Close() error { w.mu.Unlock() // unlock before calling Remove, which also locks - var err error for _, name := range pathsToRemove { - if e := w.Remove(name); e != nil && err == nil { - err = e - } + w.Remove(name) } - // Send "quit" message to the reader goroutine: - w.done <- true + // send a "quit" message to the reader goroutine + close(w.done) return nil } @@ -152,6 +147,12 @@ func (w *Watcher) Remove(name string) error { return nil } +// SetRecursive enables watches to also monitor subdirectories. Currently +// only supported under Windows. +func (w *Watcher) SetRecursive() error { + return errors.New("recursion not supported") +} + // Watch all events (except NOTE_EXTEND, NOTE_LINK, NOTE_REVOKE) const noteAllEvents = unix.NOTE_DELETE | unix.NOTE_WRITE | unix.NOTE_ATTRIB | unix.NOTE_RENAME @@ -266,17 +267,12 @@ func (w *Watcher) addWatch(name string, flags uint32) (string, error) { func (w *Watcher) readEvents() { eventBuffer := make([]unix.Kevent_t, 10) +loop: for { // See if there is a message on the "done" channel select { case <-w.done: - err := unix.Close(w.kq) - if err != nil { - w.Errors <- err - } - close(w.Events) - close(w.Errors) - return + break loop default: } @@ -284,7 +280,11 @@ func (w *Watcher) readEvents() { kevents, err := read(w.kq, eventBuffer, &keventWaitTime) // EINTR is okay, the syscall was interrupted before timeout expired. if err != nil && err != unix.EINTR { - w.Errors <- err + select { + case w.Errors <- err: + case <-w.done: + break loop + } continue } @@ -319,8 +319,12 @@ func (w *Watcher) readEvents() { if path.isDir && event.Op&Write == Write && !(event.Op&Remove == Remove) { w.sendDirectoryChangeEvents(event.Name) } else { - // Send the event on the Events channel - w.Events <- event + // Send the event on the Events channel. + select { + case w.Events <- event: + case <-w.done: + break loop + } } if event.Op&Remove == Remove { @@ -352,6 +356,18 @@ func (w *Watcher) readEvents() { kevents = kevents[1:] } } + + // cleanup + err := unix.Close(w.kq) + if err != nil { + // only way the previous loop breaks is if w.done was closed so we need to async send to w.Errors. + select { + case w.Errors <- err: + default: + } + } + close(w.Events) + close(w.Errors) } // newEvent returns an platform-independent Event based on kqueue Fflags. @@ -407,7 +423,11 @@ func (w *Watcher) sendDirectoryChangeEvents(dirPath string) { // Get all files files, err := ioutil.ReadDir(dirPath) if err != nil { - w.Errors <- err + select { + case w.Errors <- err: + case <-w.done: + return + } } // Search for new files @@ -428,7 +448,11 @@ func (w *Watcher) sendFileCreatedEventIfNew(filePath string, fileInfo os.FileInf w.mu.Unlock() if !doesExist { // Send create event - w.Events <- newCreateEvent(filePath) + select { + case w.Events <- newCreateEvent(filePath): + case <-w.done: + return + } } // like watchDirectoryFiles (but without doing another ReadDir) diff --git a/vendor/github.com/fsnotify/fsnotify/windows.go b/vendor/github.com/fsnotify/fsnotify/windows.go index 09436f31d82..a5a7ccbeb04 100644 --- a/vendor/github.com/fsnotify/fsnotify/windows.go +++ b/vendor/github.com/fsnotify/fsnotify/windows.go @@ -13,20 +13,22 @@ import ( "path/filepath" "runtime" "sync" + "sync/atomic" "syscall" "unsafe" ) // Watcher watches a set of files, delivering events to a channel. type Watcher struct { - Events chan Event - Errors chan error - isClosed bool // Set to true when Close() is first called - mu sync.Mutex // Map access - port syscall.Handle // Handle to completion port - watches watchMap // Map of watches (key: i-number) - input chan *input // Inputs to the reader are sent on this channel - quit chan chan<- error + Events chan Event + Errors chan error + isClosed bool // Set to true when Close() is first called + mu sync.Mutex // Map access + port syscall.Handle // Handle to completion port + watches watchMap // Map of watches (key: i-number) + input chan *input // Inputs to the reader are sent on this channel + quit chan chan<- error + watchSubTree uint32 } // NewWatcher establishes a new watcher with the underlying OS and begins waiting for events. @@ -95,6 +97,13 @@ func (w *Watcher) Remove(name string) error { return <-in.reply } +// SetRecursive enables watches to also monitor subdirectories. Currently +// only supported under Windows. +func (w *Watcher) SetRecursive() error { + atomic.StoreUint32(&w.watchSubTree, 1) + return nil +} + const ( // Options for AddWatch sysFSONESHOT = 0x80000000 @@ -348,7 +357,8 @@ func (w *Watcher) startRead(watch *watch) error { return nil } e := syscall.ReadDirectoryChanges(watch.ino.handle, &watch.buf[0], - uint32(unsafe.Sizeof(watch.buf)), false, mask, nil, &watch.ov, 0) + uint32(unsafe.Sizeof(watch.buf)), atomic.LoadUint32(&w.watchSubTree) != 0, + mask, nil, &watch.ov, 0) if e != nil { err := os.NewSyscallError("ReadDirectoryChanges", e) if e == syscall.ERROR_ACCESS_DENIED && watch.mask&provisional == 0 { diff --git a/vendor/vendor.json b/vendor/vendor.json index db53166c140..70deea221da 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -963,10 +963,11 @@ "tree": true }, { - "checksumSHA1": "x2Km0Qy3WgJJnV19Zv25VwTJcBM=", + "checksumSHA1": "xGjYGUfsd36pm3CqdV/RYT87xxM=", + "origin": "github.com/adriansr/fsnotify", "path": "github.com/fsnotify/fsnotify", - "revision": "4da3e2cfbabc9f751898f250b49f2439785783a1", - "revisionTime": "2017-03-29T04:21:07Z" + "revision": "c9bbe1f46f1da9904baf3916a4ba4aec7f1e9000", + "revisionTime": "2018-04-17T23:40:13Z" }, { "checksumSHA1": "2UmMbNHc8FBr98mJFN1k8ISOIHk=",