Skip to content

Commit

Permalink
Renamed some functions and methods to match trio
Browse files Browse the repository at this point in the history
Fixes #105.
  • Loading branch information
agronholm committed Apr 14, 2020
1 parent cba4e20 commit a33a471
Show file tree
Hide file tree
Showing 15 changed files with 136 additions and 76 deletions.
8 changes: 4 additions & 4 deletions docs/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,14 @@ Task groups
Threads
-------

.. autofunction:: anyio.run_in_thread
.. autofunction:: anyio.run_sync_in_worker_thread
.. autofunction:: anyio.run_async_from_thread
.. autofunction:: anyio.current_default_thread_limiter
.. autofunction:: anyio.current_default_worker_thread_limiter

Async file I/O
--------------

.. autofunction:: anyio.aopen
.. autofunction:: anyio.open_file

.. autoclass:: anyio.abc.AsyncFile

Expand Down Expand Up @@ -103,7 +103,7 @@ Synchronization
Operating system signals
------------------------

.. autofunction:: anyio.receive_signals
.. autofunction:: anyio.open_signal_receiver

Testing and debugging
---------------------
Expand Down
8 changes: 4 additions & 4 deletions docs/fileio.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ operations in worker threads.

Example::

from anyio import aopen, run
from anyio import open_file, run


async def main():
async with await aopen('/some/path/somewhere') as f:
async with await open_file('/some/path/somewhere') as f:
contents = await f.read()
print(contents)

Expand All @@ -19,11 +19,11 @@ Example::
The wrappers also support asynchronous iteration of the file line by line, just as the standard
file objects support synchronous iteration::

from anyio import aopen, run
from anyio import open_file, run


async def main():
async with await aopen('/some/path/somewhere') as f:
async with await open_file('/some/path/somewhere') as f:
async for line in f:
print(line, end='')

Expand Down
4 changes: 2 additions & 2 deletions docs/signals.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ AnyIO provides a simple mechanism for you to receive the signals you're interest

import signal

from anyio import receive_signals, run
from anyio import open_signal_receiver, run


async def main():
async with receive_signals(signal.SIGTERM, signal.SIGHUP) as signals:
async with open_signal_receiver(signal.SIGTERM, signal.SIGHUP) as signals:
async for signum in signals:
if signum == signal.SIGTERM:
return
Expand Down
12 changes: 6 additions & 6 deletions docs/threads.rst
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ To run a (synchronous) callable in a worker thread::

import time

from anyio import run_in_thread, run
from anyio import run_sync_in_worker_thread, run


async def main():
await run_in_thread(time.sleep, 5)
await run_sync_in_worker_thread(time.sleep, 5)

run(main)

Expand All @@ -34,17 +34,17 @@ Calling asynchronous code from a worker thread

If you need to call a coroutine function from a worker thread, you can do this::

from anyio import run_async_from_thread, sleep, run_in_thread, run
from anyio import run_async_from_thread, sleep, run_sync_in_worker_thread, run


def blocking_function():
run_async_from_thread(sleep, 5)


async def main():
await run_in_thread(blocking_function)
await run_sync_in_worker_thread(blocking_function)

run(main)

.. note:: The worker thread must have been spawned using :func:`~anyio.run_in_thread` for this to
work.
.. note:: The worker thread must have been spawned using :func:`~anyio.run_sync_in_worker_thread`
for this to work.
9 changes: 9 additions & 0 deletions docs/versionhistory.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,15 @@ This library adheres to `Semantic Versioning 2.0 <http://semver.org/>`_.

- **BACKWARDS INCOMPATIBLE** Removed the ``anyio.finalize()`` context manager since as of curio
1.0, it is no longer necessary. Use ``async_generator.aclosing()`` instead.
- **BACKWARDS INCOMPATIBLE** Renamed some functions and methods to match their corresponding names
in Trio:

- ``Stream.close()`` -> ``Stream.aclose()``
- ``AsyncFile.close()`` -> ``AsyncFile.aclose()``
- ``anyio.aopen()`` -> ``anyio.open_file()``
- ``anyio.receive_signals()`` -> ``anyio.open_signal_receiver()``
- ``anyio.run_in_thread()`` -> ``anyio.run_sync_in_worker_thread()``
- ``anyio.current_default_thread_limiter()`` -> ``anyio.current_default_worker_thread_limiter()``

**1.3.0**

Expand Down
25 changes: 13 additions & 12 deletions src/anyio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,8 @@ def create_task_group() -> TaskGroup:
# Threads
#

