From 8e1e88a3915656c9f1e2c2a1fe1a8d5577add91f Mon Sep 17 00:00:00 2001 From: Ethan Date: Thu, 1 Oct 2020 20:44:56 -0400 Subject: [PATCH 1/7] Upgrade websocket notification to RuntimeError --- celery_progress/websockets/backend.py | 35 +++++++++++---------------- celery_progress/websockets/tasks.py | 4 +-- 2 files changed, 16 insertions(+), 23 deletions(-) diff --git a/celery_progress/websockets/backend.py b/celery_progress/websockets/backend.py index 6e42697..92dd332 100644 --- a/celery_progress/websockets/backend.py +++ b/celery_progress/websockets/backend.py @@ -1,39 +1,32 @@ -import logging - from celery_progress.backend import ProgressRecorder, Progress try: from asgiref.sync import async_to_sync from channels.layers import get_channel_layer except ImportError: - async_to_sync = get_channel_layer = None - WEBSOCKETS_AVAILABLE = False + channel_layer = None else: - WEBSOCKETS_AVAILABLE = get_channel_layer() - + channel_layer = get_channel_layer() -logger = logging.getLogger(__name__) +if not channel_layer: + RuntimeError( + 'Tried to use websocket progress bar, but dependencies were not installed / configured. ' + 'Use pip install celery-progress[websockets] and set up channels to enable this feature. ' + 'See: https://channels.readthedocs.io/en/latest/ for more details.' + ) class WebSocketProgressRecorder(ProgressRecorder): @staticmethod def push_update(task_id): - if WEBSOCKETS_AVAILABLE: - try: - channel_layer = get_channel_layer() - async_to_sync(channel_layer.group_send)( - task_id, - {'type': 'update_task_progress', 'data': {**Progress(task_id).get_info()}} - ) - except AttributeError: # No channel layer to send to, so ignore it - pass - else: - logger.info( - 'Tried to use websocket progress bar, but dependencies were not installed / configured. ' - 'Use pip install celery-progress[websockets] and setup channels to enable this feature.' - 'See: https://channels.readthedocs.io/en/latest/ for more details.' + try: + async_to_sync(channel_layer.group_send)( + task_id, + {'type': 'update_task_progress', 'data': {**Progress(task_id).get_info()}} ) + except AttributeError: # No channel layer to send to, so ignore it + pass def set_progress(self, current, total, description=""): super().set_progress(current, total, description) diff --git a/celery_progress/websockets/tasks.py b/celery_progress/websockets/tasks.py index 76ca206..15aa819 100644 --- a/celery_progress/websockets/tasks.py +++ b/celery_progress/websockets/tasks.py @@ -1,6 +1,6 @@ from celery.signals import task_postrun -from .backend import WEBSOCKETS_AVAILABLE, WebSocketProgressRecorder +from .backend import channel_layer, WebSocketProgressRecorder @task_postrun.connect @@ -8,5 +8,5 @@ def task_postrun_handler(task_id, **kwargs): """Runs after a task has finished. This will be used to push a websocket update for completed events. If the websockets version of this package is not installed, this will do nothing.""" - if WEBSOCKETS_AVAILABLE: + if channel_layer: WebSocketProgressRecorder.push_update(task_id) From 36b147b86d5100f4353c79c4704c8999798df2ab Mon Sep 17 00:00:00 2001 From: Ethan Date: Thu, 1 Oct 2020 21:27:04 -0400 Subject: [PATCH 2/7] Return task metadata with default functions for easier extendability --- celery_progress/backend.py | 35 +++++++++++++++++++---------------- 1 file changed, 19 insertions(+), 16 deletions(-) diff --git a/celery_progress/backend.py b/celery_progress/backend.py index cf923af..d749431 100644 --- a/celery_progress/backend.py +++ b/celery_progress/backend.py @@ -24,7 +24,6 @@ class ConsoleProgressRecorder(AbstractProgressRecorder): def set_progress(self, current, total, description=""): print('processed {} items of {}. {}'.format(current, total, description)) - def stop_task(self, current, total, exc): pass @@ -39,29 +38,33 @@ def set_progress(self, current, total, description=""): if total > 0: percent = (Decimal(current) / Decimal(total)) * Decimal(100) percent = float(round(percent, 2)) + meta = { + 'pending': False, + 'current': current, + 'total': total, + 'percent': percent, + 'description': description + } self.task.update_state( state=PROGRESS_STATE, - meta={ - 'pending': False, - 'current': current, - 'total': total, - 'percent': percent, - 'description': description - } + meta=meta ) + return meta def stop_task(self, current, total, exc): + meta = { + 'pending': False, + 'current': current, + 'total': total, + 'percent': 100.0, + 'exc_message': str(exc), + 'exc_type': str(type(exc)) + } self.task.update_state( state='FAILURE', - meta={ - 'pending': False, - 'current': current, - 'total': total, - 'percent': 100.0, - 'exc_message': str(exc), - 'exc_type': str(type(exc)) - } + meta=meta ) + return meta class Progress(object): From cbe3e3ecac90b3052ffb14615bef4898cbcd1a50 Mon Sep 17 00:00:00 2001 From: Ethan Date: Thu, 1 Oct 2020 21:33:56 -0400 Subject: [PATCH 3/7] Remove unnecessary `Progress(task_id).get_info()` usages, we're switching to a homegrown and more performant solution here --- celery_progress/websockets/backend.py | 18 +++++++++++------- celery_progress/websockets/tasks.py | 15 ++++++++++----- 2 files changed, 21 insertions(+), 12 deletions(-) diff --git a/celery_progress/websockets/backend.py b/celery_progress/websockets/backend.py index 92dd332..5233525 100644 --- a/celery_progress/websockets/backend.py +++ b/celery_progress/websockets/backend.py @@ -1,4 +1,4 @@ -from celery_progress.backend import ProgressRecorder, Progress +from celery_progress.backend import ProgressRecorder try: from asgiref.sync import async_to_sync @@ -19,19 +19,23 @@ class WebSocketProgressRecorder(ProgressRecorder): @staticmethod - def push_update(task_id): + def push_update(task_id, data): try: async_to_sync(channel_layer.group_send)( task_id, - {'type': 'update_task_progress', 'data': {**Progress(task_id).get_info()}} + {'type': 'update_task_progress', 'data': data} ) except AttributeError: # No channel layer to send to, so ignore it pass def set_progress(self, current, total, description=""): - super().set_progress(current, total, description) - self.push_update(self.task.request.id) + progress = super().set_progress(current, total, description) + data = {'complete': False, 'success': None, 'progress': progress} + self.push_update(self.task.request.id, data) def stop_task(self, current, total, exc): - super().stop_task(current, total, exc) - self.push_update(self.task.request.id) + progress = super().stop_task(current, total, exc) + progress.pop('exc_type') + result = progress.pop('exc_message') + data = {'complete': True, 'success': False, 'progress': progress, 'result': result} + self.push_update(self.task.request.id, data) diff --git a/celery_progress/websockets/tasks.py b/celery_progress/websockets/tasks.py index 15aa819..16cd836 100644 --- a/celery_progress/websockets/tasks.py +++ b/celery_progress/websockets/tasks.py @@ -1,12 +1,17 @@ from celery.signals import task_postrun -from .backend import channel_layer, WebSocketProgressRecorder +from .backend import WebSocketProgressRecorder -@task_postrun.connect +@task_postrun.connect(retry=True) def task_postrun_handler(task_id, **kwargs): """Runs after a task has finished. This will be used to push a websocket update for completed events. - If the websockets version of this package is not installed, this will do nothing.""" - if channel_layer: - WebSocketProgressRecorder.push_update(task_id) + If the websockets version of this package is not installed, this will fail silently.""" + data = { + 'complete': True, + 'success': kwargs.pop('state') == 'SUCCESS', + 'progress': {'pending': False, 'current': 100, 'total': 100, 'percent': 100}, + 'result': kwargs.pop('retval') + } + WebSocketProgressRecorder.push_update(task_id, data=data) From 37f15ced2709f1854fd6c9676959fa5b7a82c89d Mon Sep 17 00:00:00 2001 From: Ethan Date: Thu, 1 Oct 2020 23:02:56 -0400 Subject: [PATCH 4/7] Cast final result to string, because apparently Progress does that somewhere internally to not break the entire system --- celery_progress/websockets/tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/celery_progress/websockets/tasks.py b/celery_progress/websockets/tasks.py index 16cd836..35bdb4b 100644 --- a/celery_progress/websockets/tasks.py +++ b/celery_progress/websockets/tasks.py @@ -12,6 +12,6 @@ def task_postrun_handler(task_id, **kwargs): 'complete': True, 'success': kwargs.pop('state') == 'SUCCESS', 'progress': {'pending': False, 'current': 100, 'total': 100, 'percent': 100}, - 'result': kwargs.pop('retval') + 'result': str(kwargs.pop('retval')) } WebSocketProgressRecorder.push_update(task_id, data=data) From 99c2dcc5ca82f5a68efaa6bd994c4c4bea7aa09d Mon Sep 17 00:00:00 2001 From: Ethan Date: Fri, 2 Oct 2020 11:51:02 -0400 Subject: [PATCH 5/7] Finish implementing retry=True benefits for post-run handler --- celery_progress/websockets/backend.py | 5 ++++- celery_progress/websockets/tasks.py | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/celery_progress/websockets/backend.py b/celery_progress/websockets/backend.py index 5233525..b46242d 100644 --- a/celery_progress/websockets/backend.py +++ b/celery_progress/websockets/backend.py @@ -19,7 +19,7 @@ class WebSocketProgressRecorder(ProgressRecorder): @staticmethod - def push_update(task_id, data): + def push_update(task_id, data, final=False): try: async_to_sync(channel_layer.group_send)( task_id, @@ -27,6 +27,9 @@ def push_update(task_id, data): ) except AttributeError: # No channel layer to send to, so ignore it pass + except RuntimeError as e: # We're sending messages too fast for asgiref to handle, drop it + if final and channel_layer: # Send error back to post-run handler for a retry + raise e def set_progress(self, current, total, description=""): progress = super().set_progress(current, total, description) diff --git a/celery_progress/websockets/tasks.py b/celery_progress/websockets/tasks.py index 35bdb4b..5407886 100644 --- a/celery_progress/websockets/tasks.py +++ b/celery_progress/websockets/tasks.py @@ -14,4 +14,4 @@ def task_postrun_handler(task_id, **kwargs): 'progress': {'pending': False, 'current': 100, 'total': 100, 'percent': 100}, 'result': str(kwargs.pop('retval')) } - WebSocketProgressRecorder.push_update(task_id, data=data) + WebSocketProgressRecorder.push_update(task_id, data=data, final=True) From 58b8ec542cc6346036ea08ce79f486e9740e91b0 Mon Sep 17 00:00:00 2001 From: Ethan Date: Fri, 2 Oct 2020 21:52:24 -0400 Subject: [PATCH 6/7] Add back websocket progress bar notification as logging warning --- celery_progress/websockets/backend.py | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/celery_progress/websockets/backend.py b/celery_progress/websockets/backend.py index b46242d..1bd6ead 100644 --- a/celery_progress/websockets/backend.py +++ b/celery_progress/websockets/backend.py @@ -1,3 +1,5 @@ +import logging + from celery_progress.backend import ProgressRecorder try: @@ -8,16 +10,21 @@ else: channel_layer = get_channel_layer() -if not channel_layer: - RuntimeError( - 'Tried to use websocket progress bar, but dependencies were not installed / configured. ' - 'Use pip install celery-progress[websockets] and set up channels to enable this feature. ' - 'See: https://channels.readthedocs.io/en/latest/ for more details.' - ) +logger = logging.getLogger(__name__) class WebSocketProgressRecorder(ProgressRecorder): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + if not channel_layer: + logger.warning( + 'Tried to use websocket progress bar, but dependencies were not installed / configured. ' + 'Use pip install celery-progress[websockets] and set up channels to enable this feature. ' + 'See: https://channels.readthedocs.io/en/latest/ for more details.' + ) + @staticmethod def push_update(task_id, data, final=False): try: From 0d9efa6dc615c797d0707e1fa6c6fb440b9d1619 Mon Sep 17 00:00:00 2001 From: Ethan Date: Mon, 5 Oct 2020 01:53:07 -0400 Subject: [PATCH 7/7] Remove redundant dictionary unpacking and repacking --- celery_progress/websockets/consumers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/celery_progress/websockets/consumers.py b/celery_progress/websockets/consumers.py index 5821789..5d87259 100644 --- a/celery_progress/websockets/consumers.py +++ b/celery_progress/websockets/consumers.py @@ -30,7 +30,7 @@ async def receive(self, text_data): self.task_id, { 'type': 'update_task_progress', - 'data': {**Progress(self.task_id).get_info()} + 'data': Progress(self.task_id).get_info() } )