add support for abort notification
onlyann committed Sep 6, 2024
1 parent ed4d625 commit 1511f31
Showing 33 changed files with 565 additions and 242 deletions.
11 changes: 9 additions & 2 deletions docs/discussions.md
Original file line number Diff line number Diff line change
Expand Up @@ -201,14 +201,21 @@ many factors to take into account when [sizing your pool](https://wiki.postgresq

### How the `polling_interval` works

Even when the database doesn't notify workers regarding newly deferred jobs, idle
workers still poll the database every now and then, just in case.
Even when the database doesn't notify workers regarding newly deferred jobs, each worker still polls the database every now and then, just in case.
There could be previously locked jobs that are now free, or scheduled jobs that have
reached the ETA. `polling_interval` is the {py:meth}`App.run_worker` parameter (or the
equivalent CLI flag) that sizes this "every now and then".

A worker will keep fetching new jobs as long as it has capacity to process them.
The polling interval starts from the moment the last attempt to fetch a new job yields no result.

The `polling_interval` also defines how often the worker will poll the database for jobs to abort.
When `listen_notify=True`, the worker will likely be notified "instantly" of each abort request prior to polling the database.

However, in the event `listen_notify=False` or if the abort notification was missed, `polling_interval` will represent the maximum delay before the worker reacts to an abort request.

Note that the worker will not poll the database for jobs to be aborted if it is idle (i.e. it has no running job).

:::{note}
The polling interval was previously called `timeout` in pre-v3 versions of Procrastinate. It was renamed to `polling_interval` for clarity.
:::
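
The interplay between notifications and `polling_interval` described above can be illustrated with a small standard-library sketch. It is not Procrastinate's implementation: `wait_for_wakeup` and `demo` are hypothetical names, and an `asyncio.Event` stands in for a Postgres notification. The worker wakes up as soon as it is notified, and otherwise falls back to polling once the interval has elapsed.

```python
from __future__ import annotations

import asyncio


async def wait_for_wakeup(notified: asyncio.Event, polling_interval: float) -> str:
    """Return what woke the worker up: a notification or the polling timeout."""
    try:
        await asyncio.wait_for(notified.wait(), timeout=polling_interval)
    except asyncio.TimeoutError:
        # Nothing was notified in time: fall back to polling the database.
        return "poll"
    notified.clear()
    return "notification"


async def demo() -> list[str]:
    notified = asyncio.Event()  # hypothetical stand-in for a Postgres NOTIFY
    reasons = []
    # No notification arrives, so the worker waits for the whole interval.
    reasons.append(await wait_for_wakeup(notified, polling_interval=0.05))
    # A notification is already pending, so the worker reacts without waiting.
    notified.set()
    reasons.append(await wait_for_wakeup(notified, polling_interval=5.0))
    return reasons


WAKEUP_REASONS = asyncio.run(demo())
```

In this model, `polling_interval` is only an upper bound on the reaction delay, which is why a higher value is reasonable when notifications are enabled.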
29 changes: 7 additions & 22 deletions docs/howto/advanced/cancellation.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,10 @@ app.job_manager.cancel_job_by_id(33, delete_job=True)
await app.job_manager.cancel_job_by_id_async(33, delete_job=True)
```

## Mark a currently being processed job for abortion
## Mark a running job for abortion

If a worker has not picked up the job yet, the below command behaves like the
command without the `abort` option. But if a job is already in the middle of
being processed, the `abort` option marks this job for abortion (see below
command without the `abort` option. But if a job is already running, the `abort` option marks this job for abortion (see below
how to handle this request).

```python
Expand All @@ -38,10 +37,10 @@ app.job_manager.cancel_job_by_id(33, abort=True)
await app.job_manager.cancel_job_by_id_async(33, abort=True)
```

## Handle a abortion request inside the task
## Handle an abortion request inside the task

In our task, we can check (for example, periodically) if the task should be
aborted. If we want to respect that request (we don't have to), we raise a
aborted. If we want to respect that abortion request (we don't have to), we raise a
`JobAborted` error. Any message passed to `JobAborted` (e.g.
`raise JobAborted("custom message")`) will end up in the logs.

Expand All @@ -54,24 +53,10 @@ def my_task(context):
do_something_expensive()
```

There is also an async API
Behind the scenes, the worker receives a Postgres notification every time a job is requested to abort (unless `listen_notify=False`).

```python
@app.task(pass_context=True)
async def my_task(context):
for i in range(100):
if await context.should_abort_async():
raise exceptions.JobAborted
do_something_expensive()
```

:::{warning}
`context.should_abort()` and `context.should_abort_async()` does poll the
database and might flood the database. Ensure you do it only sometimes and
not from too many parallel tasks.
:::
The worker also polls the database for abortion requests (respecting `polling_interval`), as long as it is running at least one job (when no job is running, there is nothing to abort).

:::{note}
When a task of a job that was requested to be aborted raises an error, the job
is marked as failed (regardless of the retry strategy).
When a job is requested to abort and that job fails, it will not be retried (regardless of the retry strategy).
:::
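
Since this commit turns `should_abort` into a plain callable on the context and removes `should_abort_async`, sync and async tasks would both call `context.should_abort()` without awaiting it. The sketch below shows that pattern outside of Procrastinate: `FakeContext`, the local `JobAborted` class and `abort_on_fourth_check` are hypothetical stand-ins used only to make the example self-contained.

```python
from dataclasses import dataclass
from typing import Callable


class JobAborted(Exception):
    """Local stand-in for procrastinate.exceptions.JobAborted."""


@dataclass
class FakeContext:
    """Hypothetical stand-in exposing only the should_abort callable."""

    should_abort: Callable[[], bool]


def my_task(context: FakeContext, steps: int = 100) -> int:
    done = 0
    for _ in range(steps):
        # Check for an abort request before each unit of work.
        if context.should_abort():
            raise JobAborted(f"aborted after {done} steps")
        done += 1  # placeholder for do_something_expensive()
    return done


checks = {"count": 0}


def abort_on_fourth_check() -> bool:
    """Pretend an abort request arrives before the fourth check."""
    checks["count"] += 1
    return checks["count"] > 3


try:
    my_task(FakeContext(should_abort=abort_on_fourth_check))
    ABORT_MESSAGE = None
except JobAborted as exc:
    ABORT_MESSAGE = str(exc)
```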
31 changes: 21 additions & 10 deletions procrastinate/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,22 +270,33 @@ async def run_worker_async(self, **kwargs: Unpack[WorkerOptions]) -> None:
Name of the worker. Will be passed in the `JobContext` and used in the
logs (defaults to ``None`` which will result in the worker named
``worker``).
polling_interval: ``float``
Indicates the maximum duration (in seconds) the worker waits between
each database job poll. Raising this parameter can lower the rate at which
the worker makes queries to the database for requesting jobs.
polling_interval : ``float``
Maximum time (in seconds) between database job polls.
Controls the frequency of database queries for:
- Checking for new jobs to start
- Fetching updates for running jobs
- Checking for abort requests
When `listen_notify` is True, the polling interval acts as a fallback
mechanism and can reasonably be set to a higher value.
(defaults to 5.0)
shutdown_timeout: ``float``
Indicates the maximum duration (in seconds) the worker waits for jobs to
complete when requested stop. Jobs that have not been completed by that time
are aborted. A value of None corresponds to no timeout.
(defaults to None)
listen_notify: ``bool``
If ``True``, the worker will dedicate a connection from the pool to
listening to database events, notifying of newly available jobs.
If ``False``, the worker will just poll the database periodically
(see ``polling_interval``). (defaults to ``True``)
delete_jobs: ``str``
listen_notify : ``bool``
If ``True``, allocates a connection from the pool to
listen for:
- new job availability
- job abort requests
Provides lower latency for job updates compared to polling alone.
Note: Worker polls the database regardless of this setting. (defaults to ``True``)
delete_jobs : ``str``
If ``always``, the worker will automatically delete all jobs on completion.
If ``successful`` the worker will only delete successful jobs.
If ``never``, the worker will keep the jobs in the database.
13 changes: 9 additions & 4 deletions procrastinate/connector.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from __future__ import annotations

import asyncio
from typing import Any, Callable, Iterable
from typing import Any, Awaitable, Callable, Iterable, Protocol

from typing_extensions import LiteralString

Expand All @@ -13,6 +12,10 @@
LISTEN_TIMEOUT = 30.0


class Notify(Protocol):
def __call__(self, *, channel: str, payload: str) -> Awaitable[None]: ...


class BaseConnector:
json_dumps: Callable | None = None
json_loads: Callable | None = None
Expand Down Expand Up @@ -59,7 +62,9 @@ async def execute_query_all_async(
raise exceptions.SyncConnectorConfigurationError

async def listen_notify(
self, event: asyncio.Event, channels: Iterable[str]
self,
on_notification: Notify,
channels: Iterable[str],
) -> None:
raise exceptions.SyncConnectorConfigurationError

Expand Down Expand Up @@ -98,6 +103,6 @@ def execute_query_all(
return utils.async_to_sync(self.execute_query_all_async, query, **arguments)

async def listen_notify(
self, event: asyncio.Event, channels: Iterable[str]
self, on_notification: Notify, channels: Iterable[str]
) -> None:
raise NotImplementedError
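
To illustrate the new `listen_notify` signature, here is a minimal sketch of a callback that satisfies the `Notify` protocol. The protocol is copied from the diff above; `record_notification`, `fake_listen`, the channel name and the payload are hypothetical and only serve the example.

```python
import asyncio
from typing import Awaitable, Iterable, List, Protocol, Tuple


class Notify(Protocol):
    def __call__(self, *, channel: str, payload: str) -> Awaitable[None]: ...


received: List[Tuple[str, str]] = []


async def record_notification(*, channel: str, payload: str) -> None:
    """Keyword-only async callback, structurally compatible with Notify."""
    received.append((channel, payload))


async def fake_listen(
    on_notification: Notify, messages: Iterable[Tuple[str, str]]
) -> None:
    """Hypothetical stand-in for a connector: forwards each message to the callback."""
    for channel, payload in messages:
        await on_notification(channel=channel, payload=payload)


asyncio.run(
    fake_listen(
        record_notification,
        [("example_channel", '{"type": "job_inserted", "job_id": 1}')],
    )
)
```

Compared with the previous `asyncio.Event` parameter, a callback lets the connector hand over the channel and payload of each notification rather than only signalling that something happened.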
19 changes: 11 additions & 8 deletions procrastinate/contrib/aiopg/aiopg_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ def _make_dynamic_query(self, query: str, **identifiers: str) -> Any:

@wrap_exceptions()
async def listen_notify(
self, event: asyncio.Event, channels: Iterable[str]
self, on_notification: connector.Notify, channels: Iterable[str]
) -> None:
# We need to acquire a dedicated connection, and use the listen
# query
Expand All @@ -304,14 +304,14 @@ async def listen_notify(
query=sql.queries["listen_queue"], channel_name=channel_name
),
)
# Initial set() lets caller know that we're ready to listen
event.set()
await self._loop_notify(event=event, connection=connection)
await self._loop_notify(
on_notification=on_notification, connection=connection
)

@wrap_exceptions()
async def _loop_notify(
self,
event: asyncio.Event,
on_notification: connector.Notify,
connection: aiopg.Connection,
timeout: float = connector.LISTEN_TIMEOUT,
) -> None:
Expand All @@ -324,12 +324,15 @@ async def _loop_notify(
if connection.closed:
return
try:
await asyncio.wait_for(connection.notifies.get(), timeout)
notification = await asyncio.wait_for(
connection.notifies.get(), timeout
)
await on_notification(
channel=notification.channel, payload=notification.payload
)
except asyncio.TimeoutError:
continue
except psycopg2.Error:
# aiopg>=1.3.1 will raise if the connection is closed while
# we wait
continue

event.set()
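
The shape of the updated `_loop_notify` can be tried out without a database. In this sketch, an `asyncio.Queue` plays the role of `connection.notifies`; `FakeConnection`, `FakeNotification` and `demo` are hypothetical stand-ins, not aiopg types.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class FakeNotification:
    channel: str
    payload: str


@dataclass
class FakeConnection:
    """Hypothetical stand-in for an aiopg connection."""

    notifies: asyncio.Queue
    closed: bool = False


async def loop_notify(on_notification, connection, timeout: float) -> None:
    while True:
        if connection.closed:
            return
        try:
            # Wait for the next notification, but wake up regularly so that
            # a closed connection is noticed.
            notification = await asyncio.wait_for(connection.notifies.get(), timeout)
            await on_notification(
                channel=notification.channel, payload=notification.payload
            )
        except asyncio.TimeoutError:
            continue


async def demo():
    seen = []
    connection = FakeConnection(notifies=asyncio.Queue())

    async def on_notification(*, channel: str, payload: str) -> None:
        seen.append((channel, payload))
        connection.closed = True  # stop the loop after the first notification

    await connection.notifies.put(FakeNotification("example_channel", "hello"))
    await loop_notify(on_notification, connection, timeout=0.05)
    return seen


SEEN = asyncio.run(demo())
```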
3 changes: 1 addition & 2 deletions procrastinate/contrib/django/django_connector.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from __future__ import annotations

import asyncio
import contextlib
from typing import (
TYPE_CHECKING,
Expand Down Expand Up @@ -141,7 +140,7 @@ def execute_query_all(
return list(self._dictfetch(cursor))

async def listen_notify(
self, event: asyncio.Event, channels: Iterable[str]
self, on_notification: connector.Notify, channels: Iterable[str]
) -> None:
raise NotImplementedError(
"listen/notify is not supported with Django connector"
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from __future__ import annotations

from django.db import migrations

from .. import migrations_utils


class Migration(migrations.Migration):
operations = [
migrations_utils.RunProcrastinateSQL(
name="03.00.00_01_cancel_notification.sql"
),
]
name = "0032_cancel_notification"
dependencies = [("procrastinate", "0031_add_abort_on_procrastinate_jobs")]
1 change: 1 addition & 0 deletions procrastinate/contrib/django/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ def procrastinate_job(self) -> jobs.Job:
status=self.status,
scheduled_at=self.scheduled_at,
attempts=self.attempts,
abort_requested=self.abort_requested,
queueing_lock=self.queueing_lock,
)

14 changes: 3 additions & 11 deletions procrastinate/job_context.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import annotations

import time
from typing import Any, Iterable
from typing import Any, Callable, Iterable

import attr

Expand Down Expand Up @@ -54,6 +54,8 @@ class JobContext:
additional_context: dict = attr.ib(factory=dict)
task_result: Any = None

should_abort: Callable[[], bool]

def evolve(self, **update: Any) -> JobContext:
return attr.evolve(self, **update)

Expand All @@ -68,13 +70,3 @@ def job_description(self, current_timestamp: float) -> str:
message += f" (started {duration:.3f} s ago)"

return message

def should_abort(self) -> bool:
assert self.job.id
job_id = self.job.id
return self.app.job_manager.get_job_abort_requested(job_id)

async def should_abort_async(self) -> bool:
assert self.job.id
job_id = self.job.id
return await self.app.job_manager.get_job_abort_requested_async(job_id)
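
With `should_abort` now a `Callable[[], bool]` attribute, whoever builds the context decides how the answer is computed. The part of the commit shown here does not include the worker side, so the following is only one possible arrangement, with hypothetical names (`AbortRegistry`, `should_abort_for`): abort requests are recorded in memory and each job receives a callable bound to its own id, so checking for an abort does not query the database.

```python
from __future__ import annotations

import functools
from typing import Callable


class AbortRegistry:
    """Hypothetical in-memory record of the jobs that were asked to abort."""

    def __init__(self) -> None:
        self._requested: set[int] = set()

    def request_abort(self, job_id: int) -> None:
        self._requested.add(job_id)

    def is_requested(self, job_id: int) -> bool:
        return job_id in self._requested

    def should_abort_for(self, job_id: int) -> Callable[[], bool]:
        """Build the zero-argument callable to store on a job context."""
        return functools.partial(self.is_requested, job_id)


registry = AbortRegistry()
should_abort_33 = registry.should_abort_for(33)

BEFORE = should_abort_33()  # no request recorded yet
registry.request_abort(33)  # e.g. after a notification or a poll
AFTER = should_abort_33()
OTHER = registry.should_abort_for(34)()  # unrelated job is unaffected
```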
20 changes: 19 additions & 1 deletion procrastinate/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@
import functools
import logging
from enum import Enum
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, TypedDict, Union

import attr
from typing_extensions import Literal

from procrastinate import types

Expand All @@ -22,6 +23,19 @@
cached_property = getattr(functools, "cached_property", property)


class JobInserted(TypedDict):
type: Literal["job_inserted"]
job_id: int


class AbortJobRequested(TypedDict):
type: Literal["abort_job_requested"]
job_id: int


Notification = Union[JobInserted, AbortJobRequested]


def check_aware(
instance: Job, attribute: attr.Attribute, value: datetime.datetime
) -> None:
Expand Down Expand Up @@ -82,6 +96,9 @@ class Job:
#: Number of times the job has been tried.
attempts: int = 0

# True if the job is requested to abort
abort_requested: bool = False

@classmethod
def from_row(cls, row: dict[str, Any]) -> Job:
return cls(
Expand All @@ -95,6 +112,7 @@ def from_row(cls, row: dict[str, Any]) -> Job:
scheduled_at=row["scheduled_at"],
queue=row["queue_name"],
attempts=row["attempts"],
abort_requested=row.get("abort_requested", False),
)

def asdict(self) -> dict[str, Any]:
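
The `Notification` union added above lends itself to dispatching on the `type` key. The sketch below assumes the notification payload is a JSON string with exactly these fields, which this part of the diff does not show; `dispatch` and its return values are hypothetical. `Literal` is imported from `typing` here, whereas the commit uses `typing_extensions`.

```python
from __future__ import annotations

import json
from typing import Literal, TypedDict, Union


class JobInserted(TypedDict):
    type: Literal["job_inserted"]
    job_id: int


class AbortJobRequested(TypedDict):
    type: Literal["abort_job_requested"]
    job_id: int


Notification = Union[JobInserted, AbortJobRequested]


def dispatch(
    payload: str, running_job_ids: set[int], job_ids_to_abort: set[int]
) -> str:
    """Decode a payload (assumed to be JSON) and decide what to do with it."""
    notification: Notification = json.loads(payload)
    if notification["type"] == "job_inserted":
        return "fetch"
    if notification["type"] == "abort_job_requested":
        # Only jobs currently running on this worker can be aborted by it.
        if notification["job_id"] in running_job_ids:
            job_ids_to_abort.add(notification["job_id"])
            return "abort"
    return "ignore"


to_abort: set[int] = set()
RESULTS = [
    dispatch('{"type": "job_inserted", "job_id": 1}', {33}, to_abort),
    dispatch('{"type": "abort_job_requested", "job_id": 33}', {33}, to_abort),
    dispatch('{"type": "abort_job_requested", "job_id": 99}', {33}, to_abort),
]
```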