Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve extensibility #783

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions huey/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,15 @@ def add_numbers(a, b):
def nightly_report():
generate_nightly_report()
"""
consumer_class = Consumer
storage_class = None
_deprecated_params = ('result_store', 'events', 'store_errors',
'global_registry')

@property
def task_wrapper_class(self):
return TaskWrapper

def __init__(self, name='huey', results=True, store_none=False, utc=True,
immediate=False, serializer=None, compression=False,
use_zlib=False, immediate_use_memory=True, always_eager=None,
Expand Down Expand Up @@ -162,12 +167,12 @@ def immediate(self, value):
self.storage = self.create_storage()

def create_consumer(self, **options):
return Consumer(self, **options)
return self.consumer_class(self, **options)

def task(self, retries=0, retry_delay=0, priority=None, context=False,
name=None, expires=None, **kwargs):
def decorator(func):
return TaskWrapper(
return self.task_wrapper_class(
self,
func.func if isinstance(func, TaskWrapper) else func,
context=context,
Expand All @@ -186,7 +191,7 @@ def decorator(func):
def method_validate(self, timestamp):
return validate_datetime(timestamp)

return TaskWrapper(
return self.task_wrapper_class(
self,
func.func if isinstance(func, TaskWrapper) else func,
context=context,
Expand Down
51 changes: 31 additions & 20 deletions huey/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from huey.constants import WORKER_PROCESS
from huey.constants import WORKER_THREAD
from huey.constants import WORKER_TYPES
from huey.exceptions import ConfigurationError
from huey.exceptions import ConfigurationError, StopFlagException
from huey.utils import time_clock


Expand Down Expand Up @@ -426,36 +426,47 @@ def stop(self, graceful=False):
else:
self._logger.info('Shutting down')

def run_one_iteration(self, health_check_ts):
"""
Run one iteration of the consumer event loop.
"""
timeout = self._stop_flag_timeout
try:
self.stop_flag.wait(timeout=timeout)
except KeyboardInterrupt:
self._logger.info('Received SIGINT')
self.stop(graceful=True)
except:
self._logger.exception('Error in consumer.')
self.stop()
else:
if self._received_signal:
self.stop(graceful=self._graceful)

if self.stop_flag.is_set():
raise StopFlagException

if self._health_check:
now = time_clock()
if now >= health_check_ts + self._health_check_interval:
health_check_ts = now
self.check_worker_health()

return health_check_ts

def run(self):
"""
Run the consumer.
"""
self.start()
timeout = self._stop_flag_timeout
health_check_ts = time_clock()

while True:
try:
self.stop_flag.wait(timeout=timeout)
except KeyboardInterrupt:
self._logger.info('Received SIGINT')
self.stop(graceful=True)
except:
self._logger.exception('Error in consumer.')
self.stop()
else:
if self._received_signal:
self.stop(graceful=self._graceful)

if self.stop_flag.is_set():
health_check_ts = self.run_one_iteration(health_check_ts)
except StopFlagException:
break

if self._health_check:
now = time_clock()
if now >= health_check_ts + self._health_check_interval:
health_check_ts = now
self.check_worker_health()

self.huey.notify_interrupted_tasks()

if self._restart:
Expand Down
1 change: 1 addition & 0 deletions huey/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
class HueyException(Exception): pass
class ConfigurationError(HueyException): pass
class TaskLockedException(HueyException): pass
class StopFlagException(HueyException): pass

class CancelExecution(Exception):
def __init__(self, retry=None, *args, **kwargs):
Expand Down