Skip to content

Commit

Permalink
Fix scheduler hang when pending limit is reached (#135)
Browse files Browse the repository at this point in the history
Co-authored-by: Sam Bull <aa6bs0@sambull.org>
  • Loading branch information
pollydrag and Dreamsorcerer authored Sep 22, 2022
1 parent 8abc7af commit 3359f20
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGES/97.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix scheduler hang when pending limit is reached.
10 changes: 7 additions & 3 deletions aiojobs/_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,16 @@ async def spawn(self, coro):
raise RuntimeError("Scheduling a new job after closing")
job = Job(coro, self)
should_start = self._limit is None or self.active_count < self._limit
self._jobs.add(job)
if should_start:
job._start()
else:
# wait for free slot in queue
await self._pending.put(job)
try:
# wait for free slot in queue
await self._pending.put(job)
except asyncio.CancelledError:
await job.close()
raise
self._jobs.add(job)
return job

async def close(self):
Expand Down
37 changes: 37 additions & 0 deletions tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,18 +263,55 @@ async def coro(fut):
fut3 = asyncio.Future()

await scheduler.spawn(coro(fut1))
assert scheduler.active_count == 1
assert scheduler.pending_count == 0

await scheduler.spawn(coro(fut2))
assert scheduler.active_count == 1
assert scheduler.pending_count == 1

with pytest.raises(asyncio.TimeoutError):
# try to wait for 1 sec to add task to pending queue
async with timeout(1):
await scheduler.spawn(coro(fut3))

assert scheduler.active_count == 1
assert scheduler.pending_count == 1


async def test_scheduler_concurrency_pending_limit(make_scheduler):
scheduler = await make_scheduler(limit=1, pending_limit=1)

async def coro(fut):
await fut

futures = [asyncio.Future() for _ in range(3)]
jobs = []

async def spawn():
for fut in futures:
jobs.append(await scheduler.spawn(coro(fut)))

task = asyncio.create_task(spawn())
await asyncio.sleep(0)

assert len(scheduler) == 2
assert scheduler.active_count == 1
assert scheduler.pending_count == 1

for fut in futures:
fut.set_result(None)

for job in jobs:
await job.wait()

await task

assert len(scheduler) == 0
assert scheduler.active_count == 0
assert scheduler.pending_count == 0
assert all(job.closed for job in jobs)


async def test_scheduler_concurrency_limit(make_scheduler):
scheduler = await make_scheduler(limit=1)
Expand Down

0 comments on commit 3359f20

Please sign in to comment.