From 53a01076cd851178e7b488f0d6ad940870f0e027 Mon Sep 17 00:00:00 2001 From: Adam Greene Date: Sat, 30 Oct 2021 15:24:42 -0400 Subject: [PATCH 1/2] Add on_success and on_failure callback kwargs --- src/flask_rq2/app.py | 13 ++++++++++++- src/flask_rq2/functions.py | 16 +++++++++++++++- 2 files changed, 27 insertions(+), 2 deletions(-) diff --git a/src/flask_rq2/app.py b/src/flask_rq2/app.py index 149049b..1a137b8 100644 --- a/src/flask_rq2/app.py +++ b/src/flask_rq2/app.py @@ -224,7 +224,8 @@ def my_custom_handler(job, *exc_info): return callback def job(self, func_or_queue=None, timeout=None, result_ttl=None, ttl=None, - depends_on=None, at_front=None, meta=None, description=None): + depends_on=None, at_front=None, meta=None, description=None, + on_success=None, on_failure=None): """ Decorator to mark functions for queuing via RQ, e.g.:: @@ -274,6 +275,14 @@ def add(x, y): :param description: Description of the job. :type description: str + :param on_success: Callback when job success. + :type on_success: Callable + + :param on_failure: Callback when job fails. + :type on_failure: Callable + + + """ if callable(func_or_queue): func = func_or_queue @@ -295,6 +304,8 @@ def wrapper(wrapped): at_front=at_front, meta=meta, description=description, + on_success=on_success, + on_failure=on_failure, ) wrapped.helper = helper for function in helper.functions: diff --git a/src/flask_rq2/functions.py b/src/flask_rq2/functions.py index 36c072c..b89f210 100644 --- a/src/flask_rq2/functions.py +++ b/src/flask_rq2/functions.py @@ -15,7 +15,7 @@ class JobFunctions(object): functions = ['queue', 'schedule', 'cron'] def __init__(self, rq, wrapped, queue_name, timeout, result_ttl, ttl, - depends_on, at_front, meta, description): + depends_on, at_front, meta, description, on_success, on_failure): self.rq = rq self.wrapped = wrapped self._queue_name = queue_name @@ -28,6 +28,8 @@ def __init__(self, rq, wrapped, queue_name, timeout, result_ttl, ttl, self._at_front = at_front self._meta = meta self._description = description + self._on_success = on_success + self._on_failure = on_failure def __repr__(self): full_name = '.'.join([self.wrapped.__module__, self.wrapped.__name__]) @@ -104,6 +106,14 @@ def add(x, y): :mod:`UUID `. :type job_id: str + :param on_success: A callback when the job suceeds. Defaults + to None + :type job_id: Callable + + :param on_failure: A callback when the job fails. Defaults + to None + :type job_id: Callable + :param at_front: Whether or not the job is queued in front of all other enqueued jobs. :type at_front: bool @@ -124,6 +134,8 @@ def add(x, y): at_front = kwargs.pop('at_front', self._at_front) meta = kwargs.pop('meta', self._meta) description = kwargs.pop('description', self._description) + on_success = kwargs.pop('on_success', self._on_success) + on_failure = kwargs.pop('on_failure', self._on_failure) return self.rq.get_queue(queue_name).enqueue_call( self.wrapped, args=args, @@ -136,6 +148,8 @@ def add(x, y): at_front=at_front, meta=meta, description=description, + on_success=on_success, + on_failure=on_failure, ) def schedule(self, time_or_delta, *args, **kwargs): From 893dbf65fe8e3c4bf5608a79b999894a86666607 Mon Sep 17 00:00:00 2001 From: Amal XPS Date: Sat, 19 Feb 2022 20:52:04 +0400 Subject: [PATCH 2/2] added retry --- src/flask_rq2/app.py | 5 ++++- src/flask_rq2/functions.py | 12 +++++++++--- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/src/flask_rq2/app.py b/src/flask_rq2/app.py index 1a137b8..1c2e23c 100644 --- a/src/flask_rq2/app.py +++ b/src/flask_rq2/app.py @@ -225,7 +225,7 @@ def my_custom_handler(job, *exc_info): def job(self, func_or_queue=None, timeout=None, result_ttl=None, ttl=None, depends_on=None, at_front=None, meta=None, description=None, - on_success=None, on_failure=None): + on_success=None, on_failure=None, retry=None): """ Decorator to mark functions for queuing via RQ, e.g.:: @@ -281,6 +281,8 @@ def add(x, y): :param on_failure: Callback when job fails. :type on_failure: Callable + :param retry: A Retry() object that specifies how the job should be retried on failure. + :type retry: rq.job.Retry """ @@ -306,6 +308,7 @@ def wrapper(wrapped): description=description, on_success=on_success, on_failure=on_failure, + retry=retry, ) wrapped.helper = helper for function in helper.functions: diff --git a/src/flask_rq2/functions.py b/src/flask_rq2/functions.py index b89f210..7233c41 100644 --- a/src/flask_rq2/functions.py +++ b/src/flask_rq2/functions.py @@ -15,7 +15,7 @@ class JobFunctions(object): functions = ['queue', 'schedule', 'cron'] def __init__(self, rq, wrapped, queue_name, timeout, result_ttl, ttl, - depends_on, at_front, meta, description, on_success, on_failure): + depends_on, at_front, meta, description, on_success, on_failure, retry): self.rq = rq self.wrapped = wrapped self._queue_name = queue_name @@ -30,6 +30,7 @@ def __init__(self, rq, wrapped, queue_name, timeout, result_ttl, ttl, self._description = description self._on_success = on_success self._on_failure = on_failure + self._retry = retry def __repr__(self): full_name = '.'.join([self.wrapped.__module__, self.wrapped.__name__]) @@ -108,11 +109,14 @@ def add(x, y): :param on_success: A callback when the job suceeds. Defaults to None - :type job_id: Callable + :type on_success: Callable :param on_failure: A callback when the job fails. Defaults to None - :type job_id: Callable + :type on_failure: Callable + + :param retry: A Retry() object that specifies how the job should be retried on failure. + :type retry: rq.job.Retry :param at_front: Whether or not the job is queued in front of all other enqueued jobs. @@ -136,6 +140,7 @@ def add(x, y): description = kwargs.pop('description', self._description) on_success = kwargs.pop('on_success', self._on_success) on_failure = kwargs.pop('on_failure', self._on_failure) + retry = kwargs.pop('retry', self._retry) return self.rq.get_queue(queue_name).enqueue_call( self.wrapped, args=args, @@ -150,6 +155,7 @@ def add(x, y): description=description, on_success=on_success, on_failure=on_failure, + retry=retry ) def schedule(self, time_or_delta, *args, **kwargs):