Skip to content

Commit

Permalink
Added support for subprocesses
Browse files Browse the repository at this point in the history
Fixes nedbat#9.
  • Loading branch information
agronholm committed Aug 2, 2020
1 parent a1c7538 commit 5847c61
Show file tree
Hide file tree
Showing 11 changed files with 578 additions and 3 deletions.
9 changes: 9 additions & 0 deletions docs/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,15 @@ Sockets and networking
.. autoclass:: anyio.abc.sockets.ConnectedUDPSocket
:members:

Subprocesses
------------

.. autofunction:: anyio.run_process
.. autofunction:: anyio.open_process

.. autoclass:: anyio.abc.subprocesses.Process
:members:

Synchronization
---------------

Expand Down
1 change: 1 addition & 0 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ The manual
streams
networking
threads
subprocesses
fileio
signals
testing
Expand Down
58 changes: 58 additions & 0 deletions docs/subprocesses.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
Using subprocesses
==================

AnyIO allows you to run arbitrary executables in subprocesses, either as a one-shot call or by
opening a process handle for you that gives you more control over the subprocess.

You can either give the command as a string, in which case it is passed to your default shell
(equivalent to ``shell=True`` in :func:`subprocess.run`), or as a sequence of strings
(``shell=False``) in which the executable is the first item in the sequence and the rest are
arguments passed to it.

.. note:: The subprocess facilities provided by AnyIO do not include a way to execute arbitrary
Python code like the :mod:`multiprocessing` module does, but they can be used as
building blocks for such a feature.

Running one-shot commands
-------------------------

To run an external command with one call, use :func:`~anyio.run_process`::

from anyio import run_process, run


async def main():
result = await run_process('/usr/bin/ps')
print(result.stdout.decode())

run(main)

The snippet above runs the ``ps`` command within a shell (. To run it directly::

from anyio import run_process, run


async def main():
result = await run_process(['/usr/bin/ps'])
print(result.stdout.decode())

run(main)

Working with processes
----------------------

When you have more complex requirements for your interaction with subprocesses, you can launch one
with :func:`~anyio.open_process`::

from anyio import open_process, run
from anyio.streams.text import TextReceiveStream


async def main():
async with await open_process(['/usr/bin/ps']) as process:
for text in TextReceiveStream(process.stdout):
print(text)

run(main)

See the API documentation of :class:`~anyio.abc.subprocesses.Process` for more information.
1 change: 1 addition & 0 deletions docs/versionhistory.rst
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ This library adheres to `Semantic Versioning 2.0 <http://semver.org/>`_.
- Unified checkpoint behavior: a cancellation check now calls ``sleep(0)`` on asyncio and curio,
allowing the scheduler to switch to a different task
- Added memory object streams
- Added support for subprocesses
- Dropped support for Python 3.5
- Bumped minimum versions of trio and curio to v0.16 and v1.2, respectively
- Fixed a bug where a task group would abandon its subtasks if its own cancel scope was cancelled
Expand Down
78 changes: 77 additions & 1 deletion src/anyio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,15 @@
from ipaddress import ip_address, IPv6Address
from pathlib import Path
from socket import AddressFamily, SocketKind
from subprocess import PIPE, CompletedProcess, DEVNULL, CalledProcessError
from typing import TypeVar, Callable, Union, Optional, Awaitable, Coroutine, Any, Dict, List, Tuple

import sniffio

from ._utils import convert_ipv6_sockaddr
from .abc import (
Lock, Condition, Event, Semaphore, CapacityLimiter, CancelScope, TaskGroup, IPAddressType,
SocketStream, UDPSocket, ConnectedUDPSocket, IPSockAddrType, Listener, SocketListener,
SocketStream, UDPSocket, ConnectedUDPSocket, IPSockAddrType, Listener, SocketListener, Process,
AsyncResource)
from .fileio import AsyncFile
from .streams.tls import TLSStream
Expand Down Expand Up @@ -280,6 +281,81 @@ def current_default_worker_thread_limiter() -> CapacityLimiter:
return _get_asynclib().current_default_thread_limiter()


#
# Subprocesses
#

async def run_process(command: Union[str, typing.Sequence[str]], *, input: Optional[bytes] = None,
stdout: int = PIPE, stderr: int = PIPE,
check: bool = True) -> CompletedProcess:
"""
Run an external command in a subprocess and wait until it completes.
.. seealso:: :func:`subprocess.run`
:param command: either a string to pass to the shell, or an iterable of strings containing the
executable name or path and its arguments
:param input: bytes passed to the standard input of the subprocess
:param stdout: either :data:`subprocess.PIPE` or :data:`subprocess.DEVNULL`
:param stderr: one of :data:`subprocess.PIPE`, :data:`subprocess.DEVNULL` or
:data:`subprocess.STDOUT`
:param check: if ``True``, raise :exc:`~subprocess.CalledProcessError` if the process
terminates with a return code other than 0
:return: an object representing the completed process
:raises CalledProcessError: if ``check`` is ``True`` and the process exits with a nonzero
return code
"""
async def drain_stream(stream, index):
chunks = [chunk async for chunk in stream]
stream_contents[index] = b''.join(chunks)

async with await open_process(command, stdin=PIPE if input else DEVNULL, stdout=stdout,
stderr=stderr) as process:
stream_contents = [None, None]
try:
async with create_task_group() as tg:
if process.stdout:
await tg.spawn(drain_stream, process.stdout, 0)
if process.stderr:
await tg.spawn(drain_stream, process.stderr, 1)
if process.stdin and input:
await process.stdin.send(input)
await process.stdin.aclose()

await process.wait()
except BaseException:
process.kill()
raise

output, errors = stream_contents
if check and process.returncode != 0:
raise CalledProcessError(typing.cast(int, process.returncode), command, output, errors)

return CompletedProcess(command, typing.cast(int, process.returncode), output, errors)


async def open_process(command: Union[str, typing.Sequence[str]], *, stdin: int = PIPE,
stdout: int = PIPE, stderr: int = PIPE) -> Process:
"""
Start an external command in a subprocess.
.. seealso:: :class:`subprocess.Popen`
:param command: either a string to pass to the shell, or an iterable of strings containing the
executable name or path and its arguments
:param stdin: either :data:`subprocess.PIPE` or :data:`subprocess.DEVNULL`
:param stdout: either :data:`subprocess.PIPE` or :data:`subprocess.DEVNULL`
:param stderr: one of :data:`subprocess.PIPE`, :data:`subprocess.DEVNULL` or
:data:`subprocess.STDOUT`
:return: an asynchronous process object
"""
shell = isinstance(command, str)
return await _get_asynclib().open_process(command, shell=shell, stdin=stdin, stdout=stdout,
stderr=stderr)


#
# Async file I/O
#
Expand Down
98 changes: 98 additions & 0 deletions src/anyio/_backends/_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
List, Dict, Sequence, Type, Deque)
from weakref import WeakKeyDictionary

