Skip to content

Commit

Permalink
Migrated notifier to AsyncNotifer instead of ThreadedNotifier - more …
Browse files Browse the repository at this point in the history
…perfomance, no thread used per active watcher
  • Loading branch information
Panos Kittenis committed Nov 25, 2013
1 parent 8f89608 commit 397b979
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 8 deletions.
24 changes: 17 additions & 7 deletions cronify/cronify.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import datetime
import re
import signal
import asyncore
import threading
from common import read_cfg, CFG_FILE

_MASKS = pyinotify.IN_CLOSE_WRITE | pyinotify.IN_MOVED_FROM
Expand Down Expand Up @@ -228,6 +230,12 @@ def __init__(self, watch_data, callback_func = None):
self.thread_pool = threadpool.ThreadPool(num_workers = 10)
self.start_watchers(self.watch_data)
signal.signal(signal.SIGUSR1, self.reload_signal_handler)
self.asyncore_thread = None

def _asyncore_target_thread(self):
"""Target function for per watcher asyncore loop thread"""
while True:
asyncore.loop()

def reload_signal_handler(self, signalnum, frame):
"""Signal handler for reloading configuration file and watchers"""
Expand Down Expand Up @@ -292,20 +300,21 @@ def start_watchers(self, watch_data):
recurse = watch_data[watcher]['recurse'] if 'recurse' in watch_data[watcher] else False
watch_manager = pyinotify.WatchManager()
local_tz = watch_data[watcher]['local_tz'] if 'local_tz' in watch_data[watcher] else None
notifier = pyinotify.ThreadedNotifier(watch_manager, EventHandler(watch_data[watcher]['filemasks'].copy(),
self.thread_pool,
callback_func = self.callback_func,
local_tz = local_tz
))
notifier.daemon = True
notifier.start()
notifier = pyinotify.AsyncNotifier(watch_manager, EventHandler(watch_data[watcher]['filemasks'].copy(),
self.thread_pool,
callback_func = self.callback_func,
local_tz = local_tz
))
watch_manager.add_watch(watch_dir, _MASKS, rec = recurse, auto_add = True)
logger.info("Started watching directory %s with filemasks and actions %s, recurse %s..",
watch_dir,
watch_data[watcher]['filemasks'],
recurse,)
self.notifiers.append(notifier)
self.watch_managers.append(watch_manager)
self.asyncore_thread = threading.Thread(target = self._asyncore_target_thread)
self.asyncore_thread.daemon = True
self.asyncore_thread.start()

def update_watchers(self, watch_data = None):
"""Try and update watchers with new watch_data from cfg file"""
Expand Down Expand Up @@ -355,6 +364,7 @@ def cleanup(self):
[wm.rm_watch(wm.watches.keys()) for wm in self.watch_managers]
[notifier.stop() for notifier in self.notifiers]
self.watch_managers, self.notifiers = [], []
del self.asyncore_thread

def _callback_func(event):
"""Test function for callback_func optional parameter of Watcher class"""
Expand Down
2 changes: 1 addition & 1 deletion tests/test_cronify.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ def test_reload(self):
msg = "Expected watcher watch data to be the new data we reloaded")
self._make_test_file(new_test_filemask)
try:
self.assertEqual(new_test_filemask, self.q.get(timeout = 30),
self.assertEqual(new_test_filemask, self.q.get(timeout = 1),
msg = "Expected action to be triggered for new filemask %s" % (new_test_filemask,))
finally:
watcher.cleanup()
Expand Down

0 comments on commit 397b979

Please sign in to comment.