Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix scheduler hang when pending limit is reached #135

Merged
merged 8 commits into from
Sep 22, 2022
Merged
1 change: 1 addition & 0 deletions CHANGES/97.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix scheduler hang when pending limit is reached.
10 changes: 7 additions & 3 deletions aiojobs/_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,16 @@ async def spawn(self, coro):
raise RuntimeError("Scheduling a new job after closing")
job = Job(coro, self)
should_start = self._limit is None or self.active_count < self._limit
self._jobs.add(job)
if should_start:
job._start()
else:
# wait for free slot in queue
await self._pending.put(job)
try:
# wait for free slot in queue
await self._pending.put(job)
except asyncio.CancelledError:
await job.close()
raise
self._jobs.add(job)
Dreamsorcerer marked this conversation as resolved.
Show resolved Hide resolved
return job

async def close(self):
Expand Down
35 changes: 35 additions & 0 deletions tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,19 +263,54 @@ async def coro(fut):
fut3 = asyncio.Future()

await scheduler.spawn(coro(fut1))
assert scheduler.active_count == 1
assert scheduler.pending_count == 0

await scheduler.spawn(coro(fut2))
assert scheduler.active_count == 1
assert scheduler.pending_count == 1

with pytest.raises(asyncio.TimeoutError):
# try to wait for 1 sec to add task to pending queue
async with timeout(1):
await scheduler.spawn(coro(fut3))

assert scheduler.active_count == 1
assert scheduler.pending_count == 1


async def test_scheduler_concurrency_pending_limit(make_scheduler):
scheduler = await make_scheduler(limit=1, pending_limit=1)

async def coro(fut):
await fut

futures = [asyncio.Future() for _ in range(3)]
jobs = []

async def spawn():
for fut in futures:
jobs.append(await scheduler.spawn(coro(fut)))

asyncio.create_task(spawn())
Dreamsorcerer marked this conversation as resolved.
Show resolved Hide resolved
await asyncio.sleep(0.05)
pollydrag marked this conversation as resolved.
Show resolved Hide resolved

assert len(scheduler) == 2
assert scheduler.active_count == 1
assert scheduler.pending_count == 1

for fut in futures:
fut.set_result(None)

for job in jobs:
await job.wait()

assert not len(scheduler)
pollydrag marked this conversation as resolved.
Show resolved Hide resolved
assert scheduler.active_count == 0
assert scheduler.pending_count == 0
assert all(job.closed for job in jobs)


async def test_scheduler_concurrency_limit(make_scheduler):
scheduler = await make_scheduler(limit=1)

Expand Down