Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[mac] Fix callback exceptions when the watcher is deleted but still receiving events #786

Merged
merged 6 commits into from
May 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ Changelog

2021-0x-xx • `full history <https://github.com/gorakhargosh/watchdog/compare/v2.1.0...master>`__

-
- Thanks to our beloved contributors:
- [mac] Fix callback exceptions when the watcher is deleted but still receiving events
- Thanks to our beloved contributors: @rom1win, @BoboTiG, @CCP-Aporia


2.1.0
Expand Down
41 changes: 22 additions & 19 deletions src/watchdog/observers/fsevents.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,8 @@ def __init__(self, event_queue, watch, timeout=DEFAULT_EMITTER_TIMEOUT, suppress
self._lock = threading.Lock()

def on_thread_stop(self):
if self.watch:
_fsevents.remove_watch(self.watch)
_fsevents.stop(self)
self._watch = None
_fsevents.remove_watch(self.watch)
_fsevents.stop(self)

def queue_event(self, event):
# fsevents defaults to be recursive, so if the watch was meant to be non-recursive then we need to drop
Expand Down Expand Up @@ -287,23 +285,28 @@ def queue_events(self, timeout, events):

self._fs_view.clear()

def events_callback(self, paths, inodes, flags, ids):
"""Callback passed to FSEventStreamCreate(), it will receive all
FS events and queue them.
"""
cls = _fsevents.NativeEvent
try:
events = [
cls(path, inode, event_flags, event_id)
for path, inode, event_flags, event_id in zip(
paths, inodes, flags, ids
)
]
with self._lock:
self.queue_events(self.timeout, events)
except Exception:
logger.exception("Unhandled exception in fsevents callback")

def run(self):
self.pathnames = [self.watch.path]
self._start_time = time.monotonic()
try:
def callback(paths, inodes, flags, ids, emitter=self):
try:
with emitter._lock:
events = [
_fsevents.NativeEvent(path, inode, event_flags, event_id)
for path, inode, event_flags, event_id in zip(paths, inodes, flags, ids)
]
emitter.queue_events(emitter.timeout, events)
except Exception:
logger.exception("Unhandled exception in fsevents callback")

self.pathnames = [self.watch.path]
self._start_time = time.monotonic()

_fsevents.add_watch(self, self.watch, callback, self.pathnames)
_fsevents.add_watch(self, self.watch, self.events_callback, self.pathnames)
_fsevents.read_events(self)
except Exception:
logger.exception("Unhandled exception in FSEventsEmitter")
Expand Down
11 changes: 7 additions & 4 deletions src/watchdog_fsevents.c
Original file line number Diff line number Diff line change
Expand Up @@ -784,17 +784,20 @@ static PyObject *
watchdog_remove_watch(PyObject *self, PyObject *watch)
{
UNUSED(self);
PyObject *value = PyDict_GetItem(watch_to_stream, watch);
PyObject *streamref_capsule = PyDict_GetItem(watch_to_stream, watch);
if (!streamref_capsule) {
// A watch might have been removed explicitly before, in which case we can simply early out.
Py_RETURN_NONE;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When exactly does this case occur?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As it says in the comment - if someone explicitly calls remove_watch twice for the same watch then PyDict_GetItem won't find it again. Not checking for the nullptr return here was the main reason for the work-arounds in Python code. 🙂
Effectively, streamref_capsule being a nullptr caused the assertion failures for the three fsevents related API calls that follow, as well as the error message about returning a value with an exception set.

}
PyDict_DelItem(watch_to_stream, watch);

FSEventStreamRef stream_ref = PyCapsule_GetPointer(value, NULL);
FSEventStreamRef stream_ref = PyCapsule_GetPointer(streamref_capsule, NULL);

FSEventStreamStop(stream_ref);
FSEventStreamInvalidate(stream_ref);
FSEventStreamRelease(stream_ref);

Py_INCREF(Py_None);
return Py_None;
Py_RETURN_NONE;
}

PyDoc_STRVAR(watchdog_stop__doc__,
Expand Down
88 changes: 85 additions & 3 deletions tests/test_fsevents.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
from functools import partial
from os import mkdir, rmdir
from queue import Queue
from random import random
from threading import Thread
from time import sleep
from unittest.mock import patch

import _watchdog_fsevents as _fsevents
from watchdog.events import FileSystemEventHandler
Expand Down Expand Up @@ -100,6 +104,86 @@ def callback(path, inodes, flags, ids):
rmdir(a)


def test_watcher_deletion_while_receiving_events_1(caplog, observer):
"""
When the watcher is stopped while there are events, such exception could happen:

Traceback (most recent call last):
File "observers/fsevents.py", line 327, in events_callback
self.queue_events(self.timeout, events)
File "observers/fsevents.py", line 187, in queue_events
src_path = self._encode_path(event.path)
File "observers/fsevents.py", line 352, in _encode_path
if isinstance(self.watch.path, bytes):
AttributeError: 'NoneType' object has no attribute 'path'
"""
tmpdir = p()

orig = FSEventsEmitter.events_callback

def cb(*args):
FSEventsEmitter.stop(emitter)
orig(*args)

with caplog.at_level(logging.ERROR), patch.object(FSEventsEmitter, "events_callback", new=cb):
start_watching(tmpdir)
# Less than 100 is not enough events to trigger the error
for n in range(100):
touch(p("{}.txt".format(n)))
emitter.stop()
assert not caplog.records


def test_watcher_deletion_while_receiving_events_2(caplog):
"""Note: that test takes about 20 seconds to complete.

Quite similar test to prevent another issue
when the watcher is stopped while there are events, such exception could happen:

Traceback (most recent call last):
File "observers/fsevents.py", line 327, in events_callback
self.queue_events(self.timeout, events)
File "observers/fsevents.py", line 235, in queue_events
self._queue_created_event(event, src_path, src_dirname)
File "observers/fsevents.py", line 132, in _queue_created_event
self.queue_event(cls(src_path))
File "observers/fsevents.py", line 104, in queue_event
if self._watch.is_recursive:
AttributeError: 'NoneType' object has no attribute 'is_recursive'
"""

def try_to_fail():
tmpdir = p()
start_watching(tmpdir)

def create_files():
# Less than 2000 is not enough events to trigger the error
for n in range(2000):
touch(p(str(n) + ".txt"))

def stop(em):
sleep(random())
em.stop()

th1 = Thread(target=create_files)
th2 = Thread(target=stop, args=(emitter,))

try:
with caplog.at_level(logging.ERROR):
th1.start()
th2.start()
th1.join()
th2.join()
assert not caplog.records
finally:
emitter.stop()

# 20 attempts to make the random failure happen
for _ in range(20):
try_to_fail()
sleep(random())


def test_remove_watch_twice():
"""
ValueError: PyCapsule_GetPointer called with invalid PyCapsule object
Expand Down Expand Up @@ -143,9 +227,7 @@ def on_thread_stop(self):
w = observer.schedule(FileSystemEventHandler(), a, recursive=False)
rmdir(a)
time.sleep(0.1)
with pytest.raises(KeyError):
# watch no longer exists!
observer.unschedule(w)
observer.unschedule(w)


def test_converting_cfstring_to_pyunicode():
Expand Down