Skip to content

Commit

Permalink
Move to inotify_simple from unsupported pyinotify
Browse files Browse the repository at this point in the history
  • Loading branch information
oldpatricka committed Jul 2, 2024
1 parent b8a67d2 commit 5ad2a34
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 36 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/actions.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ jobs:
runs-on: ubuntu-20.04
strategy:
matrix:
python: [3.6, 3.8, 3.9]
extras: ["test", "test,queable,sentry"]
python: [3.6, 3.8, 3.9, "3.10"]
extras: ["test", "test,queuable,sentry"]
steps:
- name: Setup Python
uses: actions/setup-python@v2.2.2
Expand Down
54 changes: 21 additions & 33 deletions client/datalake/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,17 +44,10 @@
InsufficientConfiguration.'''
has_queue = True
try:
import pyinotify
import inotify_simple
except ImportError:
has_queue = False

class FakePyinotify(object):

class ProcessEvent(object):
pass

pyinotify = FakePyinotify


def requires_queue(f):
def wrapped(*args, **kwargs):
Expand Down Expand Up @@ -124,34 +117,25 @@ def __init__(self, archive, queue_dir, callback=None):
self._archive = archive
self._callback = callback

class EventHandler(pyinotify.ProcessEvent):

def __init__(self, callback):
super(Uploader.EventHandler, self).__init__()
self.callback = callback

def process_IN_CLOSE_WRITE(self, event):
self.callback(event.pathname)

def process_IN_MOVED_TO(self, event):
self.callback(event.pathname)
self.inotify = inotify_simple.INotify()

def _setup_watch_manager(self, timeout):
if timeout is not None:
timeout = int(timeout * 1000)
self._wm = pyinotify.WatchManager()
self._handler = Uploader.EventHandler(self._push)
self._notifier = pyinotify.Notifier(self._wm, self._handler,
timeout=timeout)
self._wm.add_watch(self.queue_dir,
pyinotify.IN_CLOSE_WRITE | pyinotify.IN_MOVED_TO)
flags = inotify_simple.flags
watch_flags = flags.CLOSE_WRITE | flags.MOVED_TO
self.inotify.add_watch(self.queue_dir, watch_flags)

def _push(self, filename):
print(f"FILENAME: {filename}")
if not os.path.isabs(filename):
filename = os.path.join(self.queue_dir, filename)
print(f"FULL FILENAME: {filename}")
if os.path.basename(filename).startswith('.'):
return
if self._workers == []:
if not self._workers:
print(f"SYNC PUSH: {filename}")
self._synchronous_push(filename)
else:
print(f"THREAD PUSH: {filename}")
self._threaded_push(filename)

def _synchronous_push(self, filename):
Expand Down Expand Up @@ -205,7 +189,7 @@ def _listen(self, timeout=None, workers=1):
msg = 'number of upload workers cannot be zero or negative'
raise InsufficientConfiguration(msg)
if workers > 1:
# when multipe workers are requested, the main thread monitors the
# when multiple workers are requested, the main thread monitors the
# queue directory and puts the files in a Queue that is serviced by
# the worker threads. So the word queue is a bit overloaded in this
# module.
Expand All @@ -229,10 +213,14 @@ def _create_worker(self, worker_number):
def _run(self, timeout):

self._prepare_to_track_run_time(timeout)
self._notifier.process_events()
while self._notifier.check_events():
self._notifier.read_events()
self._notifier.process_events()
if timeout is not None:
timeout = int(timeout * 1000)

for event in self.inotify.read(timeout=timeout):
print(event)
if event.name is None:
continue
self._push(event.name)
if self._update_time_remaining() == 0:
break

Expand Down
2 changes: 1 addition & 1 deletion client/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def get_version():
# the queuable feature allows users to offload their datalake pushes
# to a separate uploader process.
'queuable': [
'pyinotify>=0.9.4',
'inotify_simple>=1.3.5',
],
'sentry': [
'raven>=5.0.0',
Expand Down

0 comments on commit 5ad2a34

Please sign in to comment.