diff --git a/CHANGELOG.md b/CHANGELOG.md
index c0ff73e..0630deb 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,6 +1,36 @@
 Changelog
 =========
 
+0.25.0
+------
+
+🚨 This release requires that you run migrations, as it adds a new queue
+setting, `eager_polling`. Call `chancy.migrate()` or use the CLI:
+
+```bash
+chancy --app misc migrate
+```
+
+✨ Improvements
+
+- `Chancy.wait_for_job()` now accepts a set of states to wait for, so you can
+  use it to wait for a job to begin running, be retried, and so on, rather
+  than only to finish.
+- `Reference()` objects are now hashable and support equality comparison.
+- `Chancy.wait_for_jobs()` added to wait for multiple jobs to finish.
+- `Chancy.get_jobs()` added to fetch multiple jobs by ID in a single query.
+- Added the `eager_polling` option to `Queue` to trigger immediate polling
+  whenever an executor slot becomes free. This is useful for low-latency
+  processing on low-concurrency queues, but may increase database load (#48).
+
+🐛 Fixes
+
+- Fixed a bug where a job with a `unique_key` could run multiple times if
+  pushed again after the job had started running (#51), reported by @mrsshr.
+- Removed the unused plugin hook `on_worker_started`, which is available via
+  the `worker.started` event anyway (#47), by @PaulM5406.
+
+
 0.24.3
 ------
 
diff --git a/chancy/app.py b/chancy/app.py
index deb588c..94e7f19 100644
--- a/chancy/app.py
+++ b/chancy/app.py
@@ -454,6 +454,7 @@ async def declare_ex(
                     "rate_limit": queue.rate_limit,
                     "rate_limit_window": queue.rate_limit_window,
                     "resume_at": queue.resume_at,
+                    "eager_polling": queue.eager_polling,
                 },
             )
 
@@ -494,6 +495,7 @@ def sync_declare_ex(
                     "rate_limit": queue.rate_limit,
                     "rate_limit_window": queue.rate_limit_window,
                     "resume_at": queue.resume_at,
+                    "eager_polling": queue.eager_polling,
                 },
             )
 
@@ -739,6 +741,31 @@ async def get_job(self, ref: Reference) -> QueuedJob | None:
                     return None
                 return QueuedJob.unpack(record)
 
+    @_ensure_pool_is_open
+    async def get_jobs(self, refs: list[Reference]) -> list[QueuedJob]:
+        """
+        Resolve multiple references to job instances.
+
+        If a job no longer exists, it will be skipped and not included in
+        the returned list.
+
+        :param refs: The references to the jobs to retrieve.
+        :return: A list of the resolved jobs.
+        """
+        if not refs:
+            return []
+
+        async with self.pool.connection() as conn:
+            async with conn.cursor(row_factory=dict_row) as cursor:
+                await cursor.execute(
+                    self._get_jobs_sql(), [[ref.identifier for ref in refs]]
+                )
+                return [
+                    QueuedJob.unpack(record)
+                    async for record in cursor
+                    if record is not None
+                ]
+
     @_ensure_sync_pool_is_open
     def sync_get_job(self, ref: Reference) -> QueuedJob | None:
         """
@@ -762,6 +789,7 @@ async def wait_for_job(
         *,
         interval: int = 1,
         timeout: float | int | None = None,
+        states: set[QueuedJob.State] | None = None,
     ) -> QueuedJob | None:
         """
         Wait for a job to complete.
@@ -777,17 +805,63 @@
         :param interval: The number of seconds to wait between checks.
         :param timeout: The maximum number of seconds to wait for the job to
             complete. If not provided, the method will wait indefinitely.
+        :param states: The set of states to consider as "complete". If the
+            job enters any of these states, it will be considered complete
+            and the method will return. Defaults to the SUCCEEDED and
+            FAILED states.
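+
+        For example, to wait until a job has started running (an illustrative
+        sketch; it assumes ``ref`` came from an earlier call to ``push()``):
+
+        .. code-block:: python
+
+            job = await chancy.wait_for_job(
+                ref,
+                states={QueuedJob.State.RUNNING},
+                timeout=30,
+            )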
""" + states = states or {QueuedJob.State.SUCCEEDED, QueuedJob.State.FAILED} async with asyncio.timeout(timeout): while True: job = await self.get_job(ref) - if job is None or job.state in { - QueuedJob.State.SUCCEEDED, - QueuedJob.State.FAILED, - }: + if job is None or job.state in states: return job await asyncio.sleep(interval) + async def wait_for_jobs( + self, + refs: list[Reference], + *, + interval: int = 1, + timeout: float | int | None = None, + states: set[QueuedJob.State] | None = None, + ): + """ + Wait for multiple jobs to complete. + + This method will loop until all jobs referenced by the provided + references have completed. The interval parameter controls how often + the job status is checked. This will not block the event loop, so + other tasks can run while waiting for the jobs to complete. + + If a job no longer exists, it will be skipped and not included in + the returned list. + + :param refs: The references to the jobs to wait for. + :param interval: The number of seconds to wait between checks. + :param timeout: The maximum number of seconds to wait for the jobs to + complete. If not provided, the method will wait indefinitely. + :param states: A set of additional states to consider as "complete". If + a job enters any of these states, it will be considered complete + and removed from the list of jobs to wait for. By default, only + the SUCCEEDED and FAILED states are considered complete. + :return: A list of the completed jobs. Jobs that no longer exist will + not be included in the list. + """ + states = states or {QueuedJob.State.SUCCEEDED, QueuedJob.State.FAILED} + async with asyncio.timeout(timeout): + pending = set(refs) + completed = [] + while pending: + jobs = await self.get_jobs(list(pending)) + for job in jobs: + if job is None or job.state in states: + completed.append(job) + pending.remove(Reference(job.id)) + if pending: + await asyncio.sleep(interval) + return completed + @_ensure_pool_is_open async def get_all_queues(self) -> list[Queue]: """ @@ -1075,7 +1149,7 @@ def _push_job_sql(self): AND state NOT IN ('succeeded', 'failed') DO UPDATE SET - state = EXCLUDED.state + state = {jobs}.state RETURNING id; """ ).format(jobs=sql.Identifier(f"{self.prefix}jobs")) @@ -1092,6 +1166,18 @@ def _get_job_sql(self): """ ).format(jobs=sql.Identifier(f"{self.prefix}jobs")) + def _get_jobs_sql(self): + return sql.SQL( + """ + SELECT + * + FROM + {jobs} + WHERE + id = ANY(%s) + """ + ).format(jobs=sql.Identifier(f"{self.prefix}jobs")) + def _declare_sql(self, upsert: bool): action = sql.SQL( """ @@ -1111,7 +1197,8 @@ def _declare_sql(self, upsert: bool): polling_interval = EXCLUDED.polling_interval, rate_limit = EXCLUDED.rate_limit, rate_limit_window = EXCLUDED.rate_limit_window, - resume_at = EXCLUDED.resume_at + resume_at = EXCLUDED.resume_at, + eager_polling = EXCLUDED.eager_polling """ ) @@ -1127,7 +1214,8 @@ def _declare_sql(self, upsert: bool): polling_interval, rate_limit, rate_limit_window, - resume_at + resume_at, + eager_polling ) VALUES ( %(name)s, %(state)s, @@ -1138,7 +1226,8 @@ def _declare_sql(self, upsert: bool): %(polling_interval)s, %(rate_limit)s, %(rate_limit_window)s, - %(resume_at)s + %(resume_at)s, + %(eager_polling)s ) ON CONFLICT (name) DO {action} @@ -1151,7 +1240,8 @@ def _declare_sql(self, upsert: bool): executor_options, rate_limit, rate_limit_window, - resume_at + resume_at, + eager_polling """ ).format( queues=sql.Identifier(f"{self.prefix}queues"), diff --git a/chancy/executors/base.py b/chancy/executors/base.py index 17e30e2..325ed89 
--- a/chancy/executors/base.py
+++ b/chancy/executors/base.py
@@ -126,6 +126,7 @@ async def on_job_completed(
                 continue
 
         await self.worker.queue_update(new_instance)
+        await self.worker.on_job_completed(queue=self.queue, job=new_instance)
 
     @staticmethod
     def get_function_and_kwargs(job: QueuedJob) -> tuple[Callable, dict]:
diff --git a/chancy/job.py b/chancy/job.py
index 8f02688..8d34819 100644
--- a/chancy/job.py
+++ b/chancy/job.py
@@ -9,7 +9,9 @@
     ParamSpec,
     Callable,
     Protocol,
+    Union,
 )
+from uuid import UUID
 
 from chancy.utils import importable_name
 
@@ -51,12 +53,28 @@ class Reference:
 
     __slots__ = ("identifier",)
 
-    def __init__(self, identifier: str):
-        self.identifier = identifier
+    def __init__(self, identifier: str | UUID):
+        if isinstance(identifier, UUID):
+            self.identifier = identifier
+        else:
+            self.identifier = UUID(identifier)
 
     def __repr__(self):
         return f"<Reference({self.identifier!r})>"
 
+    def __eq__(self, other: Union[str, UUID, "Reference"]) -> bool:
+        if isinstance(other, Reference):
+            return self.identifier == other.identifier
+        elif isinstance(other, str):
+            return str(self.identifier) == other
+        elif isinstance(other, UUID):
+            return self.identifier == other
+        else:
+            return super().__eq__(other)
+
+    def __hash__(self) -> int:
+        return hash(self.identifier)
+
 
 @dataclasses.dataclass
 class Limit:
diff --git a/chancy/migrations/v6.py b/chancy/migrations/v6.py
new file mode 100644
index 0000000..a3dc71c
--- /dev/null
+++ b/chancy/migrations/v6.py
@@ -0,0 +1,29 @@
+from psycopg import sql
+
+from chancy.migrate import Migration
+
+
+class QueueEagerPolling(Migration):
+    """
+    Adds the eager polling field to queues.
+    """
+
+    async def up(self, migrator, cursor):
+        await cursor.execute(
+            sql.SQL(
+                """
+                ALTER TABLE {queues}
+                ADD COLUMN IF NOT EXISTS eager_polling BOOLEAN NOT NULL DEFAULT FALSE
+                """
+            ).format(queues=sql.Identifier(f"{migrator.prefix}queues"))
+        )
+
+    async def down(self, migrator, cursor):
+        await cursor.execute(
+            sql.SQL(
+                """
+                ALTER TABLE {queues}
+                DROP COLUMN IF EXISTS eager_polling
+                """
+            ).format(queues=sql.Identifier(f"{migrator.prefix}queues"))
+        )
diff --git a/chancy/plugin.py b/chancy/plugin.py
index baf6229..c8d3707 100644
--- a/chancy/plugin.py
+++ b/chancy/plugin.py
@@ -130,11 +130,6 @@ def api_plugin(self) -> str | None:
         string for the plugin.
         """
 
-    async def on_worker_started(self, worker: "Worker"):
-        """
-        Called when the worker has started.
-        """
-
     async def on_job_starting(
         self,
         *,
diff --git a/chancy/queue.py b/chancy/queue.py
index 11238bc..3b1993b 100644
--- a/chancy/queue.py
+++ b/chancy/queue.py
@@ -92,6 +92,12 @@ class State(enum.Enum):
     #: active state. This can be used to implement a "pause for X seconds"
     #: feature for circuit breakers and such.
     resume_at: datetime.datetime | None = None
+    #: If set, the worker will immediately poll for new jobs after a job
+    #: completes, rather than waiting for the polling interval to elapse. This
+    #: can be useful for low-concurrency queues where jobs are expected to be
+    #: continuously available, as it can reduce latency between jobs. However,
+    #: it can also increase load on the database and should be used with care.
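+    #:
+    #: An illustrative sketch (the queue name and settings here are arbitrary)::
+    #:
+    #:     queue = Queue("realtime", concurrency=1, eager_polling=True)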
+    eager_polling: bool = False
 
     @classmethod
     def unpack(cls, data: dict) -> "Queue":
@@ -109,6 +115,7 @@ def unpack(cls, data: dict) -> "Queue":
             rate_limit=data.get("rate_limit"),
             rate_limit_window=data.get("rate_limit_window"),
             resume_at=data.get("resume_at"),
+            eager_polling=data.get("eager_polling", False),
         )
 
     def pack(self) -> dict:
@@ -127,4 +134,5 @@ def pack(self) -> dict:
             "rate_limit": self.rate_limit,
             "rate_limit_window": self.rate_limit_window,
             "resume_at": self.resume_at,
+            "eager_polling": self.eager_polling,
         }
diff --git a/chancy/worker.py b/chancy/worker.py
index 5677d8f..5634250 100644
--- a/chancy/worker.py
+++ b/chancy/worker.py
@@ -275,39 +275,15 @@ async def _maintain_queues(self):
         """
         while True:
             self.chancy.log.debug("Polling for queue changes.")
-            async with self.chancy.pool.connection() as conn:
-                async with conn.cursor(row_factory=dict_row) as cursor:
-                    tags = self.worker_tags()
-                    await cursor.execute(
-                        sql.SQL(
-                            """
-                            SELECT
-                                name,
-                                concurrency,
-                                tags,
-                                state,
-                                executor,
-                                executor_options,
-                                polling_interval,
-                                rate_limit,
-                                rate_limit_window,
-                                resume_at
-                            FROM {queues}
-                            """
-                        ).format(
-                            queues=sql.Identifier(f"{self.chancy.prefix}queues")
-                        ),
-                    )
-                    # Tags in the database is a list of regexes, while the tags
-                    # in the worker are a set of strings. We need to filter the
-                    # queues based on the worker's tags.
-                    db_queues = {
-                        q["name"]: Queue.unpack(q)
-                        for q in await cursor.fetchall()
-                        if any(
-                            re.match(t, tag) for tag in tags for t in q["tags"]
-                        )
-                    }
+            tags = self.worker_tags()
+            # Tags in the database are a list of regexes, while the tags
+            # in the worker are a set of strings. We need to filter the
+            # queues based on the worker's tags.
+            db_queues = {
+                q.name: q
+                for q in await self.chancy.get_all_queues()
+                if any(re.match(t, tag) for tag in tags for t in q.tags)
+            }
 
             for queue_name in list(self._queues.keys()):
                 if queue_name not in db_queues:
@@ -451,12 +427,6 @@ async def _maintain_queue(self, queue: Queue):
                                 f" queue {job.queue!r}"
                             )
                         await executor.push(job)
-
-                # If we pulled exactly the maximum number of jobs,
-                # there's likely more jobs available, so we set the
-                # event to skip the next sleep.
-                if len(jobs) == maximum_jobs_to_poll:
-                    self.queue_wake_events[queue.name].set()
         finally:
             self._executors.pop(queue.name, None)
 
@@ -822,6 +792,16 @@ async def on_signal(self, signum: int):
             self.shutdown_event.set()
             await self.manager.cancel_all()
 
+    async def on_job_completed(self, *, queue: Queue, job: QueuedJob):
+        """
+        Called when a job is completed by an executor.
+
+        If the queue is configured for eager polling, this will set the
+        queue's wake event to immediately poll for more jobs.
+        """
+        if queue.eager_polling and queue.name in self._queues:
+            self.queue_wake_events[queue.name].set()
+
     async def stop(self) -> bool:
         """
         Stop the worker.
diff --git a/pyproject.toml b/pyproject.toml
index 4e426fb..6bd8c70 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [project]
 name = "chancy"
-version = "0.24.3"
+version = "0.25.0"
 description = "A simple and flexible job queue for Python"
 readme = "README.md"
 requires-python = ">=3.11"
diff --git a/tests/contrib/django/test_models.py b/tests/contrib/django/test_models.py
index b5c9b45..c721f5b 100644
--- a/tests/contrib/django/test_models.py
+++ b/tests/contrib/django/test_models.py
@@ -24,7 +24,7 @@ async def test_chancy_job_django_query(chancy, worker):
 
     orm_job = await Job.objects.aget(id=j.identifier)
 
-    assert orm_job.id == j.identifier
+    assert orm_job.id == j
     assert orm_job.queue == "default"
     assert orm_job.func.endswith("test_job")
diff --git a/tests/regressions/test_48.py b/tests/regressions/test_48.py
new file mode 100644
index 0000000..d3b7490
--- /dev/null
+++ b/tests/regressions/test_48.py
@@ -0,0 +1,71 @@
+import asyncio
+
+import pytest
+
+from chancy import Chancy, Worker, Queue, job
+
+
+@job()
+async def quick_job():
+    await asyncio.sleep(1)
+
+
+@pytest.mark.asyncio
+async def test_regression_48(chancy: Chancy, worker: Worker):
+    """
+    When a queue is configured with `concurrency=1`, the worker will always
+    wait for `polling_interval` before fetching additional jobs, even when
+    it's likely that there are more jobs to process.
+    """
+    await chancy.declare(
+        Queue(
+            "normal",
+            concurrency=1,
+            polling_interval=5,
+            executor=Chancy.Executor.Async,
+        )
+    )
+
+    all_refs = [
+        ref
+        async for refs in chancy.push_many(
+            [quick_job.job.with_queue("normal")] * 5
+        )
+        for ref in refs
+    ]
+
+    # This will take about 25 seconds, as the worker processes one job and
+    # then waits 5 seconds before fetching the next.
+    with pytest.raises(asyncio.TimeoutError):
+        await chancy.wait_for_jobs(all_refs, timeout=10)
+
+
+@pytest.mark.asyncio
+async def test_regression_48_eager(chancy: Chancy, worker: Worker):
+    """
+    With `eager_polling` enabled, the worker polls for the next job as soon
+    as an executor slot frees up, rather than waiting for `polling_interval`
+    to elapse between jobs.
+    """
+
+    await chancy.declare(
+        Queue(
+            "eager",
+            concurrency=1,
+            polling_interval=5,
+            executor=Chancy.Executor.Async,
+            eager_polling=True,
+        )
+    )
+
+    all_refs = [
+        ref
+        async for refs in chancy.push_many(
+            [quick_job.job.with_queue("eager")] * 5
+        )
+        for ref in refs
+    ]
+
+    # This should take roughly 5 seconds, as the worker will process one job,
+    # then immediately fetch the next job.
+    await chancy.wait_for_jobs(all_refs, timeout=10)
diff --git a/tests/regressions/test_51.py b/tests/regressions/test_51.py
new file mode 100644
index 0000000..92c13cc
--- /dev/null
+++ b/tests/regressions/test_51.py
@@ -0,0 +1,59 @@
+import asyncio
+from typing import Any
+
+import pytest
+
+from chancy import Chancy, Worker, Queue, QueuedJob, job
+from chancy.plugin import Plugin
+
+
+class RunCounter(Plugin):
+    @staticmethod
+    def get_identifier() -> str:
+        return "run_counter"
+
+    def __init__(self):
+        super().__init__()
+        self.runs = 0
+
+    def on_job_completed(
+        self,
+        *,
+        worker: "Worker",
+        job: QueuedJob,
+        exc: Exception | None = None,
+        result: Any | None = None,
+    ) -> QueuedJob:
+        self.runs += 1
+        return job
+
+
+@job()
+async def long_running_job():
+    await asyncio.sleep(5)
+
+
+@pytest.mark.asyncio
+@pytest.mark.parametrize("chancy", [{"plugins": [RunCounter()]}], indirect=True)
+async def test_regression_51(chancy: Chancy, worker: Worker):
+    """
+    Test that pushing a job with a unique key while the same job is running
+    does not result in the job being executed more than once.
+
+    Uses a shim plugin to count the number of jobs actually run; since the
+    issue causes ID re-use, we can't rely on job IDs to count runs.
+    """
+    await chancy.declare(Queue("default"))
+    # If deduplication works, both pushes should resolve to the same job.
+    # Before the fix, once the first push was actively running, the second
+    # push would be accepted again and produce a new reference ID.
+    await chancy.push(
+        long_running_job.job.with_unique_key("unique_long_running_job")
+    )
+    await asyncio.sleep(1)
+    await chancy.push(
+        long_running_job.job.with_unique_key("unique_long_running_job")
+    )
+    await asyncio.sleep(10)
+
+    assert chancy.plugins[RunCounter.get_identifier()].runs == 1