30 changes: 30 additions & 0 deletions CHANGELOG.md
@@ -1,6 +1,36 @@
Changelog
=========

0.25.0
------

🚨 This release requires that you run migrations, as it adds a new queue
setting, `eager_polling`. Call `chancy.migrate()` or use the CLI:

```bash
chancy --app <your app> misc migrate
```
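
You can also run migrations programmatically; a minimal sketch, assuming
`chancy` is your configured application and the DSN is a placeholder:

```python
import asyncio

from chancy import Chancy


async def main() -> None:
    # Placeholder DSN; point this at your own database.
    async with Chancy("postgresql://localhost/chancy") as chancy:
        # Applies all pending migrations, including the new
        # eager_polling queue column from this release.
        await chancy.migrate()


asyncio.run(main())
```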

✨ Improvements

- `Chancy.wait_for_job()` now accepts a set of states to wait for, so you can
use it to wait for a job to begin running, be retried, and so on, rather than
only waiting for it to finish (see the sketch after this list).
- `Reference()` objects are now hashable and support equality comparisons.
- `Chancy.wait_for_jobs()` added to wait for multiple jobs to finish.
- `Chancy.get_jobs()` added to fetch multiple jobs by ID in a single query.
- Added the `eager_polling` option to `Queue` to trigger immediate polling
whenever an executor slot becomes free. This is useful for low-latency
processing of jobs with low concurrency, but may increase database load (#48).
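
A quick sketch of the new APIs; the `RUNNING` state name is assumed from the
first bullet above, while everything else comes straight from this release:

```python
from chancy import Chancy
from chancy.job import QueuedJob, Reference


async def wait_examples(chancy: Chancy, refs: list[Reference]) -> None:
    # Wait for the first job to *start* running, not just to finish.
    started = await chancy.wait_for_job(
        refs[0], states={QueuedJob.State.RUNNING}
    )

    # Wait up to 60 seconds for the whole batch to succeed or fail.
    done = await chancy.wait_for_jobs(refs, timeout=60)

    # Or fetch the current state of every job in a single query.
    jobs = await chancy.get_jobs(refs)
```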

🐛 Fixes

- Fixed a bug where a job with a `unique_key` could run multiple times if
pushed again after the job had started running (#51), reported by @mrsshr.
- Removed the unused plugin hook `on_worker_started`, which is available via
the `worker.started` event anyway (#47) by @PaulM5406; see the sketch below.
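
If you were using the removed hook, you can listen for the event instead. A
rough sketch; the `worker.hub.on()` subscription call and the handler
signature are assumptions, not APIs confirmed by this PR:

```python
from chancy import Worker


async def on_started(*args, **kwargs):
    # Hypothetical handler: fired when the worker emits the
    # "worker.started" event mentioned in the changelog.
    print("worker started")


def register(worker: Worker) -> None:
    # Hypothetical subscription API; check the chancy docs.
    worker.hub.on("worker.started", on_started)
```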


0.24.3
------

108 changes: 99 additions & 9 deletions chancy/app.py
@@ -454,6 +454,7 @@ async def declare_ex(
"rate_limit": queue.rate_limit,
"rate_limit_window": queue.rate_limit_window,
"resume_at": queue.resume_at,
"eager_polling": queue.eager_polling,
},
)

@@ -494,6 +495,7 @@ def sync_declare_ex(
"rate_limit": queue.rate_limit,
"rate_limit_window": queue.rate_limit_window,
"resume_at": queue.resume_at,
"eager_polling": queue.eager_polling,
},
)

@@ -739,6 +741,31 @@ async def get_job(self, ref: Reference) -> QueuedJob | None:
return None
return QueuedJob.unpack(record)

@_ensure_pool_is_open
async def get_jobs(self, refs: list[Reference]) -> list[QueuedJob]:
"""
Resolve multiple references to job instances.

If a job no longer exists, it will be skipped and not included in
the returned list.

:param refs: The references to the jobs to retrieve.
:return: A list of the resolved jobs.
"""
if not refs:
return []

async with self.pool.connection() as conn:
async with conn.cursor(row_factory=dict_row) as cursor:
await cursor.execute(
self._get_jobs_sql(), [[ref.identifier for ref in refs]]
)
return [
QueuedJob.unpack(record)
async for record in cursor
if record is not None
]

@_ensure_sync_pool_is_open
def sync_get_job(self, ref: Reference) -> QueuedJob | None:
"""
@@ -762,6 +789,7 @@ async def wait_for_job(
*,
interval: int = 1,
timeout: float | int | None = None,
states: set[QueuedJob.State] | None = None,
) -> QueuedJob | None:
"""
Wait for a job to complete.
@@ -777,17 +805,63 @@
:param interval: The number of seconds to wait between checks.
:param timeout: The maximum number of seconds to wait for the job to
complete. If not provided, the method will wait indefinitely.
:param states: The set of states to consider as "complete". If the
job enters any of these states, it will be considered complete and
the method will return. Defaults to SUCCEEDED and FAILED; passing
a set replaces this default rather than extending it.
"""
states = states or {QueuedJob.State.SUCCEEDED, QueuedJob.State.FAILED}
async with asyncio.timeout(timeout):
while True:
job = await self.get_job(ref)
if job is None or job.state in {
QueuedJob.State.SUCCEEDED,
QueuedJob.State.FAILED,
}:
if job is None or job.state in states:
return job
await asyncio.sleep(interval)

async def wait_for_jobs(
self,
refs: list[Reference],
*,
interval: int = 1,
timeout: float | int | None = None,
states: set[QueuedJob.State] | None = None,
) -> list[QueuedJob]:
"""
Wait for multiple jobs to complete.

This method will loop until all jobs referenced by the provided
references have completed. The interval parameter controls how often
the job status is checked. This will not block the event loop, so
other tasks can run while waiting for the jobs to complete.

If a job no longer exists, it will be skipped and not included in
the returned list.

:param refs: The references to the jobs to wait for.
:param interval: The number of seconds to wait between checks.
:param timeout: The maximum number of seconds to wait for the jobs to
complete. If not provided, the method will wait indefinitely.
:param states: The set of states to consider as "complete". If a
job enters any of these states, it is considered complete and
removed from the set of jobs to wait for. Defaults to SUCCEEDED
and FAILED; passing a set replaces this default rather than
extending it.
:return: A list of the completed jobs. Jobs that no longer exist will
not be included in the list.
"""
states = states or {QueuedJob.State.SUCCEEDED, QueuedJob.State.FAILED}
async with asyncio.timeout(timeout):
pending = set(refs)
completed = []
while pending:
jobs = await self.get_jobs(list(pending))
# Drop refs that no longer resolve to a job, so a vanished
# job can't stall the loop until the timeout fires.
pending &= {Reference(job.id) for job in jobs}
for job in jobs:
if job.state in states:
completed.append(job)
pending.remove(Reference(job.id))
if pending:
await asyncio.sleep(interval)
return completed

@_ensure_pool_is_open
async def get_all_queues(self) -> list[Queue]:
"""
@@ -1075,7 +1149,7 @@ def _push_job_sql(self):
AND state NOT IN ('succeeded', 'failed')
DO UPDATE
SET
state = EXCLUDED.state
state = {jobs}.state
RETURNING id;
"""
).format(jobs=sql.Identifier(f"{self.prefix}jobs"))
@@ -1092,6 +1166,18 @@ def _get_job_sql(self):
"""
).format(jobs=sql.Identifier(f"{self.prefix}jobs"))

def _get_jobs_sql(self):
return sql.SQL(
"""
SELECT
*
FROM
{jobs}
WHERE
id = ANY(%s)
"""
).format(jobs=sql.Identifier(f"{self.prefix}jobs"))

def _declare_sql(self, upsert: bool):
action = sql.SQL(
"""
@@ -1111,7 +1197,8 @@ def _declare_sql(self, upsert: bool):
polling_interval = EXCLUDED.polling_interval,
rate_limit = EXCLUDED.rate_limit,
rate_limit_window = EXCLUDED.rate_limit_window,
resume_at = EXCLUDED.resume_at
resume_at = EXCLUDED.resume_at,
eager_polling = EXCLUDED.eager_polling
"""
)

@@ -1127,7 +1214,8 @@ def _declare_sql(self, upsert: bool):
polling_interval,
rate_limit,
rate_limit_window,
resume_at
resume_at,
eager_polling
) VALUES (
%(name)s,
%(state)s,
@@ -1138,7 +1226,8 @@ def _declare_sql(self, upsert: bool):
%(polling_interval)s,
%(rate_limit)s,
%(rate_limit_window)s,
%(resume_at)s
%(resume_at)s,
%(eager_polling)s
)
ON CONFLICT (name) DO
{action}
@@ -1151,7 +1240,8 @@ def _declare_sql(self, upsert: bool):
executor_options,
rate_limit,
rate_limit_window,
resume_at
resume_at,
eager_polling
"""
).format(
queues=sql.Identifier(f"{self.prefix}queues"),
1 change: 1 addition & 0 deletions chancy/executors/base.py
@@ -126,6 +126,7 @@ async def on_job_completed(
continue

await self.worker.queue_update(new_instance)
await self.worker.on_job_completed(queue=self.queue, job=new_instance)

@staticmethod
def get_function_and_kwargs(job: QueuedJob) -> tuple[Callable, dict]:
22 changes: 20 additions & 2 deletions chancy/job.py
@@ -9,7 +9,9 @@
ParamSpec,
Callable,
Protocol,
Union,
)
from uuid import UUID

from chancy.utils import importable_name

@@ -51,12 +53,28 @@ class Reference:

__slots__ = ("identifier",)

def __init__(self, identifier: str):
self.identifier = identifier
def __init__(self, identifier: str | UUID):
if isinstance(identifier, UUID):
self.identifier = identifier
else:
self.identifier = UUID(identifier)

def __repr__(self):
return f"<Reference({self.identifier!r})>"

def __eq__(self, other: Union[str, UUID, "Reference"]) -> bool:
if isinstance(other, Reference):
return self.identifier == other.identifier
elif isinstance(other, str):
return str(self.identifier) == other
elif isinstance(other, UUID):
return self.identifier == other
else:
return super().__eq__(other)

def __hash__(self) -> int:
return hash(self.identifier)


@dataclasses.dataclass
class Limit:
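Since `Reference` now normalizes its identifier to a `UUID` and implements
`__eq__`/`__hash__`, comparisons work across representations. A small sketch
(the UUID value is arbitrary):

```python
from uuid import UUID

from chancy.job import Reference

ref = Reference("00000000-0000-0000-0000-000000000001")

# Equality works against other references, strings, and UUIDs.
assert ref == Reference(UUID("00000000-0000-0000-0000-000000000001"))
assert ref == "00000000-0000-0000-0000-000000000001"
assert ref == UUID("00000000-0000-0000-0000-000000000001")

# Hashability means references can be used in sets and as dict keys.
assert Reference("00000000-0000-0000-0000-000000000001") in {ref}
```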
29 changes: 29 additions & 0 deletions chancy/migrations/v6.py
@@ -0,0 +1,29 @@
from psycopg import sql

from chancy.migrate import Migration


class QueueEagerPolling(Migration):
"""
Adds the eager polling field to queues.
"""

async def up(self, migrator, cursor):
await cursor.execute(
sql.SQL(
"""
ALTER TABLE {queues}
ADD COLUMN IF NOT EXISTS eager_polling BOOLEAN NOT NULL DEFAULT FALSE
"""
).format(queues=sql.Identifier(f"{migrator.prefix}queues"))
)

async def down(self, migrator, cursor):
await cursor.execute(
sql.SQL(
"""
ALTER TABLE {queues}
DROP COLUMN IF EXISTS eager_polling
"""
).format(queues=sql.Identifier(f"{migrator.prefix}queues"))
)
5 changes: 0 additions & 5 deletions chancy/plugin.py
@@ -130,11 +130,6 @@ def api_plugin(self) -> str | None:
string for the plugin.
"""

async def on_worker_started(self, worker: "Worker"):
"""
Called when the worker has started.
"""

async def on_job_starting(
self,
*,
8 changes: 8 additions & 0 deletions chancy/queue.py
@@ -92,6 +92,12 @@ class State(enum.Enum):
#: active state. This can be used to implement a "pause for X seconds"
#: feature for circuit breakers and such.
resume_at: datetime.datetime | None = None
#: If set, the worker will immediately poll for new jobs after a job
#: completes, rather than waiting for the polling interval to elapse. This
#: can be useful for low-concurrency queues where jobs are expected to be
#: continuously available, as it can reduce latency between jobs. However,
#: it can also increase load on the database and should be used with care.
eager_polling: bool = False

@classmethod
def unpack(cls, data: dict) -> "Queue":
@@ -109,6 +115,7 @@ def unpack(cls, data: dict) -> "Queue":
rate_limit=data.get("rate_limit"),
rate_limit_window=data.get("rate_limit_window"),
resume_at=data.get("resume_at"),
eager_polling=data.get("eager_polling", False),
)

def pack(self) -> dict:
@@ -127,4 +134,5 @@ def pack(self) -> dict:
"rate_limit": self.rate_limit,
"rate_limit_window": self.rate_limit_window,
"resume_at": self.resume_at,
"eager_polling": self.eager_polling,
}
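
To opt a queue into the new behaviour, a minimal sketch assuming the usual
`Chancy.declare()` entry point (queue field names are taken from this PR's
`_declare_sql`):

```python
from chancy import Chancy, Queue


async def setup(chancy: Chancy) -> None:
    # Re-poll as soon as an executor slot frees up instead of waiting
    # out the polling interval. Best suited to low-concurrency queues
    # with a steady backlog; note the extra database load.
    await chancy.declare(Queue("default", concurrency=1, eager_polling=True))
```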