Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add scheduler pre_send exception handling #254

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docs/examples/extending/schedule_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ async def pre_send(self, task: "ScheduledTask") -> None:
"""
Actions to execute before task will be sent to broker.

This method may raise ScheduledTaskCancelledError.
This cancels the task execution.

:param task: task that will be sent
"""

Expand Down
3 changes: 3 additions & 0 deletions taskiq/abc/schedule_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ def pre_send( # noqa: B027
"""
Actions to execute before task will be sent to broker.

This method may raise ScheduledTaskCancelledError.
This cancels the task execution.

:param task: task that will be sent
"""

Expand Down
4 changes: 4 additions & 0 deletions taskiq/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,7 @@ class NoResultError(TaskiqError):

class TaskRejectedError(TaskiqError):
"""Task was rejected."""


class ScheduledTaskCancelledError(TaskiqError):
"""Scheduled task was cancelled and not sent to the queue."""
20 changes: 14 additions & 6 deletions taskiq/scheduler/scheduler.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from logging import getLogger
from typing import TYPE_CHECKING, List

from taskiq.exceptions import ScheduledTaskCancelledError
from taskiq.kicker import AsyncKicker
from taskiq.scheduler.scheduled_task import ScheduledTask
from taskiq.utils import maybe_awaitable
Expand All @@ -8,6 +10,8 @@
from taskiq.abc.broker import AsyncBroker
from taskiq.abc.schedule_source import ScheduleSource

logger = getLogger(__name__)


class TaskiqScheduler:
"""Scheduler class."""
Expand Down Expand Up @@ -36,12 +40,16 @@ async def on_ready(self, source: "ScheduleSource", task: ScheduledTask) -> None:
It's triggered on proper time depending on `task.cron` or `task.time` attribute.
:param task: task to send
"""
await maybe_awaitable(source.pre_send(task))
await AsyncKicker(task.task_name, self.broker, task.labels).kiq(
*task.args,
**task.kwargs,
)
await maybe_awaitable(source.post_send(task))
try:
await maybe_awaitable(source.pre_send(task))
except ScheduledTaskCancelledError:
logger.info("Scheduled task %s has been cancelled.", task.task_name)
else:
await AsyncKicker(task.task_name, self.broker, task.labels).kiq(
*task.args,
**task.kwargs,
)
await maybe_awaitable(source.post_send(task))

async def shutdown(self) -> None:
"""Shutdown the scheduler process."""
Expand Down
38 changes: 38 additions & 0 deletions tests/scheduler/test_scheduler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from typing import Any, Coroutine, List, Union

import pytest

from taskiq.abc.schedule_source import ScheduleSource
from taskiq.brokers.inmemory_broker import InMemoryBroker
from taskiq.exceptions import ScheduledTaskCancelledError
from taskiq.scheduler.scheduled_task import ScheduledTask
from taskiq.scheduler.scheduler import TaskiqScheduler


class CancellingScheduleSource(ScheduleSource):
async def get_schedules(self) -> List["ScheduledTask"]:
"""Return schedules list."""
return []

def pre_send(
self,
task: "ScheduledTask",
) -> Union[None, Coroutine[Any, Any, None]]:
"""Raise cancelled error."""
raise ScheduledTaskCancelledError


@pytest.mark.anyio
async def test_scheduled_task_cancelled() -> None:
broker = InMemoryBroker()
source = CancellingScheduleSource()
scheduler = TaskiqScheduler(broker=broker, sources=[source])
task = ScheduledTask(
task_name="ping:pong",
labels={},
args=[],
kwargs={},
cron="* * * * *",
)

await scheduler.on_ready(source, task) # error is caught
Loading