Skip to content

Commit

Permalink
Improve cleaning up Inotify threads and add eventlet test cases.
Browse files Browse the repository at this point in the history
  • Loading branch information
ethan-vanderheijden committed Sep 17, 2024
1 parent 4e9a86d commit 7bca6d9
Show file tree
Hide file tree
Showing 7 changed files with 149 additions and 23 deletions.
42 changes: 36 additions & 6 deletions src/watchdog/observers/inotify_c.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import ctypes.util
import errno
import os
import select
import struct
import threading
from ctypes import c_char_p, c_int, c_uint32
Expand Down Expand Up @@ -148,6 +149,9 @@ def __init__(self, path: bytes, *, recursive: bool = False, event_mask: int | No
Inotify._raise_error()
self._inotify_fd = inotify_fd
self._lock = threading.Lock()
self._closed = False
self._waiting_to_read = True
self._kill_r, self._kill_w = os.pipe()

# Stores the watch descriptor for a given path.
self._wd_for_path: dict[bytes, int] = {}
Expand Down Expand Up @@ -230,13 +234,19 @@ def remove_watch(self, path: bytes) -> None:
def close(self) -> None:
"""Closes the inotify instance and removes all associated watches."""
with self._lock:
if self._path in self._wd_for_path:
wd = self._wd_for_path[self._path]
inotify_rm_watch(self._inotify_fd, wd)
if not self._closed:
self._closed = True

# descriptor may be invalid because file was deleted
with contextlib.suppress(OSError):
os.close(self._inotify_fd)
if self._path in self._wd_for_path:
wd = self._wd_for_path[self._path]
inotify_rm_watch(self._inotify_fd, wd)

if self._waiting_to_read:
# inotify_rm_watch() should write data to _inotify_fd and wake
# the thread, but writing to the kill channel will gaurentee this
os.write(self._kill_w, b'!')
else:
self._close_resources()

def read_events(self, *, event_buffer_size: int = DEFAULT_EVENT_BUFFER_SIZE) -> list[InotifyEvent]:
"""Reads events from inotify and yields them."""
Expand Down Expand Up @@ -276,6 +286,21 @@ def _recursive_simulate(src_path: bytes) -> list[InotifyEvent]:
event_buffer = None
while True:
try:
with self._lock:
if self._closed:
return []

self._waiting_to_read = True

select.select([self._inotify_fd, self._kill_r], [], [])

with self._lock:
self._waiting_to_read = False

if self._closed:
self._close_resources()
return []

event_buffer = os.read(self._inotify_fd, event_buffer_size)
except OSError as e:
if e.errno == errno.EINTR:
Expand Down Expand Up @@ -340,6 +365,11 @@ def _recursive_simulate(src_path: bytes) -> list[InotifyEvent]:

return event_list

def _close_resources(self):
os.close(self._inotify_fd)
os.close(self._kill_r)
os.close(self._kill_w)

# Non-synchronized methods.
def _add_dir_watch(self, path: bytes, mask: int, *, recursive: bool) -> None:
"""Adds a watch (optionally recursively) for the given directory path
Expand Down
30 changes: 30 additions & 0 deletions tests/isolated/eventlet_observer_stops.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
if __name__ == '__main__':
import eventlet

eventlet.monkey_patch()

import signal
import sys
import tempfile

from watchdog.observers import Observer
from watchdog.events import LoggingEventHandler

with tempfile.TemporaryDirectory() as temp_dir:
def run_observer():
event_handler = LoggingEventHandler()
observer = Observer()
observer.schedule(event_handler, temp_dir)
observer.start()
eventlet.sleep(1)
observer.stop()

def on_alarm(signum, frame):
print("Observer.stop() never finished!", file=sys.stderr)
sys.exit(1)

signal.signal(signal.SIGALRM, on_alarm)
signal.alarm(4)

thread = eventlet.spawn(run_observer)
thread.wait()
23 changes: 23 additions & 0 deletions tests/isolated/eventlet_skip_repeat_queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
if __name__ == '__main__':
import eventlet

eventlet.monkey_patch()

from watchdog.utils.bricks import SkipRepeatsQueue

# same as test_basic_queue() inside test_skip_repeats_queue.py

q = SkipRepeatsQueue()

e1 = (2, "fred")
e2 = (2, "george")
e3 = (4, "sally")

q.put(e1)
q.put(e2)
q.put(e3)

assert e1 == q.get()
assert e2 == q.get()
assert e3 == q.get()
assert q.empty()
11 changes: 10 additions & 1 deletion tests/test_inotify_c.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import errno
import logging
import os
import select
import struct
from typing import TYPE_CHECKING
from unittest.mock import patch
Expand Down Expand Up @@ -56,6 +57,13 @@ def test_late_double_deletion(helper: Helper, p: P, event_queue: TestEventQueue,
+ struct_inotify(wd=3, mask=const.IN_IGNORED)
)

select_bkp = select.select

def fakeselect(read_list, *args, **kwargs):
if inotify_fd in read_list:
return [inotify_fd], [], []
return select_bkp(read_list, *args, **kwargs)

os_read_bkp = os.read

def fakeread(fd, length):
Expand Down Expand Up @@ -92,8 +100,9 @@ def inotify_rm_watch(fd, wd):
mock3 = patch.object(inotify_c, "inotify_init", new=inotify_init)
mock4 = patch.object(inotify_c, "inotify_add_watch", new=inotify_add_watch)
mock5 = patch.object(inotify_c, "inotify_rm_watch", new=inotify_rm_watch)
mock6 = patch.object(select, "select", new=fakeselect)

with mock1, mock2, mock3, mock4, mock5:
with mock1, mock2, mock3, mock4, mock5, mock6:
start_watching(path=p(""))
# Watchdog Events
for evt_cls in [DirCreatedEvent, DirDeletedEvent] * 2:
Expand Down
21 changes: 21 additions & 0 deletions tests/test_isolated.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import pytest
import importlib

from .markers import cpython_only
from .utils import run_isolated_test


@cpython_only
def test_observer_stops_in_eventlet():
if not importlib.util.find_spec('eventlet'):
pytest.skip("eventlet not installed")

run_isolated_test('eventlet_observer_stops.py')


@cpython_only
def test_eventlet_skip_repeat_queue():
if not importlib.util.find_spec('eventlet'):
pytest.skip("eventlet not installed")

run_isolated_test('eventlet_skip_repeat_queue.py')
17 changes: 1 addition & 16 deletions tests/test_skip_repeats_queue.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
from __future__ import annotations

import pytest

from watchdog import events
from watchdog.utils.bricks import SkipRepeatsQueue

from .markers import cpython_only


def basic_actions():
def test_basic_queue():
q = SkipRepeatsQueue()

e1 = (2, "fred")
Expand All @@ -25,10 +21,6 @@ def basic_actions():
assert q.empty()


def test_basic_queue():
basic_actions()


def test_allow_nonconsecutive():
q = SkipRepeatsQueue()

Expand Down Expand Up @@ -86,10 +78,3 @@ def test_consecutives_allowed_across_empties():
q.put(e1) # this repeat is allowed because 'last' added is now gone from queue
assert e1 == q.get()
assert q.empty()


@cpython_only
def test_eventlet_monkey_patching():
eventlet = pytest.importorskip("eventlet")
eventlet.monkey_patch()
basic_actions()
28 changes: 28 additions & 0 deletions tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import dataclasses
import os
import subprocess
import sys
from queue import Queue
from typing import Protocol

Expand Down Expand Up @@ -97,3 +99,29 @@ def close(self) -> None:
alive = [emitter.is_alive() for emitter in self.emitters]
self.emitters = []
assert alive == [False] * len(alive)


def run_isolated_test(path):
ISOALTED_TEST_PREFIX = os.path.join('tests', 'isolated')
path = os.path.abspath(os.path.join(ISOALTED_TEST_PREFIX, path))

src_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'src')
new_env = os.environ.copy()
new_env['PYTHONPATH'] = os.pathsep.join(sys.path + [src_dir])

new_argv = [sys.executable, path]

p = subprocess.Popen(
new_argv,
env=new_env,
)

# in case test goes haywire, don't let it run forever
timeout = 10
try:
p.communicate(timeout=timeout)
except subprocess.TimeoutExpired:
p.kill()
assert False, 'timed out'

assert p.returncode == 0

0 comments on commit 7bca6d9

Please sign in to comment.