From 8497307a8fc82ba82f118b4154972bd7d1610128 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Mus=C3=ADlek?= Date: Tue, 5 Dec 2023 10:16:09 +0100 Subject: [PATCH] Add scheduler pre_send exception handling --- docs/examples/extending/schedule_source.py | 3 ++ taskiq/abc/schedule_source.py | 3 ++ taskiq/exceptions.py | 4 +++ taskiq/scheduler/scheduler.py | 20 ++++++++---- tests/scheduler/test_scheduler.py | 38 ++++++++++++++++++++++ 5 files changed, 62 insertions(+), 6 deletions(-) create mode 100644 tests/scheduler/test_scheduler.py diff --git a/docs/examples/extending/schedule_source.py b/docs/examples/extending/schedule_source.py index fbc7f905..0da538f1 100644 --- a/docs/examples/extending/schedule_source.py +++ b/docs/examples/extending/schedule_source.py @@ -38,6 +38,9 @@ async def pre_send(self, task: "ScheduledTask") -> None: """ Actions to execute before task will be sent to broker. + This method may raise ScheduledTaskCancelledError. + This cancels the task execution. + :param task: task that will be sent """ diff --git a/taskiq/abc/schedule_source.py b/taskiq/abc/schedule_source.py index 1341d460..10a3b181 100644 --- a/taskiq/abc/schedule_source.py +++ b/taskiq/abc/schedule_source.py @@ -60,6 +60,9 @@ def pre_send( # noqa: B027 """ Actions to execute before task will be sent to broker. + This method may raise ScheduledTaskCancelledError. + This cancels the task execution. + :param task: task that will be sent """ diff --git a/taskiq/exceptions.py b/taskiq/exceptions.py index a4eec6ab..be5e4cc9 100644 --- a/taskiq/exceptions.py +++ b/taskiq/exceptions.py @@ -40,3 +40,7 @@ class NoResultError(TaskiqError): class TaskRejectedError(TaskiqError): """Task was rejected.""" + + +class ScheduledTaskCancelledError(TaskiqError): + """Scheduled task was cancelled and not sent to the queue.""" diff --git a/taskiq/scheduler/scheduler.py b/taskiq/scheduler/scheduler.py index 7de51c29..d8993790 100644 --- a/taskiq/scheduler/scheduler.py +++ b/taskiq/scheduler/scheduler.py @@ -1,5 +1,7 @@ +from logging import getLogger from typing import TYPE_CHECKING, List +from taskiq.exceptions import ScheduledTaskCancelledError from taskiq.kicker import AsyncKicker from taskiq.scheduler.scheduled_task import ScheduledTask from taskiq.utils import maybe_awaitable @@ -8,6 +10,8 @@ from taskiq.abc.broker import AsyncBroker from taskiq.abc.schedule_source import ScheduleSource +logger = getLogger(__name__) + class TaskiqScheduler: """Scheduler class.""" @@ -36,12 +40,16 @@ async def on_ready(self, source: "ScheduleSource", task: ScheduledTask) -> None: It's triggered on proper time depending on `task.cron` or `task.time` attribute. :param task: task to send """ - await maybe_awaitable(source.pre_send(task)) - await AsyncKicker(task.task_name, self.broker, task.labels).kiq( - *task.args, - **task.kwargs, - ) - await maybe_awaitable(source.post_send(task)) + try: + await maybe_awaitable(source.pre_send(task)) + except ScheduledTaskCancelledError: + logger.info("Scheduled task %s has been cancelled.", task.task_name) + else: + await AsyncKicker(task.task_name, self.broker, task.labels).kiq( + *task.args, + **task.kwargs, + ) + await maybe_awaitable(source.post_send(task)) async def shutdown(self) -> None: """Shutdown the scheduler process.""" diff --git a/tests/scheduler/test_scheduler.py b/tests/scheduler/test_scheduler.py new file mode 100644 index 00000000..1b4b6375 --- /dev/null +++ b/tests/scheduler/test_scheduler.py @@ -0,0 +1,38 @@ +from typing import Any, Coroutine, List, Union + +import pytest + +from taskiq.abc.schedule_source import ScheduleSource +from taskiq.brokers.inmemory_broker import InMemoryBroker +from taskiq.exceptions import ScheduledTaskCancelledError +from taskiq.scheduler.scheduled_task import ScheduledTask +from taskiq.scheduler.scheduler import TaskiqScheduler + + +class CancellingScheduleSource(ScheduleSource): + async def get_schedules(self) -> List["ScheduledTask"]: + """Return schedules list.""" + return [] + + def pre_send( + self, + task: "ScheduledTask", + ) -> Union[None, Coroutine[Any, Any, None]]: + """Raise cancelled error.""" + raise ScheduledTaskCancelledError + + +@pytest.mark.anyio +async def test_scheduled_task_cancelled() -> None: + broker = InMemoryBroker() + source = CancellingScheduleSource() + scheduler = TaskiqScheduler(broker=broker, sources=[source]) + task = ScheduledTask( + task_name="ping:pong", + labels={}, + args=[], + kwargs={}, + cron="* * * * *", + ) + + await scheduler.on_ready(source, task) # error is caught