def run_in_thread(func: Callable[..., T_Retval], *args, cancellable: bool = False,
limiter: Optional[CapacityLimiter] = None) -> Awaitable[T_Retval]:
def run_sync_in_worker_thread(func: Callable[..., T_Retval], *args, cancellable: bool = False,
limiter: Optional[CapacityLimiter] = None) -> Awaitable[T_Retval]:
"""
Start a thread that calls the given function with the given arguments.
Expand Down Expand Up @@ -246,7 +246,7 @@ def run_async_from_thread(func: Callable[..., Coroutine[Any, Any, T_Retval]], *a
return asynclib.run_async_from_thread(func, *args)


def current_default_thread_limiter() -> CapacityLimiter:
def current_default_worker_thread_limiter() -> CapacityLimiter:
"""
Return the capacity limiter that is used by default to limit the number of concurrent threads.
Expand Down Expand Up @@ -335,10 +335,10 @@ def open_process(command: Union[str, Sequence[str]], *, stdin: int = PIPE,
# Async file I/O
#

def aopen(file: Union[str, 'os.PathLike', int], mode: str = 'r', buffering: int = -1,
encoding: Optional[str] = None, errors: Optional[str] = None,
newline: Optional[str] = None, closefd: bool = True,
opener: Optional[Callable] = None) -> Coroutine[Any, Any, AsyncFile]:
def open_file(file: Union[str, 'os.PathLike', int], mode: str = 'r', buffering: int = -1,
encoding: Optional[str] = None, errors: Optional[str] = None,
newline: Optional[str] = None, closefd: bool = True,
opener: Optional[Callable] = None) -> Coroutine[Any, Any, AsyncFile]:
"""
Open a file asynchronously.
Expand All @@ -351,7 +351,8 @@ def aopen(file: Union[str, 'os.PathLike', int], mode: str = 'r', buffering: int
if sys.version_info < (3, 6) and hasattr(file, '__fspath__'):
file = str(file)

return _get_asynclib().aopen(file, mode, buffering, encoding, errors, newline, closefd, opener)
return _get_asynclib().open_file(file, mode, buffering, encoding, errors, newline, closefd,
opener)


#
Expand Down Expand Up @@ -417,8 +418,8 @@ async def try_connect(af: int, sa: tuple, event: Event):
if bind_host:
interface, family, _v6only = await get_bind_address(bind_host)

target_addrs = await run_in_thread(socket.getaddrinfo, address, port, family,
socket.SOCK_STREAM, cancellable=True)
target_addrs = await run_sync_in_worker_thread(socket.getaddrinfo, address, port, family,
socket.SOCK_STREAM, cancellable=True)
oserrors: List[OSError] = []
async with create_task_group() as tg:
for i, (af, *rest, sa) in enumerate(target_addrs):
Expand Down Expand Up @@ -546,7 +547,7 @@ async def create_udp_socket(
interface, family = None, 0

if target_host:
res = await run_in_thread(socket.getaddrinfo, target_host, target_port, family)
res = await run_sync_in_worker_thread(socket.getaddrinfo, target_host, target_port, family)
if res:
family, type_, proto, _cn, sa = res[0]
target_host, target_port = sa[:2]
Expand Down Expand Up @@ -663,7 +664,7 @@ def create_memory_stream(cls=None):
# Operating system signals
#

def receive_signals(*signals: int) -> UnreliableReceiveMessageStream[int]:
def open_signal_receiver(*signals: int) -> UnreliableReceiveMessageStream[int]:
"""
Start receiving operating system signals.
Expand Down
15 changes: 7 additions & 8 deletions src/anyio/_backends/_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import attr

from .. import claim_worker_thread, _local, T_Retval, TaskInfo, T_Item
from ..abc.files import AsyncFile as AbstractAsyncFile
from ..abc.networking import (
TCPSocketStream as AbstractTCPSocketStream, UNIXSocketStream as AbstractUNIXSocketStream,
UDPPacket, UDPSocket as AbstractUDPSocket, ConnectedUDPSocket as AbstractConnectedUDPSocket,
Expand Down Expand Up @@ -561,18 +562,16 @@ async def open_process(command, *, shell: bool, stdin: int, stdout: int, stderr:
# Async file I/O
#

class AsyncFile:
class AsyncFile(AbstractAsyncFile):
def __init__(self, fp) -> None:
self._fp = fp

def __getattr__(self, name):
return getattr(self._fp, name)

async def __aenter__(self):
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.close()
@property
def wrapped(self):
return self._fp

async def __aiter__(self):
while True:
Expand Down Expand Up @@ -618,11 +617,11 @@ async def tell(self) -> int:
async def flush(self) -> None:
return await run_in_thread(self._fp.flush)

async def close(self) -> None:
async def aclose(self) -> None:
return await run_in_thread(self._fp.close)


async def aopen(*args, **kwargs):
async def open_file(*args, **kwargs):
fp = await run_in_thread(partial(open, *args, **kwargs))
return AsyncFile(fp)

Expand Down
67 changes: 64 additions & 3 deletions src/anyio/_backends/_curio.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import math
import os
import socket
import sys
from collections import OrderedDict, defaultdict
Expand All @@ -8,7 +9,7 @@
from threading import Thread
from typing import (
Callable, Set, Optional, Coroutine, Any, cast, Dict, List, Sequence, ClassVar,
Generic, Type, DefaultDict)
Generic, Type, DefaultDict, Union)
from weakref import WeakKeyDictionary

import attr
Expand All @@ -20,6 +21,7 @@
import curio.traps

from .. import T_Retval, claim_worker_thread, TaskInfo, _local, T_Item
from ..abc.files import AsyncFile as AbstractAsyncFile
from ..abc.networking import (
TCPSocketStream as AbstractTCPSocketStream, UNIXSocketStream as AbstractUNIXSocketStream,
UDPPacket, UDPSocket as AbstractUDPSocket, ConnectedUDPSocket as AbstractConnectedUDPSocket,
Expand Down Expand Up @@ -495,9 +497,68 @@ async def open_process(command, *, shell: bool, stdin: int, stdout: int, stderr:
# Async file I/O
#

async def aopen(*args, **kwargs):
class AsyncFile(AbstractAsyncFile):
def __init__(self, fp) -> None:
self._fp = fp

def __getattr__(self, name):
return getattr(self._fp, name)

@property
def wrapped(self):
return self._fp

async def __aiter__(self):
while True:
line = await self.readline()
if line:
yield line
else:
break

async def read(self, size: int = -1) -> Union[bytes, str]:
return await run_in_thread(self._fp.read, size)

async def read1(self, size: int = -1) -> Union[bytes, str]:
return await run_in_thread(self._fp.read1, size)

async def readline(self) -> bytes:
return await run_in_thread(self._fp.readline)

async def readlines(self) -> bytes:
return await run_in_thread(self._fp.readlines)

async def readinto(self, b: Union[bytes, memoryview]) -> bytes:
return await run_in_thread(self._fp.readinto, b)

async def readinto1(self, b: Union[bytes, memoryview]) -> bytes:
return await run_in_thread(self._fp.readinto1, b)

async def write(self, b: bytes) -> None:
return await run_in_thread(self._fp.write, b)

async def writelines(self, lines: bytes) -> None:
return await run_in_thread(self._fp.writelines, lines)

async def truncate(self, size: Optional[int] = None) -> int:
return await run_in_thread(self._fp.truncate, size)

async def seek(self, offset: int, whence: Optional[int] = os.SEEK_SET) -> int:
return await run_in_thread(self._fp.seek, offset, whence)

async def tell(self) -> int:
return await run_in_thread(self._fp.tell)

async def flush(self) -> None:
return await run_in_thread(self._fp.flush)

async def aclose(self) -> None:
return await run_in_thread(self._fp.close)


async def open_file(*args, **kwargs):
fp = await run_in_thread(partial(open, *args, **kwargs))
return curio.file.AsyncFile(fp)
return AsyncFile(fp)


#
Expand Down
5 changes: 1 addition & 4 deletions src/anyio/_backends/_trio.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,10 +264,7 @@ async def open_process(command, *, shell: bool, stdin: int, stdout: int, stderr:
# Async file I/O
#

async def aopen(*args, **kwargs):
f = await trio.open_file(*args, **kwargs)
f.close = f.aclose
return f
open_file = trio.open_file


#
Expand Down
4 changes: 2 additions & 2 deletions src/anyio/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ async def get_bind_address(interface: Optional[IPAddressType]) -> Tuple[str, int
try:
if_addr = ip_address(interface)
except ValueError:
from . import run_in_thread
from . import run_sync_in_worker_thread

warnings.warn('Passing a host name as the interface address has been deprecated. '
'Use an IP address instead.', category=DeprecationWarning)
res = await run_in_thread(socket.getaddrinfo, interface, 0)
res = await run_sync_in_worker_thread(socket.getaddrinfo, interface, 0)
return res[0][-1][0], res[0][0], False

family = socket.AF_INET6 if isinstance(if_addr, IPv6Address) else socket.AF_INET
Expand Down
15 changes: 4 additions & 11 deletions src/anyio/abc/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,15 @@ class AsyncFile(AsyncResource):
This class also supports asynchronous iteration::
async with await aopen(...) as f:
async with await anyio.open_file(...) as f:
async for line in f:
print(line)
"""

@property
@abstractmethod
async def __aenter__(self):
pass

@abstractmethod
async def __aexit__(self, exc_type, exc_val, exc_tb):
pass
def wrapped(self):
"""The wrapped synchronous file object."""

@abstractmethod
def __aiter__(self):
Expand Down Expand Up @@ -97,7 +94,3 @@ async def tell(self) -> int:
@abstractmethod
async def flush(self) -> None:
pass

@abstractmethod
async def close(self) -> None:
pass
Loading

0 comments on commit a33a471

Please sign in to comment.