Skip to content

Commit e7d4850

Browse files
authored
Merge pull request #49 from TkTech/24_4
v0.25.0 - General Bug Fixes
2 parents 4026904 + 95b8a9d commit e7d4850

File tree

12 files changed

+338
-57
lines changed

12 files changed

+338
-57
lines changed

CHANGELOG.md

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,36 @@
11
Changelog
22
=========
33

4+
0.25.0
5+
------
6+
7+
🚨 This release requires that you run migrations, as it adds a new queue
8+
setting, `eager_polling`. Call `chancy.migrate()` or use the CLI:
9+
10+
```bash
11+
chancy --app <your app> misc migrate
12+
```
13+
14+
✨ Improvements
15+
16+
- `Chancy.wait_for_job()` now accepts a set of states to wait for, so you can
17+
use it to wait for a job to begin running, be retried, etc., instead of
18+
only waiting for it to finish.
19+
- `Reference()` objects are now hashable and support equality comparison.
20+
- `Chancy.wait_for_jobs()` added to wait for multiple jobs to finish.
21+
- `Chancy.get_jobs()` added to fetch multiple jobs by ID in a single query.
22+
- Added the `eager_polling` option to `Queue` to trigger immediate polling
23+
whenever an executor slot becomes free. This is useful for low-latency
24+
processing of jobs with low concurrency, but may increase database load (#48)
25+
26+
🐛 Fixes
27+
28+
- Fixed a bug where a job with a unique_key could run multiple times if
29+
pushed again after the job had started running (#51), reported by @mrsshr.
30+
- Removed an unused plugin hook, `on_worker_started`, whose functionality is
31+
available via the `worker.started` event anyway (#47) by @PaulM5406.
32+
33+
434
0.24.3
535
------
636

chancy/app.py

Lines changed: 99 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -454,6 +454,7 @@ async def declare_ex(
454454
"rate_limit": queue.rate_limit,
455455
"rate_limit_window": queue.rate_limit_window,
456456
"resume_at": queue.resume_at,
457+
"eager_polling": queue.eager_polling,
457458
},
458459
)
459460

@@ -494,6 +495,7 @@ def sync_declare_ex(
494495
"rate_limit": queue.rate_limit,
495496
"rate_limit_window": queue.rate_limit_window,
496497
"resume_at": queue.resume_at,
498+
"eager_polling": queue.eager_polling,
497499
},
498500
)
499501

@@ -739,6 +741,31 @@ async def get_job(self, ref: Reference) -> QueuedJob | None:
739741
return None
740742
return QueuedJob.unpack(record)
741743

744+
@_ensure_pool_is_open
745+
async def get_jobs(self, refs: list[Reference]) -> list[QueuedJob]:
746+
"""
747+
Resolve multiple references to job instances.
748+
749+
If a job no longer exists, it will be skipped and not included in
750+
the returned list.
751+
752+
:param refs: The references to the jobs to retrieve.
753+
:return: A list of the resolved jobs.
754+
"""
755+
if not refs:
756+
return []
757+
758+
async with self.pool.connection() as conn:
759+
async with conn.cursor(row_factory=dict_row) as cursor:
760+
await cursor.execute(
761+
self._get_jobs_sql(), [[ref.identifier for ref in refs]]
762+
)
763+
return [
764+
QueuedJob.unpack(record)
765+
async for record in cursor
766+
if record is not None
767+
]
768+
742769
@_ensure_sync_pool_is_open
743770
def sync_get_job(self, ref: Reference) -> QueuedJob | None:
744771
"""
@@ -762,6 +789,7 @@ async def wait_for_job(
762789
*,
763790
interval: int = 1,
764791
timeout: float | int | None = None,
792+
states: set[QueuedJob.State] | None = None,
765793
) -> QueuedJob | None:
766794
"""
767795
Wait for a job to complete.
@@ -777,17 +805,63 @@ async def wait_for_job(
777805
:param interval: The number of seconds to wait between checks.
778806
:param timeout: The maximum number of seconds to wait for the job to
779807
complete. If not provided, the method will wait indefinitely.
808+
:param states: A set of states to treat as "complete". If the job
809+
enters any of these states, it will be considered complete and the
810+
method will return. This set replaces the default; when omitted, only
811+
the SUCCEEDED and FAILED states are considered complete.
"""
813+
states = states or {QueuedJob.State.SUCCEEDED, QueuedJob.State.FAILED}
781814
async with asyncio.timeout(timeout):
782815
while True:
783816
job = await self.get_job(ref)
784-
if job is None or job.state in {
785-
QueuedJob.State.SUCCEEDED,
786-
QueuedJob.State.FAILED,
787-
}:
817+
if job is None or job.state in states:
788818
return job
789819
await asyncio.sleep(interval)
790820

821+
async def wait_for_jobs(
822+
self,
823+
refs: list[Reference],
824+
*,
825+
interval: int = 1,
826+
timeout: float | int | None = None,
827+
states: set[QueuedJob.State] | None = None,
828+
):
829+
"""
830+
Wait for multiple jobs to complete.
831+
832+
This method will loop until all jobs referenced by the provided
833+
references have completed. The interval parameter controls how often
834+
the job status is checked. This will not block the event loop, so
835+
other tasks can run while waiting for the jobs to complete.
836+
837+
If a job no longer exists, it will be skipped and not included in
838+
the returned list.
839+
840+
:param refs: The references to the jobs to wait for.
841+
:param interval: The number of seconds to wait between checks.
842+
:param timeout: The maximum number of seconds to wait for the jobs to
843+
complete. If not provided, the method will wait indefinitely.
844+
:param states: A set of states to treat as "complete". If a job
845+
enters any of these states, it will be considered complete and
846+
removed from the list of jobs to wait for. This set replaces the
847+
default; when omitted, only the SUCCEEDED and FAILED states are
considered complete.
848+
:return: A list of the completed jobs. Jobs that no longer exist will
849+
not be included in the list.
850+
"""
851+
states = states or {QueuedJob.State.SUCCEEDED, QueuedJob.State.FAILED}
852+
async with asyncio.timeout(timeout):
853+
pending = set(refs)
854+
completed = []
855+
while pending:
856+
jobs = await self.get_jobs(list(pending))
857+
for job in jobs:
858+
if job is None or job.state in states:
859+
completed.append(job)
860+
pending.remove(Reference(job.id))
861+
if pending:
862+
await asyncio.sleep(interval)
863+
return completed
864+
791865
@_ensure_pool_is_open
792866
async def get_all_queues(self) -> list[Queue]:
793867
"""
@@ -1075,7 +1149,7 @@ def _push_job_sql(self):
10751149
AND state NOT IN ('succeeded', 'failed')
10761150
DO UPDATE
10771151
SET
1078-
state = EXCLUDED.state
1152+
state = {jobs}.state
10791153
RETURNING id;
10801154
"""
10811155
).format(jobs=sql.Identifier(f"{self.prefix}jobs"))
@@ -1092,6 +1166,18 @@ def _get_job_sql(self):
10921166
"""
10931167
).format(jobs=sql.Identifier(f"{self.prefix}jobs"))
10941168

1169+
def _get_jobs_sql(self):
1170+
return sql.SQL(
1171+
"""
1172+
SELECT
1173+
*
1174+
FROM
1175+
{jobs}
1176+
WHERE
1177+
id = ANY(%s)
1178+
"""
1179+
).format(jobs=sql.Identifier(f"{self.prefix}jobs"))
1180+
10951181
def _declare_sql(self, upsert: bool):
10961182
action = sql.SQL(
10971183
"""
@@ -1111,7 +1197,8 @@ def _declare_sql(self, upsert: bool):
11111197
polling_interval = EXCLUDED.polling_interval,
11121198
rate_limit = EXCLUDED.rate_limit,
11131199
rate_limit_window = EXCLUDED.rate_limit_window,
1114-
resume_at = EXCLUDED.resume_at
1200+
resume_at = EXCLUDED.resume_at,
1201+
eager_polling = EXCLUDED.eager_polling
11151202
"""
11161203
)
11171204

@@ -1127,7 +1214,8 @@ def _declare_sql(self, upsert: bool):
11271214
polling_interval,
11281215
rate_limit,
11291216
rate_limit_window,
1130-
resume_at
1217+
resume_at,
1218+
eager_polling
11311219
) VALUES (
11321220
%(name)s,
11331221
%(state)s,
@@ -1138,7 +1226,8 @@ def _declare_sql(self, upsert: bool):
11381226
%(polling_interval)s,
11391227
%(rate_limit)s,
11401228
%(rate_limit_window)s,
1141-
%(resume_at)s
1229+
%(resume_at)s,
1230+
%(eager_polling)s
11421231
)
11431232
ON CONFLICT (name) DO
11441233
{action}
@@ -1151,7 +1240,8 @@ def _declare_sql(self, upsert: bool):
11511240
executor_options,
11521241
rate_limit,
11531242
rate_limit_window,
1154-
resume_at
1243+
resume_at,
1244+
eager_polling
11551245
"""
11561246
).format(
11571247
queues=sql.Identifier(f"{self.prefix}queues"),

chancy/executors/base.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ async def on_job_completed(
126126
continue
127127

128128
await self.worker.queue_update(new_instance)
129+
await self.worker.on_job_completed(queue=self.queue, job=new_instance)
129130

130131
@staticmethod
131132
def get_function_and_kwargs(job: QueuedJob) -> tuple[Callable, dict]:

chancy/job.py

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@
99
ParamSpec,
1010
Callable,
1111
Protocol,
12+
Union,
1213
)
14+
from uuid import UUID
1315

1416
from chancy.utils import importable_name
1517

@@ -51,12 +53,28 @@ class Reference:
5153

5254
__slots__ = ("identifier",)
5355

54-
def __init__(self, identifier: str):
55-
self.identifier = identifier
56+
def __init__(self, identifier: str | UUID):
57+
if isinstance(identifier, UUID):
58+
self.identifier = identifier
59+
else:
60+
self.identifier = UUID(identifier)
5661

5762
def __repr__(self):
5863
return f"<Reference({self.identifier!r})>"
5964

65+
def __eq__(self, other: Union[str, UUID, "Reference"]) -> bool:
66+
if isinstance(other, Reference):
67+
return self.identifier == other.identifier
68+
elif isinstance(other, str):
69+
return str(self.identifier) == other
70+
elif isinstance(other, UUID):
71+
return self.identifier == other
72+
else:
73+
return super().__eq__(other)
74+
75+
def __hash__(self) -> int:
76+
return hash(self.identifier)
77+
6078

6179
@dataclasses.dataclass
6280
class Limit:

chancy/migrations/v6.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
from psycopg import sql
2+
3+
from chancy.migrate import Migration
4+
5+
6+
class QueueEagerPolling(Migration):
7+
"""
8+
Adds the eager polling field to queues.
9+
"""
10+
11+
async def up(self, migrator, cursor):
12+
await cursor.execute(
13+
sql.SQL(
14+
"""
15+
ALTER TABLE {queues}
16+
ADD COLUMN IF NOT EXISTS eager_polling BOOLEAN NOT NULL DEFAULT FALSE
17+
"""
18+
).format(queues=sql.Identifier(f"{migrator.prefix}queues"))
19+
)
20+
21+
async def down(self, migrator, cursor):
22+
await cursor.execute(
23+
sql.SQL(
24+
"""
25+
ALTER TABLE {queues}
26+
DROP COLUMN IF EXISTS eager_polling
27+
"""
28+
).format(queues=sql.Identifier(f"{migrator.prefix}queues"))
29+
)

chancy/plugin.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -130,11 +130,6 @@ def api_plugin(self) -> str | None:
130130
string for the plugin.
131131
"""
132132