from dataclasses import dataclass

from .. import (
abc, claim_worker_thread, _local, T_Retval, TaskInfo, GetAddrInfoReturnType)
from ..abc import IPSockAddrType, SockAddrType
Expand Down Expand Up @@ -526,6 +528,102 @@ def run_async_from_thread(func: Callable[..., Coroutine[Any, Any, T_Retval]], *a
return f.result()


#
# Subprocesses
#

@dataclass
class StreamReaderWrapper(abc.ByteReceiveStream):
_stream: asyncio.StreamReader

async def receive(self, max_bytes: int = 65536) -> bytes:
data = await self._stream.read(max_bytes)
if data:
return data
else:
raise EndOfStream

async def aclose(self) -> None:
self._stream.feed_eof()


@dataclass
class StreamWriterWrapper(abc.ByteSendStream):
_stream: asyncio.StreamWriter

async def send(self, item: bytes) -> None:
self._stream.write(item)
await self._stream.drain()

async def aclose(self) -> None:
self._stream.close()


@dataclass
class Process(abc.Process):
_process: asyncio.subprocess.Process
_stdin: Optional[abc.ByteSendStream]
_stdout: Optional[abc.ByteReceiveStream]
_stderr: Optional[abc.ByteReceiveStream]

async def aclose(self) -> None:
if self._stdin:
await self._stdin.aclose()
if self._stdout:
await self._stdout.aclose()
if self._stderr:
await self._stderr.aclose()

await self.wait()

async def wait(self) -> int:
return await self._process.wait()

def terminate(self) -> None:
self._process.terminate()

def kill(self) -> None:
self._process.kill()

def send_signal(self, signal: int) -> None:
self._process.send_signal(signal)

@property
def pid(self) -> int:
return self._process.pid

@property
def returncode(self) -> Optional[int]:
return self._process.returncode

@property
def stdin(self) -> Optional[abc.ByteSendStream]:
return self._stdin

@property
def stdout(self) -> Optional[abc.ByteReceiveStream]:
return self._stdout

@property
def stderr(self) -> Optional[abc.ByteReceiveStream]:
return self._stderr


async def open_process(command, *, shell: bool, stdin: int, stdout: int, stderr: int):
await check_cancelled()
if shell:
process = await asyncio.create_subprocess_shell(command, stdin=stdin, stdout=stdout,
stderr=stderr)
else:
process = await asyncio.create_subprocess_exec(*command, stdin=stdin, stdout=stdout,
stderr=stderr)

stdin_stream = StreamWriterWrapper(process.stdin) if process.stdin else None
stdout_stream = StreamReaderWrapper(process.stdout) if process.stdout else None
stderr_stream = StreamReaderWrapper(process.stderr) if process.stderr else None
return Process(process, stdin_stream, stdout_stream, stderr_stream)


#
# Sockets and networking
#
Expand Down
Loading

0 comments on commit 5847c61

Please sign in to comment.