FIX test_no_crash_max_workers #399

Merged: 6 commits, Apr 14, 2023
6 changes: 6 additions & 0 deletions CHANGES.md
@@ -10,6 +10,12 @@
attempt to use more than 61 workers on that platform (or 60 depending on the
Python version). (#390).

- Fix loky compatibility with Python 3.11 for nested calls. (#394).

- Adapt the cooldown strategy when shutting down an executor with a full
``call_queue``. This should reduce the time taken to shut down
in general, in particular on overloaded machines. (#399).
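
To get a feel for what the cooldown change amounts to, the sketch below enumerates the sleep durations the retry loop would go through if the queue stayed full the whole time. The two (limit, factor) pairs, 10.0 with a factor of 2 and 5.0 with a factor of 1.2, are taken from the `process_executor.py` hunk of this PR. The initial value of 0.001 s is an assumption, since the diff does not show where `cooldown_time` is initialized, and `cooldown_schedule` is a hypothetical helper, not part of loky.

```python
def cooldown_schedule(initial, factor, limit):
    """Return the successive sleep durations before the loop gives up.

    Mirrors the pattern in the diff: give up once cooldown_time > limit,
    otherwise sleep for cooldown_time and multiply it by factor.
    """
    sleeps = []
    cooldown_time = initial
    while cooldown_time <= limit:
        sleeps.append(cooldown_time)
        cooldown_time *= factor
    return sleeps


# initial=0.001 is an assumed starting value, not taken from the diff.
old = cooldown_schedule(0.001, factor=2, limit=10.0)
new = cooldown_schedule(0.001, factor=1.2, limit=5.0)

for name, sleeps in (("old", old), ("new", new)):
    print(f"{name}: {len(sleeps)} retries, longest sleep {max(sleeps):.3f}s")
```

With the smaller growth factor the loop retries more often and each individual wait grows more slowly, so the sender is likely to notice a freed slot sooner. That is one plausible reading of the changelog entry, not a measured result.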

### 3.3.0 - 2022-09-15

- Fix worker management logic in `get_reusable_executor` to ensure
6 changes: 4 additions & 2 deletions loky/process_executor.py
@@ -867,17 +867,19 @@ def shutdown_workers(self):
self.call_queue.put_nowait(None)
n_sentinels_sent += 1
except queue.Full as e:
-if cooldown_time > 10.0:
+if cooldown_time > 5.0:
mp.util.info(
"failed to send all sentinels and exit with error."
f"\ncall_queue size={self.call_queue._maxsize}; "
f" full is {self.call_queue.full()}; "
)
raise e
mp.util.info(
"full call_queue prevented to send all sentinels at "
"once, waiting..."
)
sleep(cooldown_time)
-cooldown_time *= 2
+cooldown_time *= 1.2
break

mp.util.debug(f"sent {n_sentinels_sent} sentinels to the call queue")
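
The hunk above only shows the inside of the retry loop, so here is a minimal, self-contained sketch of the same pattern: try to enqueue sentinels without blocking, and on `queue.Full` sleep for a growing cooldown before retrying. It uses the standard library `queue.Queue` and a thread as a stand-in for loky's `call_queue` and worker processes. `send_sentinels` and `drain` are hypothetical names, and the initial cooldown of 0.001 s is an assumption, since it is not visible in the diff.

```python
import queue
import threading
from time import sleep


def send_sentinels(call_queue, n_sentinels, max_cooldown=5.0, factor=1.2):
    """Put n_sentinels None values on a bounded queue, backing off when full."""
    n_sent = 0
    cooldown_time = 0.001  # assumed initial value, not shown in the diff
    while n_sent < n_sentinels:
        for _ in range(n_sentinels - n_sent):
            try:
                call_queue.put_nowait(None)
                n_sent += 1
            except queue.Full:
                if cooldown_time > max_cooldown:
                    raise  # give up: the queue never drained
                sleep(cooldown_time)
                cooldown_time *= factor
                break  # leave the for loop and retry the remaining sentinels
    return n_sent


def drain(call_queue, n_items, received):
    """Toy consumer standing in for worker processes."""
    for _ in range(n_items):
        received.append(call_queue.get())
        sleep(0.005)


call_queue = queue.Queue(maxsize=2)
received = []
consumer = threading.Thread(target=drain, args=(call_queue, 6, received))
consumer.start()
n_sent = send_sentinels(call_queue, 6)
consumer.join()
print(n_sent, received)
```

The `break` after the sleep matters: it abandons the current batch so the outer loop recomputes how many sentinels are still missing before trying again.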
10 changes: 8 additions & 2 deletions tests/test_reusable_executor.py
@@ -1020,13 +1020,19 @@ def test_no_crash_max_workers_on_windows():
# user asks for more workers than the maximum number of workers supported
# by the platform.

# To make sure we have the proper size for the call_queue, we
# shutdown the existing executor.
# See https://github.com/joblib/loky#396
get_reusable_executor().shutdown()

# Note: on overloaded CI hosts, spawning many processes can take a long
# time. We need to increase the timeout to avoid spurious failures when
# making assertions on `len(executor._processes)`.
-idle_worker_timeout = 10 * 60
+idle_worker_timeout = 5 * 60
with warnings.catch_warnings(record=True) as record:
executor = get_reusable_executor(
-max_workers=_MAX_WINDOWS_WORKERS + 1, timeout=idle_worker_timeout
+max_workers=_MAX_WINDOWS_WORKERS + 1,
+timeout=idle_worker_timeout,
)
assert executor.submit(lambda: None).result() is None
if sys.platform == "win32":
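
The test hunk is cut off before its assertions, but the visible part relies on `warnings.catch_warnings(record=True)` to observe what happens when more workers are requested than the platform supports. The sketch below shows that capture pattern in isolation. `clamp_max_workers` is a hypothetical helper written for illustration, not loky's implementation, and the limit of 61 is a placeholder based on the figure quoted in CHANGES.md.

```python
import warnings

MAX_WORKERS_LIMIT = 61  # placeholder; the diff only names _MAX_WINDOWS_WORKERS


def clamp_max_workers(requested, limit=MAX_WORKERS_LIMIT):
    """Hypothetical helper: cap the worker count and warn when capping."""
    if requested > limit:
        warnings.warn(
            f"max_workers={requested} exceeds the limit of {limit}; "
            f"using {limit} instead.",
            UserWarning,
        )
        return limit
    return requested


with warnings.catch_warnings(record=True) as record:
    warnings.simplefilter("always")  # make sure the warning is recorded
    effective = clamp_max_workers(MAX_WORKERS_LIMIT + 1)

print(effective, len(record))
```

Calling `warnings.simplefilter("always")` inside the context manager avoids depending on whatever filters the test runner has installed, which keeps assertions on `record` deterministic.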