133-
async def on_worker_started(self, worker: "Worker"):
134-
"""
135-
Called when the worker has started.
136-
"""
137-
138133
async def on_job_starting(
139134
self,
140135
*,

chancy/queue.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,12 @@ class State(enum.Enum):
9292
#: active state. This can be used to implement a "pause for X seconds"
9393
#: feature for circuit breakers and such.
9494
resume_at: datetime.datetime | None = None
95+
#: If set, the worker will immediately poll for new jobs after a job
96+
#: completes, rather than waiting for the polling interval to elapse. This
97+
#: can be useful for low-concurrency queues where jobs are expected to be
98+
#: continuously available, as it can reduce latency between jobs. However,
99+
#: it can also increase load on the database and should be used with care.
100+
eager_polling: bool = False
95101

96102
@classmethod
97103
def unpack(cls, data: dict) -> "Queue":
@@ -109,6 +115,7 @@ def unpack(cls, data: dict) -> "Queue":
109115
rate_limit=data.get("rate_limit"),
110116
rate_limit_window=data.get("rate_limit_window"),
111117
resume_at=data.get("resume_at"),
118+
eager_polling=data.get("eager_polling", False),
112119
)
113120

114121
def pack(self) -> dict:
@@ -127,4 +134,5 @@ def pack(self) -> dict:
127134
"rate_limit": self.rate_limit,
128135
"rate_limit_window": self.rate_limit_window,
129136
"resume_at": self.resume_at,
137+
"eager_polling": self.eager_polling,
130138
}

0 commit comments

Comments
 (0)