Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Websocket Optimization #51

Merged
merged 7 commits into from
Oct 5, 2020
35 changes: 14 additions & 21 deletions celery_progress/websockets/backend.py
Original file line number Diff line number Diff line change
@@ -1,39 +1,32 @@
import logging

from celery_progress.backend import ProgressRecorder, Progress

try:
from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
except ImportError:
async_to_sync = get_channel_layer = None
WEBSOCKETS_AVAILABLE = False
channel_layer = None
else:
WEBSOCKETS_AVAILABLE = get_channel_layer()

channel_layer = get_channel_layer()

logger = logging.getLogger(__name__)
if not channel_layer:
RuntimeError(
'Tried to use websocket progress bar, but dependencies were not installed / configured. '
'Use pip install celery-progress[websockets] and set up channels to enable this feature. '
'See: https://channels.readthedocs.io/en/latest/ for more details.'
)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This means that anything that ever imports from this file will raise an exception here if things aren't configured, yeah?

I'm not sure that's a problem, but it feels a bit less risky if the error gets raised when you actually try to use the class, not just import it. Is there a reason you didn't do it that way?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will error if the default channel layer has not been configured. I originally thought it would be best to catch the error before it happens, and only evaluate whether the channel layer exists once vs doing it every time a task is updated, but I understand if the other change would make more sense. Although, I now notice that that error will do nothing anyways, as it's not raised at all. How I managed to remove raise during testing and not put it back before committing I will never know. I will push a new commit moving it back into WebSocketProgressRecorder.

Copy link
Collaborator Author

@EJH2 EJH2 Oct 2, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you rather it stay as a logging error, or as an actual error?

As a logger error in __init__, the message would fire once for task, and the except AttributeError would silently drop all messages. As a real error in __init__, attempting to use the progress bar would just kill the task and put a traceback in the celery console.

Another interesting interaction that I've noticed is that currently, if a user has channels properly set up with the exception of having a channel layer named "default" configured, attempting to connect to the websocket via JS will throw AttributeError: 'NoneType' object has no attribute 'group_add'. This is due to the fact that, by default, the consumer will look for the "default" channel layer, and fail if it can't be found. Should we handle this as well?



class WebSocketProgressRecorder(ProgressRecorder):

@staticmethod
def push_update(task_id):
if WEBSOCKETS_AVAILABLE:
try:
channel_layer = get_channel_layer()
async_to_sync(channel_layer.group_send)(
task_id,
{'type': 'update_task_progress', 'data': {**Progress(task_id).get_info()}}
)
except AttributeError: # No channel layer to send to, so ignore it
pass
else:
logger.info(
'Tried to use websocket progress bar, but dependencies were not installed / configured. '
'Use pip install celery-progress[websockets] and setup channels to enable this feature.'
'See: https://channels.readthedocs.io/en/latest/ for more details.'
try:
async_to_sync(channel_layer.group_send)(
task_id,
{'type': 'update_task_progress', 'data': {**Progress(task_id).get_info()}}
)
except AttributeError: # No channel layer to send to, so ignore it
pass
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this path reachable anymore?

Copy link
Collaborator Author

@EJH2 EJH2 Oct 2, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the default channel layer cannot be found, and due to the lack of properly raising the error as explained above, the AttributeError exception will still be triggered by the post-run handler.

One thing I've been thinking of is that currently, the nature of get_channel_layer() just looks for a channel layer named "default", and if none can be found, returns None instead. Should we offer the ability to configure the name of this layer, so that other layers are not potentially impacted? I would investigate that as another PR if so, as to not clog this one with features.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've never actually used the websocket implementation - so it's not a hugely informed opinion - but at a high level configuring it by name sounds useful.


def set_progress(self, current, total, description=""):
super().set_progress(current, total, description)
Expand Down
4 changes: 2 additions & 2 deletions celery_progress/websockets/tasks.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
from celery.signals import task_postrun

from .backend import WEBSOCKETS_AVAILABLE, WebSocketProgressRecorder
from .backend import channel_layer, WebSocketProgressRecorder


@task_postrun.connect
def task_postrun_handler(task_id, **kwargs):
"""Runs after a task has finished. This will be used to push a websocket update for completed events.

If the websockets version of this package is not installed, this will do nothing."""
if WEBSOCKETS_AVAILABLE:
if channel_layer:
WebSocketProgressRecorder.push_update(task_id)