Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feature request] async as_completed function #7

Open
dhirschfeld opened this issue Jul 31, 2020 · 7 comments
Open

[feature request] async as_completed function #7

dhirschfeld opened this issue Jul 31, 2020 · 7 comments
Labels
enhancement New feature or request

Comments

@dhirschfeld
Copy link

dhirschfeld commented Jul 31, 2020

I think an async-iterable function which took functions to execute asynchronously and yielded the results as an Outcome objects would be very useful.

I find the daskas_completed api to be very convenient and am looking for something similar in trio.

I think it's a more fundamental primitive than either wait_any or wait_all as (I think) both could be implemented with an async def as_completed(*funcs) -> AsyncIterable function.

@belm0 belm0 added the enhancement New feature or request label Aug 1, 2020
@belm0
Copy link
Member

belm0 commented Aug 1, 2020

There was some discussion of this on a Trio issue, python-trio/trio#1089 (comment), and that thread noted the obstacle that a generator can't yield from an internal nursery. So you end up having to pass in an external nursery, or hop through a context manager.

@dhirschfeld
Copy link
Author

Thanks for the link @belm0 - an interesting discussion! I don't think having to pass in a nursery is too onerous. Perhaps an api like below could be made to work?

async with trio.open_nursery() as nursery:
    async for idx, outcome in as_completed(*async_fns, nursery=nursery):
        nursery.start_soon(process_result, outcome.unwrap())

@dhirschfeld
Copy link
Author

I thought I'd naively try to use trio_async_generator to implement this but failed at the first hurdle. Haven't had time to look into it but thought I'd post in case it's obvious what I'm doing wrong.

from functools import partial
from typing import Any, AsyncIterable, Awaitable, Callable, Tuple
from trio_util import trio_async_generator


@trio_async_generator
async def as_completed(
    *funcs: Callable[[], Awaitable[Any]]
) -> AsyncIterable[Tuple[int, Any]]:

    async def yield_result(func: Callable[[], Awaitable[Any]], *, idx: int) -> None:
        result = await func()
        yield idx, result

    async with trio.open_nursery() as nursery:
        for idx, func in enumerate(funcs):
            nursery.start_soon(
                partial(yield_result, func, idx=idx)
            )
async def f(x): 
    return x


async with as_completed(partial(f, 1), partial(f, 2)) as results:
    async for idx, result in results:
        print(idx, result)
Traceback (most recent call last):
  File "C:\Users\dhirschf\envs\dev\lib\site-packages\trio_util\_trio_async_generator.py", line 50, in adapter
    value = await agen.__anext__()
AttributeError: 'coroutine' object has no attribute '__anext__'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\dhirschf\envs\dev\lib\site-packages\IPython\core\interactiveshell.py", line 3435, in run_code
    last_expr = (await self._async_exec(code_obj, self.user_ns))
  File "<ipython-input-94-2025cd5b27a0>", line 6, in async-def-wrapper
  File "C:\Users\dhirschf\envs\dev\lib\contextlib.py", line 177, in __aexit__
    await self.gen.__anext__()
  File "C:\Users\dhirschf\envs\dev\lib\site-packages\trio_util\_trio_async_generator.py", line 72, in wrapper
    yield receive_channel
  File "C:\Users\dhirschf\envs\dev\lib\site-packages\trio\_core\_run.py", line 813, in __aexit__
    raise combined_error_from_nursery
  File "C:\Users\dhirschf\envs\dev\lib\site-packages\trio_util\_trio_async_generator.py", line 68, in adapter
    return
  File "C:\Users\dhirschf\envs\dev\lib\site-packages\async_generator\_util.py", line 14, in __aexit__
    await self._aiter.aclose()
AttributeError: 'coroutine' object has no attribute 'aclose'

I'm testing on py37 in case that makes a difference.

@belm0
Copy link
Member

belm0 commented Apr 13, 2022

I think the body of as_completed() itself needs a yield to be a generator. The yield in yield_result() applies to that nested function (making it an async generator), rather than applying to as_completed().

@dhirschfeld
Copy link
Author

Yeah, I figured that wouldn't work 🤦‍♂️

Will have to think how I can yield as results arrive 🤔

@belm0
Copy link
Member

belm0 commented Apr 13, 2022

I guess the function results feed into a trio memory channel, and the main loop just yields incoming items?

@dhirschfeld
Copy link
Author

I guess the function results feed into a trio memory channel, and the main loop just yields incoming items?

That sounds like it could work - will give it a go and report back!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

No branches or pull requests

2 participants