
Awaiting futures versus starting coroutines (How opinionated is trio?) #892

Closed
abetkin opened this issue Jan 29, 2019 · 20 comments

@abetkin

abetkin commented Jan 29, 2019

Hi, @njsmith and trio folks!

As I see in the trio docs, the main API is to let trio run my async functions for me, like this:

async def hobby():
    # long-running
    dream = await trio.sleep(a_lot)
    return dream

async def someday():
    result = await hobby()
    async with trio.open_nursery() as ns:
        counter = 0
        for b in everybody:
            ns.start_soon(boast, b, result, counter)
    print('score', counter)

async def boast(someone, something, counter):
    '''
    if `someone` has envy for `something`, please increment the `counter`.
    '''

This is a valid approach, and probably good style, since it passes the arguments explicitly
to each coroutine. There is another approach, though.

Please say if I'm wrong, but a "task" in the trio docs usually means a coroutine, i.e., one not yet started or maybe not even created. In asyncio, a task is a Future. This approach tries to use them:

counter = 0

async def boast(somebody):
    global counter
    result = await hobby_task
    if result in somebody.envy_list:
        counter += 1

# (asyncio.create_task() needs a running event loop, so in real code these
# would be created inside main() or after the loop has started)
hobby_task = asyncio.create_task(hobby())
tasks = [hobby_task] + [
    asyncio.create_task(boast(b))
    for b in everybody
]

async def main():
    await asyncio.gather(*tasks)
    print('score', counter)

The difference is that we are using global variables: hobby_task, tasks, and counter. Probably an antipattern. But imagine we are handling a web request, and it can serve as a natural scope, so the globally-scoped tasks are actually scoped to a web request.

In many cases, it makes sense to execute an async task once and let other async tasks use the result.
In our example, that task is hobby(). We can't do await hobby() in every coroutine because it's a lengthy task.

Also, you can't cancel the hobby task from another coroutine: not without additional knowledge of which other coroutines are using it.

My question is: can trio be used with this application design? Or does it have parts that can be reused for this?

Basically, tasks (started coroutines) can be viewed as functional components that form some kind of dependency tree.
Theoretically, the nodes in this tree can act as a supervisor/nursery for their dependent nodes.
And the tree knows all the dependencies, so it knows which tasks can be cancelled safely.

Also, do you see any issues with this design? Maybe, you know frameworks that are more suited for it?

P.S.
This issue was born from me trying to adapt apistar by @tomchristie to the needs of a project at work
(the upstream version is different stuff now; you have to look at the 0.5.x version for this). It's a dependency injection framework, and it has "component" classes backed by async tasks.

@abetkin

abetkin commented Jan 29, 2019

@dabeaz Probably could use your advice as well :)

@njsmith

njsmith commented Jan 31, 2019

Hello!

Trio does not provide a 1-1 equivalent of an asyncio.Task, i.e., there's no built-in function to spawn off some background task, track its result, and potentially cancel it. There are a number of reasons why we don't think this is the best tool to foreground as the standard way to do everything (some described here). And a lot of people have found that when switching to Trio from another system like asyncio, they need some time to get used to the new way of thinking, spend a lot of energy fighting Trio initially, and then eventually something clicks and they start finding the Trio way of doing things more natural than the asyncio way. But, if you have a problem where something like an asyncio.Task really is the best way to do it – and I don't understand your specific situation well enough to tell either way – then Trio certainly can handle it!

For example, you might do something like:

import trio
from functools import partial
# See https://outcome.readthedocs.io
import outcome
# This is 3.7+ only; if using older Python see https://stackoverflow.com/a/48800772/1925449
from contextlib import asynccontextmanager

class ManagedTaskCancelled(Exception):
    pass

class ManagedTask:
    def __init__(self, async_fn):
        self._async_fn = async_fn
        self._finished = trio.Event()
        self._cancel_scope = trio.CancelScope()  # unbound cancel scope; requires trio v0.11.0 (out soon)
        self.outcome = None

    async def _run(self):
        # XX TODO: this is actually broken in the presence of MultiErrors... it's very difficult to do this
        # correctly until MultiError v2 lands (see #611)!
        try:
            with self._cancel_scope:
                self.outcome = outcome.Value(await self._async_fn())
        except Exception as exc:
            self.outcome = outcome.Error(exc)
        finally:
            if self.outcome is None:
                # It raised some kind of BaseException... we'll treat them all as "cancelled"
                self.outcome = outcome.Error(ManagedTaskCancelled("cancelled"))
            self._finished.set()
        # Once #611 lands, the above will reduce to something like:
        # with self._cancel_scope:
        #     try:
        #         self.outcome = await outcome.acapture(self._async_fn, filter=Exception)
        #     finally:
        #         if self.outcome is None:
        #             self.outcome = outcome.Error(ManagedTaskCancelled("cancelled"))
        #         self._finished.set()

    def cancel(self):
        self._cancel_scope.cancel()

    async def wait(self):
        await self._finished.wait()


class ManagedTaskRunner:
    def __init__(self, nursery):
        self._nursery = nursery

    def start_soon(self, async_fn, *args):
        managed_task = ManagedTask(partial(async_fn, *args))
        self._nursery.start_soon(managed_task._run)
        return managed_task


@asynccontextmanager
async def open_managed_task_runner():
    async with trio.open_nursery() as nursery:
        try:
            yield ManagedTaskRunner(nursery)
        finally:
            # If you want any remaining "managed tasks" to be automatically cancelled when the
            # request finishes:
            nursery.cancel_scope.cancel()

################################

async def handle_request(...):
    async with open_managed_task_runner() as runner:
        hobby_task = runner.start_soon(hobby)
        # Now we can do hobby_task.cancel() to request cancellation,
        # await hobby_task.wait() to wait for it,
        # and then examine hobby_task.outcome to find out what happened (a ManagedTaskCancelled error means it was cancelled)

This is fairly elaborate – in many cases I suspect there's a simpler way to accomplish what you want without all of this machinery. But it does give you all the same power as the asyncio version, with a number of bonuses – for example, if handle_request is cancelled, that will automatically propagate to all of the background tasks. If you hit control-C and that happens to be delivered while Python is running one of the background tasks, it will propagate out properly and shut everything down. And you can always shove it in a utility library and forget about the details...

@dabeaz

dabeaz commented Jan 31, 2019

My only thought is that there are often many ways to do things. Should you try this in Curio, you'll find it much less opinionated about the whole thing. For one, you can create free-floating tasks if you want. You can also use Curio TaskGroup objects to get results in various ways and to do things that are similar to the nursery idea. However, you could also just dispense with all of that complexity and use the gather() function that Curio already provides to do what you describe.

@abetkin

abetkin commented Jan 31, 2019

@njsmith Thank you very much for your answer and the example code. It did provoke more thought.
Btw, it would be great if things like multi-error and outcome could be usable in any async code.

Indeed, trio has no concept of a task. It also forbids the reuse of coroutine objects, which one might treat as an (unstarted) future: you get a RuntimeError if you try. So trio's API is more restrictive in this way than asyncio's.

To me it is natural to be able to use/reuse the task object: why not, everything is an object in Python.
Also, a task is a logical notion. You can cancel it, for example, or fetch its result.

Btw, in trio a nursery has that start_soon method, which doesn't return a value. Suppose it returned the task that was started:

async with open_nursery() as n:
    a_task = n.start_soon(c1)
    val = await n.start_soon(c2)

Those two lines (with and without await) would have different meanings; in trio we can't do that.

From what I understand, trio.Event is the closest equivalent of a task. I didn't see any clear performance bottleneck in using it instead of tasks. From what I see, it's a queue of events being dispatched, but of course it's implemented in Python, whereas in asyncio it is a basic primitive.
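
For what it's worth, here is a minimal sketch (my own illustration, reusing the names from the example above; compute_dream() is a hypothetical long-running call) of how a plain trio.Event can stand in for a task whose result other coroutines wait on:

import trio

result = None
done = trio.Event()

async def hobby():
    global result
    result = await compute_dream()  # hypothetical long-running call
    done.set()

async def boast(somebody):
    await done.wait()  # blocks until hobby() has finished, however long that takes
    if result in somebody.envy_list:
        ...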

@abetkin

abetkin commented Jan 31, 2019

@dabeaz Tried it; with curio everything is possible, yay!

Btw, why isn't a task awaitable? You could make await task equivalent to await task.join(). Also: curio.gather(tasks) vs. asyncio.gather(*tasks).
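
(Just to illustrate the first point, here is a tiny sketch of mine, not curio's actual API, of making a task awaitable by delegating to a curio-style async join() method:)

class AwaitableTask:
    def __init__(self, task):
        self._task = task  # a curio Task, which has an async join() method

    def __await__(self):
        # makes "await t" behave like "await t.join()"
        return self._task.join().__await__()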

@njsmith

njsmith commented Jan 31, 2019

Btw, it would be great if things like multi-error and outcome could be usable in any async code.

Outcome is already a standalone utility library – you can use it in any Python program, whether it's async or not :-).

The plan with "MultiError v2" is that Yury and I will be writing a PEP to hopefully make it built-in to the interpreter in 3.8. See #611 for all the details. (It has to be builtin to really work 100% reliably, plus Yury is eager to add it to asyncio. AFAICT his plan at this point is to turn asyncio into trio as much and as fast as he can within the limits imposed by backwards compatibility etc. I'm not sure if that makes asyncio more or less attractive to you :-).)

To me it is natural to be able to use/reuse the task object: why not, everything is an object in python.
Also, task is a logical notion. You can cancel it for example, or fetch it's result.

Yes, trio has a very different design than asyncio. Not sure what to tell you there. It's not true that everything in Python is an object; for example, a synchronous function call is not an object. Trio takes the position that synchronous python works pretty well so we might as well copy it whenever possible :-). Most trio users never learn about coroutine objects at all.

And unfortunately, AFAICT it's impossible to have reasonable cancellation semantics in any system where tasks have results and are cancelable. The two properties are just inherently at odds – if task objects carry results, then it means many different consumers may look at those results over time, possibly including consumers who don't know about each other. But if you want to cancel a Task/Future, you have to know that all its potential consumers aren't interested in that result any more, even if you don't know who those consumers are. Of course in your code you at least in principle know what all your different tasks could do in the future, and could somehow keep track of this and only call cancel when you know that no-one will ever want a certain task's result... But! asyncio itself also calls cancel on tasks sometimes, and it doesn't know what your code is doing, so sometimes it will do the wrong thing.

In particular, suppose tasks A and B both do await C, and then task A is cancelled. In this case, asyncio will automatically cancel task C, even though task B is still using it! In fact task B will also be cancelled, even though you only meant to cancel task A!

The solution is to make sure that you never ever call await on the same Task/Future more than once. If you read the implementation of features in asyncio itself, like asyncio.Event, they make the code more complicated than you'd expect in order to follow this rule. In our example, the solution might be to create a task D that runs the C code, and then inserts the result into two Future objects, and then tasks A and B would await these separate Future objects.
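
To make that concrete, here is a minimal asyncio sketch (my own, with a hypothetical compute() coroutine): task D computes the value once and feeds two separate Futures, so cancelling A cannot disturb B or the shared computation.

import asyncio

async def compute():  # hypothetical shared computation
    await asyncio.sleep(1)
    return 42

async def main():
    loop = asyncio.get_running_loop()
    fut_a, fut_b = loop.create_future(), loop.create_future()

    async def task_d():
        value = await compute()
        for fut in (fut_a, fut_b):
            if not fut.done():  # a consumer may have been cancelled meanwhile
                fut.set_result(value)

    async def task_a():
        print("A got", await fut_a)

    async def task_b():
        print("B got", await fut_b)

    d = asyncio.create_task(task_d())
    a = asyncio.create_task(task_a())
    b = asyncio.create_task(task_b())
    a.cancel()  # cancels only A and (at most) its private future
    await asyncio.gather(d, a, b, return_exceptions=True)  # B still prints "B got 42"

asyncio.run(main())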

Btw, in trio a nursery has that start_soon method. It doesn't return a value. Suppose it returned a task that was started:

In early versions of trio, start_soon returned a value that let you check on the status of the task and be notified of results. In practice we found over and over that code got simpler when we didn't use this feature, plus it added a lot of complexity to the API, so we eventually ripped it out. We did try, though :-).

It sounds like you're very committed to the asyncio way of thinking about tasks and futures. Most people who switch from asyncio to Trio struggle for a while to wrap their head around the different approach, and find it very frustrating until suddenly it clicks and they can't imagine ever going back to asyncio. (This was my experience, in fact.) I also realize that this kind of makes Trio sound like a cult, so, uh... I totally understand if you're skeptical and prefer to stick to what makes sense for you currently. That's a very sensible position :-). If asyncio is what makes sense to you then you should definitely use asyncio; it's way better at being asyncio than anything else is.

@dabeaz

dabeaz commented Jan 31, 2019

In particular, suppose tasks A and B both do await C, and then task A is cancelled. In this case, asyncio will automatically cancel task C, even though task B is still using it! In fact task B will also be cancelled, even though you only meant to cancel task A!

It's really kind of amazing how complex scenarios like this just "work" in Curio.

from curio import *

async def task_a(c):
    print("A got", await c.join())

async def task_b(c):
    print("B got", await c.join())

async def task_c(x, y):
    await sleep(10)
    return x + y

async def main():
    c = await spawn(task_c, 2, 3)
    a = await spawn(task_a, c)
    b = await spawn(task_b, c)
    await sleep(2)
    await a.cancel()
    print("A cancelled")
    await b.join()
    result = await c.join()
    print("Main done", result)

run(main)

Output:

A cancelled
B got 5
Main done 5

Anyways, I'll just leave it at that. Task away... if you must. Or not ;-).

@njsmith

njsmith commented Jan 31, 2019

@dabeaz Yeah, curio's approach of never automatically propagating cancels across task.join definitely produces fewer surprises than asyncio's approach of always propagating cancels. The trade-off of course is that if task A and task B are both cancelled, then task C will keep running in the background and wasting resources, even if no one cares about the result anymore.

@abetkin

abetkin commented Jan 31, 2019

In particular, suppose tasks A and B both do await C, and then task A is cancelled. In this case, asyncio will automatically cancel task C, even though task B is still using it! In fact task B will also be cancelled, even though you only meant to cancel task A!

I guess that asyncio assumes by default that tasks were created in a trio-like, safe way, so it can cancel them. If you reuse tasks, then await asyncio.shield(task) will solve the issue.
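
(A minimal sketch of what that looks like, reusing hobby_task from the example above: shield() means that cancelling the awaiting coroutine no longer cancels the shared task.)

import asyncio

async def boast(somebody, hobby_task):
    # If boast() is cancelled, the CancelledError is still raised at this await,
    # but hobby_task itself keeps running for the other consumers.
    result = await asyncio.shield(hobby_task)
    if result in somebody.envy_list:
        ...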

And unfortunately, AFAICT it's impossible to have reasonable cancellation semantics in any system where tasks have results and are cancelable

asyncio.gather has cancellation semantics akin to trio's nursery. I tried to write the simplest equivalent of the latter:

class TaskGroup:

    def __init__(self):
        self._tasks = []

    @property
    def task(self):
        return asyncio.gather(*self._tasks)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        pass

    def __truediv__(self, coro):
        return self.create_task(coro)

    def __await__(self):
        return self.task.__await__()

    def create_task(self, coro):
        t = asyncio.create_task(coro)
        self._tasks.append(t)
        return t

with TaskGroup() as g:
    task = g / co1()
    val = await g / co2()
    await g

Well, I defend asyncio, but I actually like curio more. I will give it a try, I guess.

@njsmith

njsmith commented Feb 2, 2019

@abetkin Be careful – asyncio.gather does transmit cancellation to all of the tasks, but it has very broken behavior if one of those tasks fails spontaneously – by default it immediately returns, while abandoning the rest of the tasks to run unsupervised. AFAICT asyncio.wait is always what you should use instead of asyncio.gather. It also has a curio-ish return_when= argument.

...though actually now that I think about it, I'm not actually sure how asyncio.wait responds to being cancelled.

...looking at the source, AFAICT canceling wait actually leaves all the children running. So if what you want is something nursery-ish, then gather and wait are broken in exactly complementary ways. Good luck, I guess :-)
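
(For what it's worth, a hand-rolled, nursery-ish supervisor on top of asyncio.wait might look roughly like this sketch of mine; it avoids the gather problem by cancelling the survivors itself, but as noted above you would still have to decide what happens when the supervisor itself is cancelled.)

import asyncio

async def supervise(*coros):
    tasks = [asyncio.create_task(c) for c in coros]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
    for t in pending:  # don't abandon the rest if one task failed early
        t.cancel()
    if pending:
        await asyncio.wait(pending)
    for t in done:
        t.result()  # re-raise the first failure, if any
    return [t.result() for t in tasks if not t.cancelled()]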

Anyway, it sounds like you've got a plan and there's nothing to do for Trio here, so I'm going to mark this closed. Feel free to keep chatting.

@arthur-tacca

You can already see from the backlink above that I've requested a very similar concept in trio-util, but I thought it was worth adding to the discussion here about cancellation semantics given @njsmith's comments above:

And unfortunately, AFAICT it's impossible to have reasonable cancellation semantics in any system where tasks have results and are cancelable.

...

Yeah, curio's approach of never automatically propagating cancels across task.join definitely produces fewer surprises than asyncio's approach of always propagating cancels. The trade-off of course is that if task A and task B are both cancelled, then task C will keep running in the background ...

This is maybe a bit off topic for this thread, but I think the most common reason for wanting tasks in Trio (read: the reason I want them) is just to reduce the boilerplate needed for accessing the return value of a coroutine started in a nursery. You don't need complex cancellation semantics for that. Actually, I've had situations where I definitely don't want cancellation to be propagated (starting a callback in a user-supplied nursery and waiting for it to complete).

Once you drop the requirement for propagating cancellation like that, it becomes much easier to implement a task class, and also easier for users to understand IMO:

  • await t.run() starts the task; must be called exactly once.
    • If the wrapped coroutine completes without exception, always returns None (not the coroutine return value).
    • If the wrapped coroutine throws an exception, then that exception escapes totally unmodified, regardless of what it is
  • await t.wait_complete() waits until the task finishes (returns or throws). Under the hood it can just wait on a trio.Event.wait().
    • Always returns None
    • Never throws anything (except injected exceptions like cancellation or keyboard interrupt)
  • t.result returns the task's result or throws its exception. But, because the exception is always wrapped, there's no need for any special casing e.g. for Cancelled or MultiError.
    • If the wrapped coroutine has completed without exception, returns the coroutine's result.
    • If the wrapped coroutine has thrown an exception e, throws WrappedException(e)
    • If the wrapped coroutine has not yet completed, throws TaskNotCompletedException

Although I've requested this in trio-util, I think it would even make a lot of sense in Trio itself.

@smurfix

smurfix commented Sep 11, 2022

Actually I've had situations where I definitely don't want cancellation to be propagated (starting a callback in a user supplied nursery and waiting for it to complete).

Mmh. I assume that you start the task in the user's nursery so that if the user's nursery errors out your task gets cancelled along with it. Correct?

If so, I'd just do this:

#!/usr/bin/python3

import trio
import outcome

async def some_task():
    await trio.sleep(0.5)
    return 42

class CaptureCancelled(Exception):
    pass

class Capture:
    def __init__(self, nursery, p, *a, **k):
        self._evt = trio.Event()
        nursery.start_soon(self._run, p, a, k)

    async def _run(self, p, a, k):
        self._result = await outcome.acapture(p, *a, **k)
        self._evt.set()

    def __await__(self):  # shortcut
        return self.get_result().__await__()

    async def get_result(self):
        await self._evt.wait()
        try:
            return self._result.unwrap()
        except trio.Cancelled as err:
            raise CaptureCancelled() from err

async def main():
    async with trio.open_nursery() as N:
        c = Capture(N,some_task)  # you'd use a different nursery of course
        ...
        res = await c  # or "await c.get_result()" without the shortcut
        print(res)

trio.run(main)

No task object required.

@TeamSpen210

I had a slightly simpler design, where you can't wait on the task class at all - instead the result is accessible only once the nursery has completed successfully. If it was cancelled or raised an exception, code using the result will never be reached.

@smurfix

smurfix commented Sep 12, 2022

Well, if it's a "guest nursery" (i.e., one supplied by the client) you can't depend on the guest terminating any time soon, and if it's cancelled your code won't notice, as the cancellation won't be propagated to you.

I have related code in https://gist.github.com/smurfix/0130817fa5ba6d3bb4a0f00e4d93cf86 that streams results from subtasks into an iterator as they become available. That obviously won't work if you need to wait for the nursery to end.

I'd really like to have this kind of nursery extension available as a mix-in instead of bolting it on via external classes and whatnot, but the pesky Final metaclass prevents that.

@arthur-tacca

arthur-tacca commented Sep 12, 2022

@smurfix

Thanks for that example Capture class. That's actually what I meant by "task"! It's very similar to what I described in the linked issue.

The main difference is the exception handling. This was the main point I was making in my previous comment: I think the solution I described would work better (no chance of lost exceptions, no need to special-case which exceptions get wrapped and which don't, easier for users to understand). Did you have any thoughts about that? To spell it out, it would look like this (not tested, so probably has silly syntax errors): (I also deliberately decoupled it from the nursery to make it a bit more flexible and explicit in user code.)

import trio

class TaskNotCompletedException(Exception):
    pass

class TaskWrappedException(Exception):
    pass

class Task:
    def __init__(self, routine, *args):
        self.routine = routine
        self.args = args
        self.is_started = False
        self.result = None
        self.exception = None
        self.completed_event = trio.Event()

    async def run(self):
        assert not self.is_started
        self.is_started = True
        try:
            self.result = await self.routine(*self.args)
        except BaseException as e:
            self.exception = e
            raise  # Note the exception is allowed to propagate into user nursery
        finally:
            self.completed_event.set()

    async def wait_complete(self):
        await self.completed_event.wait()

    def get_result(self):
        if not self.completed_event.is_set():
            raise TaskNotCompletedException(self)
        if self.exception is not None:
            raise TaskWrappedException(self) from self.exception  # Exception is always wrapped for task user
        return self.result
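
A minimal usage sketch for the class above (my addition; fetch() is just a stand-in for whatever coroutine you want to run):

async def fetch():
    await trio.sleep(1)
    return 42

async def main():
    t = Task(fetch)
    async with trio.open_nursery() as nursery:
        nursery.start_soon(t.run)  # an exception in fetch() propagates into this nursery
        await t.wait_complete()
        print(t.get_result())  # 42, or raises TaskWrappedException

trio.run(main)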

@arthur-tacca

I assume that you start the task in the user's nursery so that if the user's nursery errors out your task gets cancelled along with it.

That's not the main motivation. Instead, I'm assuming that the user's nursery survives longer than this routine. If this routine gets cancelled, the callback should be allowed to continue running as long as the user nursery does.

To be specific, in one use case the user nursery is "all open connections" while this routine is for creating connections and the callback is the logic for handling the connection. During shutdown, we should stop initiating new connections immediately, but we should spend a reasonable grace period shutting down open connections gracefully. This is very much the same situation as trio.serve_tcp(), which takes a handler and a handler nursery, except that that routine doesn't care about when the handler completes.

@arthur-tacca

arthur-tacca commented Sep 14, 2022

Update

I have now put my idea for a task-like class into its own package: aioresult. It's now called ResultCapture, and it runs a function and stores the result. There's also a Future class for when you want to manually set the value, and a few functions for waiting (though normally just using a nursery would do).

Original post

Just for a giggle, I made a version of my toy Task class that is also compatible with the task_status protocol as used by Nursery.start(): https://gist.github.com/arthur-tacca/32c9b5fa81294850cabc890f4a898a4e

@indigoviolet

indigoviolet commented Oct 3, 2022

Adding some more elaboration on this topic from a gitter conversation:


My question:
What do trio experts think of https://pypi.org/project/trio-future/? It looks to me like a nice implementation of a commonly requested pattern (eg #892) on top of trio, but it hasn't received much attention, so I'm wondering if I'm missing something.

Matthias Urlichs smurfix 01:33
@indigoviolet The missing piece is "what happens if the code that requested the future result gets cancelled". Shrug? Cancel the future-producing code? Do something else to tell it that it's no longer wanted (e.g. it talks on a multiplexed server and needs to send an ABORT TRANSACTION message)? Personally I think that if we only offer the first, people are going to implement ugly workarounds when they need one of the others. Also IMHO, implementing this as a wrapper of a single-use memory queue under the hood is overkill, easier to do this with a class that just holds an Event and an Outcome.

Matthias Urlichs smurfix 01:40
Also, IME most of the time you actually don't want a gather that waits for everything; instead you just want to iterate the results (and maybe start more tasks depending on them, e.g. when crawling data structures or web pages). Some time ago I've sketched a nursery wrapper that uses an iterator for that, as an easier-to-use alternative to the clone-the-send-channel dance you'd otherwise need to perform. It's at https://gist.github.com/smurfix/0130817fa5ba6d3bb4a0f00e4d93cf86

arthur-tacca arthur-tacca 05:29
As it happens, I've been thinking about this quite a lot recently, and even wrote my own future-like class:

https://gist.github.com/arthur-tacca/32c9b5fa81294850cabc890f4a898a4e

I think a lot of the time you do just want something like gather(). That function waits until all tasks are complete and stops early if one throws an exception. Guess what: that's a Trio nursery! (Except nurseries deal with exceptions and cancellation much better of course.) The fact that Trio nurseries exist prove that waiting for a bunch of tasks to all finish is useful. It's just missing a way to get the task return values (without intrusively changing the functions to save results elsewhere).

The key thing that I've realised is that, at least in this simple situation, the best way for exceptions to work is that they're just allowed to propagate straight out. All the task/future classes that I've seen try to be too clever: they try to perfectly forward exceptions by catching and masking them in the task's context, and then re-raising them when fetching the result (typically by using the Outcome library).

The clever solution actually makes things worse IMO, because it prevents the nursery where the task is being run from being cancelled (even though that's probably what you want), and it means you need to be careful not to get the result more than once (which is annoying). Worst of all, you must get it at least once, otherwise the exception is silently lost - that's the cardinal sin that nurseries were originally meant to prevent. Plus, the exception may not make sense in the new context; the classic example is a cancel exception that is now being raised outside its owning nursery, but I think it's a sign of a general problem.

Sorry for the wall of text. In summary, my issues with trio-future are: (1) it shouldn't perfectly forward exceptions, (2) it shouldn't have a gather() function (just advice to use nurseries for that role).

Lura lura:veriny.tf [m] 05:32
My strong opinion on the nursery return value argument is that regular nurseries shouldn't care about capturing return values, and that there should be an async map function (or parallel_map if you're a streams kinda guy) ala collection |> parallel_map(some_fn, MapMode.{IN_ORDER|AS_RETURNED})
Essentially the only time I've ever cared about results from nurseries is with a map kinda function as every other time they're used for background processing that pumps a channel anyway

Alex Grönholm agronholm 05:35
I've run into plenty of situations where I needed a future-like construct and I've been contemplating adding such a thing to AnyIO
it would essentially be a simple combination of an event and a value (or exception), so like a reduced asyncio.Future

Venky Iyer indigoviolet 19:32
This is super helpful. I will add this info to one of the relevant issues on the trio github so it isn't lost. I certainly endorse adding a tool like this to trio/anyio, (the need seems well recognized on trio issues, and trimeter, for example is a solution that is being considered AFAICT). The straightforward nursery wrapper or result-capture or trio-future version should be available more easily than a gist or a 1-star repo.
@:smurfix , re: "Also IMHO, implementing this as a wrapper of a single-use memory queue under the hood is overkill" -- is this an objection about code complexity or performance?

arthur-tacca arthur-tacca 21:15
Trio-future's use of memory channels: partly it's efficiency, and partly it's just odd, like if you came across code that used a list to store a single value. The way it abuses clones of memory channels in gather() to emulate a semaphore is even odder. I think memory channels were just the only synchronisation primitive the author was aware of (when all you have is a hammer...)

@bob1de

bob1de commented Mar 3, 2023

Hi,

I stumbled across this issue while looking for a kind of breathing nursery that yields task outcomes as they become available, basically something along the lines of the nursery wrapper of @smurfix. Inspired by that, I thought of an API to safely handle scenarios where new work may be spawned in reaction to the processing of previous results. What came out are basically three concepts, all somewhat akin and related to the discussion here, but otherwise not strictly dependent on one another.

I would be very interested in the opinions of @njsmith and the other experts on which of these (and to what extent) could be valuable to have in Trio, and in discussion of the API and UX of course.
Before I start opening individual issues, I thought posting it here for initial assessment would be appropriate.

  • A Future, which basically bundles CancelScope + Event + Outcome, allowing a task to be cancelled and awaited and its outcome captured, without manual with/try/except/finally nesting and the juggling of three objects. However, in contrast to other approaches I've seen so far, it captures only specific types of exceptions (incl. ExceptionGroup splitting), so faulty code doesn't go unnoticed and whichever nursery runs the future will get any uncaught exceptions. Whether "future" is the correct name for this remains to be seen, but at least to me the idea and API have proven to be useful, and it has clean semantics w.r.t. cancellation and exception handling.

  • An object named WorkSet, which especially targets producer/consumer workloads as described above and allows for both synchronous and asynchronous fetching of results. It has an acknowledgement mechanism, so that even in the case of multiple concurrent consumers, the underlying nursery can be kept open until all results have been processed and no new tasks are coming. Additionally, results can be handed back to the queue or deferred when processing is currently not possible, e.g. when a consumer got cancelled or due to temporary unavailability of some other resource. It also supports a CapacityLimiter to limit the number of concurrently running tasks.

  • Lastly, there's a gather function, which just wraps the nursery/collect-outcome dance (roughly sketched below), but additionally features the exact same selective exception catching as Future.
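
To illustrate that dance, here is a rough sketch (mine, not the gist's actual API) of a gather that keeps only chosen exception types as captured outcomes and lets everything else propagate into the nursery:

import trio
import outcome

async def gather(*async_fns, catch=(Exception,)):
    results = [None] * len(async_fns)

    async def run_one(i, fn):
        try:
            results[i] = outcome.Value(await fn())
        except catch as exc:
            results[i] = outcome.Error(exc)
        # anything not matched by `catch` escapes into the nursery as usual

    async with trio.open_nursery() as nursery:
        for i, fn in enumerate(async_fns):
            nursery.start_soon(run_one, i, fn)
    return results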

The whole thing can be found here: https://gist.github.com/bob1de/80aaefc4d5515e70ed25cd19b861af94

Don't be overwhelmed by the sheer number of lines; without having counted, ~2/3 of it is docs and comments, so help(Future) and help(WorkSet) should give a complete (and hopefully useful) introduction.
There are some commented TODOs on points IMHO requiring special elaboration.
Demo code is included as well, just run the file to see all of it in action.

Thanks for your time!

EDIT: Restructured text and added more details.

@bob1de

bob1de commented Mar 5, 2023

After having rethought this a bit, probably the way to go is to open an issue for the future part at least, as that's the most fundamental and -- considering past discussions -- most requested interface. In the latest version, I factored that out into an ABC to facilitate different implementations than the async callable one. Will reference this issue over there.
