Skip to content

Commit

Permalink
Always use the spawn mp_context instead of fork in ProcessPoolExecutor
Browse files Browse the repository at this point in the history
  • Loading branch information
agronholm committed Nov 18, 2024
1 parent e8bf143 commit 5361dbb
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 10 deletions.
2 changes: 2 additions & 0 deletions apscheduler/executors/pool.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import concurrent.futures
import multiprocessing
from abc import abstractmethod
from concurrent.futures.process import BrokenProcessPool

Expand Down Expand Up @@ -72,5 +73,6 @@ class ProcessPoolExecutor(BasePoolExecutor):

def __init__(self, max_workers=10, pool_kwargs=None):
pool_kwargs = pool_kwargs or {}
pool_kwargs.setdefault("mp_context", multiprocessing.get_context("spawn"))
pool = concurrent.futures.ProcessPoolExecutor(int(max_workers), **pool_kwargs)
super().__init__(pool)
2 changes: 2 additions & 0 deletions docs/versionhistory.rst
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ UNRELEASED

- Dropped support for Python 3.6 and 3.7
- Removed the dependency on ``six``
- Changed ``ProcessPoolExecutor`` to spawn new subprocesses from scratch instead of
forking on all platform


3.10.4
Expand Down
18 changes: 8 additions & 10 deletions tests/test_executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,17 +104,16 @@ def success():
return 5


def test_max_instances(mock_scheduler, executor, create_job, freeze_time):
def test_max_instances(mock_scheduler, executor, create_job, timezone):
"""Tests that the maximum instance limit on a job is respected."""
events = []
mock_scheduler._dispatch_event = lambda event: events.append(event)
now = datetime.now(timezone)
job = create_job(func=wait_event, max_instances=2, next_run_time=None)
executor.submit_job(job, [freeze_time.current])
executor.submit_job(job, [freeze_time.current])
executor.submit_job(job, [now])
executor.submit_job(job, [now])

pytest.raises(
MaxInstancesReachedError, executor.submit_job, job, [freeze_time.current]
)
pytest.raises(MaxInstancesReachedError, executor.submit_job, job, [now])
executor.shutdown()
assert len(events) == 2
assert events[0].retval == "test"
Expand All @@ -130,9 +129,7 @@ def test_max_instances(mock_scheduler, executor, create_job, freeze_time):
],
ids=["executed", "missed", "error"],
)
def test_submit_job(
mock_scheduler, executor, create_job, freeze_time, timezone, event_code, func
):
def test_submit_job(mock_scheduler, executor, create_job, timezone, event_code, func):
"""
Tests that an EVENT_JOB_EXECUTED event is delivered to the scheduler if the job was
successfully executed.
Expand All @@ -141,10 +138,11 @@ def test_submit_job(
mock_scheduler._dispatch_event = MagicMock()
job = create_job(func=func, id="foo")
job._jobstore_alias = "test_jobstore"
now = datetime.now(timezone)
run_time = (
timezone.localize(datetime(1970, 1, 1))
if event_code == EVENT_JOB_MISSED
else freeze_time.current
else now
)
executor.submit_job(job, [run_time])
executor.shutdown()
Expand Down

0 comments on commit 5361dbb

Please sign in to comment.