Make max number of simultaneous processes configurable
That limit has always existed, but it was implicit and undocumented. It is
caused by the fact that every running process blocks one worker thread of
the ThreadPoolExecutor passed to run_in_executor().
As we used the loop's default executor, the limit could vary between Python
versions (32 on py37 but only 8 on py38).

We now use our own ThreadPoolExecutor instead of the loop's default one.
That way we can make the max number of processes configurable and avoid
interfering with other code that uses the default executor.
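
For context, a minimal sketch of the failure mode described above (all names hypothetical, not the library's API):

import asyncio
import concurrent.futures

def blocking_read(pipe_read_end):
    # Stands in for the blocking pipe read done once per running child process.
    return pipe_read_end.recv()

async def monitor(pipe_read_end, executor=None):
    loop = asyncio.get_event_loop()
    # executor=None queues the call on the loop's default ThreadPoolExecutor,
    # so its max_workers silently caps how many children can run at once.
    return await loop.run_in_executor(executor, blocking_read, pipe_read_end)

# The fix applied below: a dedicated, explicitly sized executor, so the cap is
# configurable and the default executor stays free for other code.
dedicated = concurrent.futures.ThreadPoolExecutor(max_workers=16)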
gsalgado committed Jun 13, 2020
1 parent 1ae3a43 commit d0c0d16
Showing 7 changed files with 89 additions and 12 deletions.
2 changes: 1 addition & 1 deletion asyncio_run_in_process/__init__.py
@@ -3,7 +3,7 @@
InvalidState,
ProcessKilled,
)
-from .run_in_process import ( # noqa: F401
+from .main import ( # noqa: F401
open_in_process,
open_in_process_with_trio,
run_in_process,
1 change: 1 addition & 0 deletions asyncio_run_in_process/_child.py
@@ -208,6 +208,7 @@ def run_process(runner: TEngineRunner, fd_read: int, fd_write: int) -> None:
except SystemExit as err:
code = err.args[0]
except BaseException:
logger.exception("%s raised an unexpected exception", async_fn)
code = 1
else:
code = 0
10 changes: 7 additions & 3 deletions asyncio_run_in_process/constants.py
@@ -1,14 +1,18 @@
-# Number of seconds given to a child to reach the EXECUTING state before we yield in
-# open_in_process().
+# Default number of seconds given to a child to reach the EXECUTING state before we yield in
+# open_in_process(). Can be overridden via the ASYNCIO_RUN_IN_PROCESS_STARTUP_TIMEOUT
+# environment variable.
STARTUP_TIMEOUT_SECONDS = 5

# The number of seconds that are given to a child process to exit after the
# parent process gets a KeyboardInterrupt/SIGINT-signal and sends a `SIGINT` to
# the child process.
SIGINT_TIMEOUT_SECONDS = 2


# The number of seconds that are given to a child process to exit after the
# parent process gets an `asyncio.CancelledError` which results in sending a
# `SIGTERM` to the child process.
SIGTERM_TIMEOUT_SECONDS = 2

+# Default maximum number of processes that can be running at the same time. Can be overridden
+# via the ASYNCIO_RUN_IN_PROCESS_MAX_PROCS environment variable.
+MAX_PROCESSES = 16
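
A usage sketch of the two overrides (assuming they are read as in main.py below):

import os

# ASYNCIO_RUN_IN_PROCESS_MAX_PROCS must be set before the first process is
# opened: the shared executor is created lazily once and then reused. The
# startup timeout, by contrast, is read on every call.
os.environ['ASYNCIO_RUN_IN_PROCESS_MAX_PROCS'] = '32'
os.environ['ASYNCIO_RUN_IN_PROCESS_STARTUP_TIMEOUT'] = '10'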
asyncio_run_in_process/run_in_process.py → asyncio_run_in_process/main.py
@@ -1,4 +1,5 @@
import asyncio
import concurrent.futures
import logging
import os
import signal
@@ -47,6 +48,15 @@


logger = logging.getLogger("asyncio_run_in_process")
_executor: Optional[concurrent.futures.ThreadPoolExecutor] = None


def _get_executor() -> concurrent.futures.ThreadPoolExecutor:
global _executor
if _executor is None:
max_procs = int(os.getenv('ASYNCIO_RUN_IN_PROCESS_MAX_PROCS', constants.MAX_PROCESSES))
_executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_procs)
return _executor


async def _monitor_sub_proc(
@@ -119,7 +129,8 @@ async def _monitor_state(
logger.debug(
"Waiting for next expected state (%s) from child (%s)", next_expected_state, proc)
try:
-child_state_as_byte = await loop.run_in_executor(None, read_exactly, from_child, 1)
+child_state_as_byte = await loop.run_in_executor(
+    _get_executor(), read_exactly, from_child, 1)
except asyncio.CancelledError:
# When the sub process is sent a SIGKILL, the write end of the pipe used in
# read_exactly is never closed and the thread above attempting to read from it
@@ -152,7 +163,7 @@ async def _monitor_state(
# process id. The process ID is gotten via this mechanism to
# prevent the need for ugly sleep based code in
# `_monitor_sub_proc`.
-pid_bytes = await loop.run_in_executor(None, read_exactly, from_child, 4)
+pid_bytes = await loop.run_in_executor(_get_executor(), read_exactly, from_child, 4)
proc.pid = int.from_bytes(pid_bytes, 'big')

await proc.update_state(child_state)
@@ -170,7 +181,7 @@ async def _monitor_state(

logger.debug("Waiting for result from %s", proc)
try:
-result = await loop.run_in_executor(None, receive_pickled_value, from_child)
+result = await loop.run_in_executor(_get_executor(), receive_pickled_value, from_child)
except asyncio.CancelledError:
# See comment above as to why we need to do this.
logger.debug(
@@ -279,13 +290,15 @@ async def _open_in_process(
relay_signals_task = asyncio.ensure_future(_relay_signals(proc, signal_queue))
monitor_state_task = asyncio.ensure_future(_monitor_state(proc, parent_r, child_w, loop))

+startup_timeout = int(
+    os.getenv('ASYNCIO_RUN_IN_PROCESS_STARTUP_TIMEOUT', constants.STARTUP_TIMEOUT_SECONDS))
async with cleanup_tasks(monitor_sub_proc_task, relay_signals_task, monitor_state_task):
try:
-await asyncio.wait_for(proc.wait_pid(), timeout=constants.STARTUP_TIMEOUT_SECONDS)
+await asyncio.wait_for(proc.wait_pid(), timeout=startup_timeout)
except asyncio.TimeoutError:
sub_proc.kill()
raise asyncio.TimeoutError(
f"{proc} took more than {constants.STARTUP_TIMEOUT_SECONDS} seconds to start up")
f"{proc} took more than {startup_timeout} seconds to start up")

logger.debug(
"Got pid %d for %s, waiting for it to reach EXECUTING state before yielding",
@@ -298,12 +311,12 @@
# The timeout ensures that if something is fundamentally wrong
# with the subprocess we don't hang indefinitely.
try:
-await asyncio.wait_for(
-    proc.wait_for_state(State.EXECUTING), timeout=constants.STARTUP_TIMEOUT_SECONDS)
+logger.debug("Waiting for proc pid=%d to reach EXECUTING state", proc.pid)
+await asyncio.wait_for(proc.wait_for_state(State.EXECUTING), timeout=startup_timeout)
except asyncio.TimeoutError:
sub_proc.kill()
raise asyncio.TimeoutError(
f"{proc} took more than {constants.STARTUP_TIMEOUT_SECONDS} seconds to start up")
f"{proc} took more than {startup_timeout} seconds to start up")

try:
try:
7 changes: 7 additions & 0 deletions docs/index.rst
@@ -59,6 +59,13 @@ run them with ``trio``, the ``run_in_process_with_trio`` and ``open_in_process_w
functions can be used.


Maximum number of running processes
-----------------------------------

By default, at most ``MAX_PROCESSES`` processes can be running at any given moment, but that
limit can be changed via the ``ASYNCIO_RUN_IN_PROCESS_MAX_PROCS`` environment variable.
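
For example, a sketch (assuming ``run_in_process(async_fn, *args)`` as exported by the package):

import asyncio
import os

from asyncio_run_in_process import run_in_process

# Must be set before the first process is opened, since the executor that
# enforces the limit is created once, on first use.
os.environ['ASYNCIO_RUN_IN_PROCESS_MAX_PROCS'] = '32'

async def double(x):
    return 2 * x

async def main():
    assert await run_in_process(double, 21) == 42

asyncio.get_event_loop().run_until_complete(main())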


Gotchas
-------

1 change: 1 addition & 0 deletions setup.py
@@ -7,6 +7,7 @@

extras_require = {
'test': [
"async-exit-stack==1.0.1",
"pytest==5.2.2",
"pytest-asyncio==0.10.0",
"pytest-xdist",
51 changes: 51 additions & 0 deletions tests/core/test_open_in_process.py
@@ -1,13 +1,18 @@
import asyncio
import concurrent.futures
import pickle
import signal

import pytest
import trio

from async_exit_stack import (
AsyncExitStack,
)
from asyncio_run_in_process import (
ProcessKilled,
constants,
main,
open_in_process,
open_in_process_with_trio,
)
@@ -295,3 +300,49 @@ async def do_sleep_forever():
with pytest.raises(asyncio.TimeoutError):
async with open_in_proc(do_sleep_forever):
pass


@pytest.mark.asyncio
async def test_max_processes(monkeypatch, open_in_proc):
async def do_sleep_forever():
while True:
await sleep(0.2)

max_procs = 4
executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_procs)
# We need to monkeypatch _get_executor() instead of setting the
# ASYNCIO_RUN_IN_PROCESS_MAX_PROCS environment variable because if another test runs before us
# it will cause _get_executor() to store a global executor created with the default number of
# max procs, and when called again here it would reuse that executor.
monkeypatch.setattr(main, '_get_executor', lambda: executor)
monkeypatch.setenv('ASYNCIO_RUN_IN_PROCESS_STARTUP_TIMEOUT', str(1))

above_limit_proc_created = False
procs = []
async with AsyncExitStack() as stack:
for _ in range(max_procs):
proc = await stack.enter_async_context(open_in_proc(do_sleep_forever))
procs.append(proc)

for proc in procs:
assert proc.state is State.EXECUTING

try:
async with open_in_proc(do_sleep_forever) as proc:
# This should not execute, as the line above should raise a TimeoutError, but in
# case it doesn't we need to ensure the proc is terminated so that we can leave
# the context and fail the test below.
proc.send_signal(signal.SIGINT)
except asyncio.TimeoutError:
pass
else:
above_limit_proc_created = True
finally:
for proc in procs:
proc.send_signal(signal.SIGINT)

# We want to fail the test only after we leave the AsyncExitStack(), or else the stack will
# pass the exception along when returning control to open_in_proc(), which will interpret it
# as a failure of the process to exit and send a SIGKILL (together with a warning).
if above_limit_proc_created:
assert False, "A process above the max-procs limit should not have been created successfully"
