Skip to content

Commit

Permalink
Wakeup worker when there are resources freed up (#882)
Browse files Browse the repository at this point in the history
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Alex Grönholm <alex.gronholm@nextday.fi>
  • Loading branch information
3 people authored May 9, 2024
1 parent fad16ec commit 0596db7
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 3 deletions.
3 changes: 3 additions & 0 deletions docs/versionhistory.rst
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ APScheduler, see the :doc:`migration section <migration>`.
- Fixed dialect name checks in the SQLAlchemy job store
- Fixed JSON and CBOR serializers unable to serialize enums
- Fixed infinite loop in CalendarIntervalTrigger with UTC timezone (PR by unights)
- Fixed scheduler not resuming job processing when ``max_concurrent_jobs`` had been
reached and then a job was completed, thus making job processing possible again
(PR by MohammadAmin Vahedinia)

**4.0.0a4**

Expand Down
8 changes: 6 additions & 2 deletions src/apscheduler/_schedulers/async_.py
Original file line number Diff line number Diff line change
Expand Up @@ -981,7 +981,7 @@ def _get_task_callable(self, task: Task) -> Callable:
async def _process_jobs(self, *, task_status: TaskStatus) -> None:
wakeup_event = anyio.Event()

async def job_added(event: Event) -> None:
async def check_queue_capacity(event: Event) -> None:
if len(self._running_jobs) < self.max_concurrent_jobs:
wakeup_event.set()

Expand All @@ -993,7 +993,11 @@ async def job_added(event: Event) -> None:
task_group = await exit_stack.enter_async_context(create_task_group())

# Fetch new jobs every time
exit_stack.enter_context(self.event_broker.subscribe(job_added, {JobAdded}))
exit_stack.enter_context(
self.event_broker.subscribe(
check_queue_capacity, {JobAdded, JobReleased}
)
)

# Signal that we are ready, and wait for the scheduler start event
task_status.started()
Expand Down
30 changes: 29 additions & 1 deletion tests/test_schedulers.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,13 @@

import anyio
import pytest
from anyio import WouldBlock, create_memory_object_stream, fail_after, sleep
from anyio import (
Lock,
WouldBlock,
create_memory_object_stream,
fail_after,
sleep,
)
from pytest import MonkeyPatch
from pytest_mock import MockerFixture, MockFixture

Expand Down Expand Up @@ -819,6 +825,28 @@ async def test_wait_until_stopped(self) -> None:
# This should be a no-op
await scheduler.wait_until_stopped()

async def test_max_concurrent_jobs(self) -> None:
lock = Lock()
scheduler = AsyncScheduler(max_concurrent_jobs=1)
tasks_done = 0

async def acquire_release() -> None:
nonlocal tasks_done
lock.acquire_nowait()
await sleep(0.1)
tasks_done += 1
if tasks_done == 2:
await scheduler.stop()

lock.release()

with fail_after(3):
async with scheduler:
await scheduler.configure_task("dummyjob", func=acquire_release)
await scheduler.add_job("dummyjob")
await scheduler.add_job("dummyjob")
await scheduler.run_until_stopped()


class TestSyncScheduler:
def test_configure(self) -> None:
Expand Down

0 comments on commit 0596db7

Please sign in to comment.