diff --git a/README.rst b/README.rst index fac12d2d..1e131337 100644 --- a/README.rst +++ b/README.rst @@ -22,7 +22,7 @@ Features TaskTiger forks a subprocess for each task, This comes with several benefits: Memory leaks caused by tasks are avoided since the subprocess is terminated - when the task is finished. A hard time limit can be set for each task, after + when the task is finished. A hard time limit can be set for each task, after which the task is killed if it hasn't completed. To ensure performance, any necessary Python modules can be preloaded in the parent process. @@ -552,15 +552,16 @@ Each queue can have tasks in the following states: - ``scheduled``: Tasks that are scheduled for later execution. - ``error``: Tasks that failed with an error. -To get a list of all tasks for a given queue and state, use -``Task.tasks_from_queue``. The method gives you back a tuple containing the -total number of tasks in the queue (useful if the tasks are truncated) and a -list of tasks in the queue, latest first. Using the ``skip`` and ``limit`` -keyword arguments, you can fetch arbitrary slices of the queue. If you know the -task ID, you can fetch a given task using ``Task.from_id``. Both methods let -you load tracebacks from failed task executions using the ``load_executions`` -keyword argument, which accepts an integer indicating how many executions -should be loaded. +To get a count of the number of tasks for a given queue and state, use +``Task.task_count_from_queue``. To get a list of all tasks for a given queue +and state, use ``Task.tasks_from_queue``. The method gives you back a tuple +containing the total number of tasks in the queue (useful if the tasks are +truncated) and a list of tasks in the queue, latest first. Using the ``skip`` +and ``limit`` keyword arguments, you can fetch arbitrary slices of the queue. +If you know the task ID, you can fetch a given task using ``Task.from_id``. +Both methods let you load tracebacks from failed task executions using the +``load_executions`` keyword argument, which accepts an integer indicating how +many executions should be loaded. Tasks can also be constructed and queued using the regular constructor, which takes the TaskTiger instance, the function name and the options described in @@ -661,6 +662,37 @@ executed. for task in tiger.current_tasks: print(task.n_executions()) +Example 4: Printing the number of queued tasks for the default queue. + +.. code:: python + + from tasktiger import TaskTiger, Task + + QUEUE_NAME = 'default' + TASK_STATE = 'queued' + + tiger = TaskTiger() + + count = Task.task_count_from_queue(tiger, QUEUE_NAME, TASK_STATE) + + print('{state} tasks in {queue}: {count}'.format( + state=TASK_STATE.title(), + queue=QUEUE_NAME, + count=count, + ) + +Example 5: Printing all queues with task metrics. + +.. code:: python + + from tasktiger import TaskTiger, Task + + tiger = TaskTiger() + + metrics = Task.queue_metrics(tiger) + + print(metrics) + Rollbar error handling ---------------------- diff --git a/tasktiger/_internal.py b/tasktiger/_internal.py index e0fc07ef..ae105cb5 100644 --- a/tasktiger/_internal.py +++ b/tasktiger/_internal.py @@ -36,6 +36,7 @@ 'current_tasks': None, } + # from rq def import_attribute(name): """Return an attribute from a dotted path name (e.g. "path.to.func").""" @@ -47,12 +48,14 @@ def import_attribute(name): except (ValueError, ImportError, AttributeError) as e: raise TaskImportError(e) + def gen_id(): """ Generates and returns a random hex-encoded 256-bit unique ID. """ return binascii.b2a_hex(os.urandom(32)).decode('utf8') + def gen_unique_id(serialized_name, args, kwargs): """ Generates and returns a hex-encoded 256-bit ID for the given task name and @@ -64,6 +67,7 @@ def gen_unique_id(serialized_name, args, kwargs): 'kwargs': kwargs, }, sort_keys=True).encode('utf8')).hexdigest() + def serialize_func_name(func): """ Returns the dotted serialized path to the passed function. @@ -78,18 +82,20 @@ def serialize_func_name(func): func_name = func.__name__ return ':'.join([func.__module__, func_name]) + def dotted_parts(s): """ For a string "a.b.c", yields "a", "a.b", "a.b.c". """ idx = -1 while s: - idx = s.find('.', idx+1) + idx = s.find('.', idx + 1) if idx == -1: yield s break yield s[:idx] + def reversed_dotted_parts(s): """ For a string "a.b.c", yields "a.b.c", "a.b", "a". @@ -103,12 +109,14 @@ def reversed_dotted_parts(s): break yield s[:idx] + def serialize_retry_method(retry_method): if callable(retry_method): return (serialize_func_name(retry_method), ()) else: return (serialize_func_name(retry_method[0]), retry_method[1]) + def get_timestamp(when): # convert timedelta to datetime if isinstance(when, datetime.timedelta): @@ -117,4 +125,4 @@ def get_timestamp(when): if when: # Convert to unixtime: utctimetuple drops microseconds so we add # them manually. - return calendar.timegm(when.utctimetuple()) + when.microsecond/1.e6 + return calendar.timegm(when.utctimetuple()) + when.microsecond / 1.e6 diff --git a/tasktiger/exceptions.py b/tasktiger/exceptions.py index b6911709..da0996cf 100644 --- a/tasktiger/exceptions.py +++ b/tasktiger/exceptions.py @@ -1,21 +1,25 @@ import sys + class TaskImportError(ImportError): """ Raised when a task could not be imported. """ + class JobTimeoutException(BaseException): """ Raised when a job takes longer to complete than the allowed maximum timeout value. """ + class StopRetry(Exception): """ Raised by a retry function to indicate that the task shouldn't be retried. """ + class RetryException(BaseException): """ Alternative to retry_on for retrying a task. If raised within a task, the @@ -32,6 +36,7 @@ def __init__(self, method=None, original_traceback=False, log_error=True): self.exc_info = sys.exc_info() if original_traceback else None self.log_error = log_error + class TaskNotFound(Exception): """ The task was not found or does not exist in the given queue/state. diff --git a/tasktiger/flask_script.py b/tasktiger/flask_script.py index e2ac63bf..116f8e6c 100644 --- a/tasktiger/flask_script.py +++ b/tasktiger/flask_script.py @@ -3,6 +3,7 @@ import argparse from flask_script import Command + class TaskTigerCommand(Command): capture_all_args = True help = 'Run a TaskTiger worker' @@ -14,10 +15,10 @@ def __init__(self, tiger): def create_parser(self, *args, **kwargs): # Override the default parser so we can pass all arguments to the # TaskTiger parser. - func_stack = kwargs.pop('func_stack',()) + func_stack = kwargs.pop('func_stack', ()) parent = kwargs.pop('parent', None) parser = argparse.ArgumentParser(*args, add_help=False, **kwargs) - parser.set_defaults(func_stack=func_stack+(self,)) + parser.set_defaults(func_stack=func_stack + (self,)) self.parser = parser self.parent = parent return parser diff --git a/tasktiger/redis_lock.py b/tasktiger/redis_lock.py index 929c52eb..8fdc2b22 100644 --- a/tasktiger/redis_lock.py +++ b/tasktiger/redis_lock.py @@ -1,5 +1,6 @@ -from redis.lock import LockError import time +from redis import WatchError +from redis.lock import Lock as RedisLock, LockError # TODO: Switch to Redlock (http://redis.io/topics/distlock) once the following # bugs are fixed: @@ -7,10 +8,10 @@ # * https://github.com/andymccurdy/redis-py/issues/629 # * https://github.com/andymccurdy/redis-py/issues/601 + # For now, we're using the old-style lock pattern (based on py-redis 2.8.0) # The class below additionally catches ValueError for better compatibility with # new-style locks (for when we upgrade), and adds a renew() method. - class Lock(object): """ A shared, distributed Lock. Using Redis for locking allows the Lock @@ -113,11 +114,10 @@ def renew(self, timeout=None): self.redis.getset(self.name, timeout_at) self.acquired_until = timeout_at + # For now unused: # New-style Lock with renew() method (andymccurdy/redis-py#629) # XXX: when upgrading to the new-style class, take old-style locks into account -from redis import WatchError -from redis.lock import Lock as RedisLock class NewStyleLock(RedisLock): def renew(self, new_timeout): """ diff --git a/tasktiger/redis_scripts.py b/tasktiger/redis_scripts.py index 9f7f763b..ffeb11d6 100644 --- a/tasktiger/redis_scripts.py +++ b/tasktiger/redis_scripts.py @@ -7,10 +7,10 @@ redis.call('zadd', {key}, {score}, {member}) end """ -ZADD_NOUPDATE = ZADD_NOUPDATE_TEMPLATE.format( +ZADD_NOUPDATE = ZADD_NOUPDATE_TEMPLATE.format( key='KEYS[1]', score='ARGV[1]', member='ARGV[2]', condition='not' ) -ZADD_UPDATE_EXISTING = ZADD_NOUPDATE_TEMPLATE.format( +ZADD_UPDATE_EXISTING = ZADD_NOUPDATE_TEMPLATE.format( key='KEYS[1]', score='ARGV[1]', member='ARGV[2]', condition='' ) ZADD_UPDATE_TEMPLATE = """ @@ -372,7 +372,7 @@ def zpoppush(self, source, destination, count, score, new_score, (their score will not be updated). """ if score is None: - score = '+inf' # Include all elements. + score = '+inf' # Include all elements. if withscores: if on_success: raise NotImplementedError() @@ -438,7 +438,7 @@ def delete_if_not_in_zsets(self, key, member, set_list, client=None): ``set_list``. Returns the number of removed elements (0 or 1). """ return self._delete_if_not_in_zsets( - keys=[key]+set_list, + keys=[key] + set_list, args=[member], client=client) @@ -491,7 +491,7 @@ def execute_pipeline(self, pipeline, client=None): stack = pipeline.command_stack script_args = [int(self.can_replicate_commands), len(stack)] for args, options in stack: - script_args += [len(args)-1] + list(args) + script_args += [len(args) - 1] + list(args) # Run the pipeline if self.can_replicate_commands: # Redis 3.2 or higher diff --git a/tasktiger/retry.py b/tasktiger/retry.py index 30fc494c..7af80382 100644 --- a/tasktiger/retry.py +++ b/tasktiger/retry.py @@ -1,26 +1,32 @@ # The retry logic is documented in the README. from .exceptions import StopRetry + def _fixed(retry, delay, max_retries): if retry > max_retries: raise StopRetry() return delay + def fixed(delay, max_retries): return (_fixed, (delay, max_retries)) + def _linear(retry, delay, increment, max_retries): if retry > max_retries: raise StopRetry() - return delay + increment*(retry-1) + return delay + increment * (retry - 1) + def linear(delay, increment, max_retries): return (_linear, (delay, increment, max_retries)) + def _exponential(retry, delay, factor, max_retries): if retry > max_retries: raise StopRetry() - return delay * factor**(retry-1) + return delay * factor ** (retry - 1) + def exponential(delay, factor, max_retries): return (_exponential, (delay, factor, max_retries)) diff --git a/tasktiger/rollbar.py b/tasktiger/rollbar.py index ff893c08..dde7d0ac 100644 --- a/tasktiger/rollbar.py +++ b/tasktiger/rollbar.py @@ -5,6 +5,7 @@ import rollbar from rollbar.logger import RollbarHandler + class StructlogRollbarHandler(RollbarHandler): def __init__(self, prefix, *args, **kwargs): """ @@ -17,6 +18,7 @@ def __init__(self, prefix, *args, **kwargs): def format_title(self, data): # Keys used to construct the title and for grouping purposes. KEYS = ['event', 'func', 'exception_name', 'queue'] + def format_field(field, value): if field == 'queue': return '%s=%s' % (field, value.split('.')[0]) diff --git a/tasktiger/schedule.py b/tasktiger/schedule.py index 4ef44fdf..bcfa0efe 100644 --- a/tasktiger/schedule.py +++ b/tasktiger/schedule.py @@ -2,6 +2,7 @@ __all__ = ['periodic'] + def _periodic(dt, period, start_date, end_date): if end_date and dt >= end_date: return None @@ -11,7 +12,7 @@ def _periodic(dt, period, start_date, end_date): # Determine the next time the task should be run delta = dt - start_date - seconds = delta.seconds + delta.days*86400 + seconds = delta.seconds + delta.days * 86400 runs = seconds // period next_run = runs + 1 next_date = start_date + datetime.timedelta(seconds=next_run * period) @@ -22,6 +23,7 @@ def _periodic(dt, period, start_date, end_date): return next_date + def periodic(seconds=0, minutes=0, hours=0, days=0, weeks=0, start_date=None, end_date=None): """ diff --git a/tasktiger/stats.py b/tasktiger/stats.py index 07e10098..3d1455ae 100644 --- a/tasktiger/stats.py +++ b/tasktiger/stats.py @@ -3,6 +3,7 @@ from ._internal import g_fork_lock + class StatsThread(threading.Thread): def __init__(self, tiger): super(StatsThread, self).__init__() diff --git a/tasktiger/task.py b/tasktiger/task.py index 166eaa09..940ae22d 100644 --- a/tasktiger/task.py +++ b/tasktiger/task.py @@ -3,11 +3,24 @@ import redis import time -from ._internal import * +from ._internal import ( + g, + gen_id, + gen_unique_id, + get_timestamp, + import_attribute, + serialize_func_name, + serialize_retry_method, + ACTIVE, + ERROR, + QUEUED, + SCHEDULED, +) from .exceptions import TaskNotFound __all__ = ['Task'] + class Task(object): def __init__(self, tiger, func=None, args=None, kwargs=None, queue=None, hard_timeout=None, unique=None, lock=None, lock_key=None, @@ -205,24 +218,24 @@ def _move(self, from_state=None, to_state=None, when=None, mode=None): when = time.time() if mode: scripts.zadd(_key(to_state, queue), when, self.id, - mode, client=pipeline) + mode, client=pipeline) else: pipeline.zadd(_key(to_state, queue), self.id, when) pipeline.sadd(_key(to_state), queue) pipeline.zrem(_key(from_state, queue), self.id) - if not to_state: # Remove the task if necessary + if not to_state: # Remove the task if necessary if self.unique: # Only delete if it's not in any other queue check_states = set([ACTIVE, QUEUED, ERROR, SCHEDULED]) check_states.remove(from_state) # TODO: Do the following two in one call. scripts.delete_if_not_in_zsets(_key('task', self.id, 'executions'), - self.id, [ + self.id, [ _key(state, queue) for state in check_states ], client=pipeline) scripts.delete_if_not_in_zsets(_key('task', self.id), - self.id, [ + self.id, [ _key(state, queue) for state in check_states ], client=pipeline) else: @@ -350,6 +363,48 @@ def from_id(self, tiger, queue, state, task_id, load_executions=0): task_id )) + @classmethod + def queue_metrics(self, tiger): + """ + Returns a dict of queue metrics. + + For ex: + { + 'active': { + 'default': { + 'total': 3, + }, + }, + 'error': {}, + 'queued': { + 'default': { + 'total': 10, + }, + 'other': { + 'total': 42, + }, + }, + 'scheduled': {}, + } + """ + + metrics = { + 'active': {}, + 'error': {}, + 'queued': {}, + 'scheduled': {}, + } + prefix = tiger.config['REDIS_PREFIX'] + ':' + + for state in metrics.keys(): + queues = tiger.connection.smembers(prefix + state) + for queue in queues: + metrics[state][queue] = { + 'total': self.task_count_from_queue(tiger, queue, state), + } + + return metrics + @classmethod def tasks_from_queue(self, tiger, queue, state, skip=0, limit=1000, load_executions=0): @@ -365,7 +420,7 @@ def tasks_from_queue(self, tiger, queue, state, skip=0, limit=1000, key = tiger._key(state, queue) pipeline = tiger.connection.pipeline() pipeline.zcard(key) - pipeline.zrange(key, -limit-skip, -1-skip, withscores=True) + pipeline.zrange(key, -limit - skip, -1 - skip, withscores=True) n, items = pipeline.execute() tasks = [] @@ -397,6 +452,16 @@ def tasks_from_queue(self, tiger, queue, state, skip=0, limit=1000, return n, tasks + @classmethod + def task_count_from_queue(self, tiger, queue, state): + """ + Returns the number of tasks in a given queue and task state. + """ + + key = tiger._key(state, queue) + count = tiger.connection.zcount(key, '-inf', '+inf') + return count + def n_executions(self): """ Queries and returns the number of past task executions. diff --git a/tasktiger/worker.py b/tasktiger/worker.py index 66254cca..d5ae99b1 100644 --- a/tasktiger/worker.py +++ b/tasktiger/worker.py @@ -14,20 +14,36 @@ from .redis_lock import Lock -from ._internal import * +from ._internal import ( + dotted_parts, + g, + g_fork_lock, + gen_unique_id, + import_attribute, + reversed_dotted_parts, + serialize_retry_method, + serialize_func_name, + ACTIVE, + ERROR, + QUEUED, + SCHEDULED, + TaskImportError, +) from .exceptions import RetryException, TaskNotFound -from .retry import * +from .retry import StopRetry from .stats import StatsThread from .task import Task from .timeouts import UnixSignalDeathPenalty, JobTimeoutException __all__ = ['Worker'] + def sigchld_handler(*args): # Nothing to do here. This is just a dummy handler that we set up to catch # the child process exiting. pass + class Worker(object): def __init__(self, tiger, queues=None, exclude_queues=None, single_worker_queues=None): @@ -177,14 +193,16 @@ def _wait_for_new_tasks(self, timeout=0, batch_timeout=0): message = self._pubsub.get_message() if self._did_work: - break # Exit immediately if we did work during the last - # execution loop because there might be more work to do + # Exit immediately if we did work during the last + # execution loop because there might be more work to do + break elif time.time() >= batch_exit and new_queue_found: - break # After finding a new queue we can wait until the - # batch timeout expires + # After finding a new queue we can wait until the + # batch timeout expires + break elif time.time() - start_time > timeout: - break # Always exit after our maximum wait time - + # Always exit after our maximum wait time + break def _worker_queue_expired_tasks(self): """ @@ -287,8 +305,7 @@ def _execute_forked(self, tasks, log): 'kwargs': task.kwargs, } for task in tasks] task_timeouts = [task.hard_timeout for task in tasks if task.hard_timeout is not None] - hard_timeout = ((max(task_timeouts) if task_timeouts else None) - or + hard_timeout = ((max(task_timeouts) if task_timeouts else None) or getattr(func, '_task_hard_timeout', None) or self.config['DEFAULT_HARD_TIMEOUT']) @@ -324,7 +341,7 @@ def _execute_forked(self, tasks, log): execution['time_failed'] = time.time() # Currently we only log failed task executions to Redis. execution['traceback'] = \ - ''.join(traceback.format_exception(*exc_info)) + ''.join(traceback.format_exception(*exc_info)) execution['success'] = success execution['host'] = socket.gethostname() serialized_execution = json.dumps(execution) @@ -460,7 +477,7 @@ def check_child_exit(): """ try: pid, return_code = os.waitpid(child_pid, os.WNOHANG) - if pid != 0: # The child process is done. + if pid != 0: # The child process is done. return return_code except OSError as e: # Of course EINTR can happen if the child process exits @@ -746,8 +763,8 @@ def _mark_done(): retry_func, retry_args = execution['retry_method'] else: # We expect the serialized method here. - retry_func, retry_args = serialize_retry_method( \ - self.config['DEFAULT_RETRY_METHOD']) + retry_func, retry_args = serialize_retry_method( + self.config['DEFAULT_RETRY_METHOD']) should_log_error = execution['log_error'] should_retry = True @@ -905,8 +922,8 @@ def run(self, once=False, force_once=False): """ self.log.info('ready', queues=sorted(self.only_queues), - exclude_queues=sorted(self.exclude_queues), - single_worker_queues=sorted(self.single_worker_queues)) + exclude_queues=sorted(self.exclude_queues), + single_worker_queues=sorted(self.single_worker_queues)) if not self.scripts.can_replicate_commands: # Older Redis versions may create additional overhead when @@ -948,7 +965,7 @@ def run(self, once=False, force_once=False): except KeyboardInterrupt: pass - except Exception as e: + except Exception: self.log.exception(event='exception') raise diff --git a/tests/test_base.py b/tests/test_base.py index fba24f27..47f5859f 100644 --- a/tests/test_base.py +++ b/tests/test_base.py @@ -763,8 +763,8 @@ def test_tasks_from_queue(self): assert task0.queue == 'default' def test_tasks_from_queue_with_executions(self): - task = self.tiger.delay(exception_task, retry=True) - + self.tiger.delay(exception_task, retry=True) + # Get two executions in task Worker(self.tiger).run(once=True) time.sleep(DELAY) @@ -782,6 +782,36 @@ def test_tasks_from_queue_with_executions(self): assert n == 1 assert len(tasks[0].executions) == 2 + def test_task_count_from_queue(self): + task0 = Task(self.tiger, simple_task) + task1 = Task(self.tiger, exception_task) + task2 = Task(self.tiger, simple_task, queue='other') + + n = Task.task_count_from_queue(self.tiger, 'default', 'queued') + assert n == 0 + + task0.delay() + task1.delay() + task2.delay() + + n = Task.task_count_from_queue(self.tiger, 'default', 'queued') + assert n == 2 + n = Task.task_count_from_queue(self.tiger, 'other', 'queued') + assert n == 1 + + def test_queue_metrics(self): + task0 = Task(self.tiger, simple_task) + task1 = Task(self.tiger, exception_task) + task2 = Task(self.tiger, simple_task, queue='other') + + task0.delay() + task1.delay() + task2.delay() + + metrics = Task.queue_metrics(self.tiger) + assert metrics['queued']['default']['total'] == 2 + assert metrics['queued']['other']['total'] == 1 + def test_eager(self): self.tiger.config['ALWAYS_EAGER'] = True