Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion chancy/executors/asyncex.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,13 @@ async def push(self, job: QueuedJob):
job = await self.on_job_starting(job)
task = asyncio.create_task(self._job_wrapper(job))
self.jobs[task] = job
task.add_done_callback(self.jobs.pop)
task.add_done_callback(self._on_task_done)

def _on_task_done(self, task):
if self.free_slots == 0:
Copy link

Copilot AI Aug 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The condition self.free_slots == 0 is checked before removing the completed job from self.jobs, so it will always be false when a job completes. The check should be self.free_slots == 1 to detect when the executor was exactly full before the job completed.

Suggested change
if self.free_slots == 0:
if self.free_slots == 1:

Copilot uses AI. Check for mistakes.
# Executor was exactly full
self.worker.queue_wake_events[self.queue.name].set()
self.jobs.pop(task)

def __len__(self):
return len(self.jobs)
Expand Down
17 changes: 5 additions & 12 deletions chancy/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -357,10 +357,6 @@ async def _maintain_queue(self, queue: Queue):
try:
self._executors[queue.name] = executor

concurrency = queue.concurrency
if queue.concurrency is None:
concurrency = executor.get_default_concurrency()

await self.hub.emit(
"worker.queue.started",
{
Expand Down Expand Up @@ -428,8 +424,8 @@ async def _maintain_queue(self, queue: Queue):
# Otherwise, we can fetch jobs from the queue until
# the executor is full.
else:
maximum_jobs_to_poll = concurrency - len(executor)
if maximum_jobs_to_poll <= 0:
free_slots = executor.free_slots
if free_slots <= 0:
await self.hub.emit(
"worker.queue.full",
{
Expand All @@ -442,7 +438,9 @@ async def _maintain_queue(self, queue: Queue):

async with self.chancy.pool.connection() as conn:
jobs = await self.fetch_jobs(
queue, conn, up_to=maximum_jobs_to_poll
queue,
conn,
up_to=free_slots,
)

for job in jobs:
Expand All @@ -452,11 +450,6 @@ async def _maintain_queue(self, queue: Queue):
)
await executor.push(job)

# If we pulled exactly the maximum number of jobs,
# there's likely more jobs available, so we set the
# event to skip the next sleep.
if len(jobs) == maximum_jobs_to_poll:
self.queue_wake_events[queue.name].set()
finally:
self._executors.pop(queue.name, None)

Expand Down
Loading