From f3c68df78946012c7145d662e3a22e4c7800f814 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Sun, 24 Nov 2024 02:19:18 +0200 Subject: [PATCH] Pass pool_kwargs to the replacement process pool too --- docs/versionhistory.rst | 3 +++ src/apscheduler/executors/pool.py | 36 +++++++++++++++++-------------- 2 files changed, 23 insertions(+), 16 deletions(-) diff --git a/docs/versionhistory.rst b/docs/versionhistory.rst index ee847e01..1b88690a 100644 --- a/docs/versionhistory.rst +++ b/docs/versionhistory.rst @@ -15,6 +15,9 @@ UNRELEASED forking on all platform - Fixed ``AsyncIOScheduler`` inadvertently creating a defunct event loop at start, leading to the scheduler not working at all +- Fixed ``ProcessPoolExecutor`` not respecting the passed keyword arguments when a + broken pool was being replaced + 3.10.4 ------ diff --git a/src/apscheduler/executors/pool.py b/src/apscheduler/executors/pool.py index 146978bb..166de7c7 100644 --- a/src/apscheduler/executors/pool.py +++ b/src/apscheduler/executors/pool.py @@ -24,19 +24,9 @@ def callback(f): else: self._run_job_success(job.id, f.result()) - try: - f = self._pool.submit( - run_job, job, job._jobstore_alias, run_times, self._logger.name - ) - except BrokenProcessPool: - self._logger.warning( - "Process pool is broken; replacing pool with a fresh instance" - ) - self._pool = self._pool.__class__(self._pool._max_workers) - f = self._pool.submit( - run_job, job, job._jobstore_alias, run_times, self._logger.name - ) - + f = self._pool.submit( + run_job, job, job._jobstore_alias, run_times, self._logger.name + ) f.add_done_callback(callback) def shutdown(self, wait=True): @@ -72,7 +62,21 @@ class ProcessPoolExecutor(BasePoolExecutor): """ def __init__(self, max_workers=10, pool_kwargs=None): - pool_kwargs = pool_kwargs or {} - pool_kwargs.setdefault("mp_context", multiprocessing.get_context("spawn")) - pool = concurrent.futures.ProcessPoolExecutor(int(max_workers), **pool_kwargs) + self.pool_kwargs = pool_kwargs or {} + self.pool_kwargs.setdefault("mp_context", multiprocessing.get_context("spawn")) + pool = concurrent.futures.ProcessPoolExecutor( + int(max_workers), **self.pool_kwargs + ) super().__init__(pool) + + def _do_submit_job(self, job, run_times): + try: + super()._do_submit_job(job, run_times) + except BrokenProcessPool: + self._logger.warning( + "Process pool is broken; replacing pool with a fresh instance" + ) + self._pool = self._pool.__class__( + self._pool._max_workers, **self.pool_kwargs + ) + super()._do_submit_job(job, run_times)