Fine-tuning channels #719

Open
njsmith opened this issue Oct 5, 2018 · 33 comments

@njsmith
Member

njsmith commented Oct 5, 2018

Channels (#586, #497) are a pretty complicated design problem, and fairly central to user experience, so while I think our first cut is pretty decent, we'll likely want to fine-tune it as we get experience using them.

Here are my initial notes (basically questions that I decided to defer while implementing version 1):

In open_memory_channel, should we make max_buffer_size=0 default? Is this really a good choice across a broad range of settings? The docs recommend it right now, but that's not really based on lots of practical experience or anything. And it's way easier to add defaults later than it is to remove or change them.

For channels that allow buffering, it's theoretically possible for values to get "lost" without there ever being an exception (send puts it in the buffer, then receiver never takes it out). This even happens if both sides are being careful, and closing their endpoint objects when they're done with them. We could potentially make it so that ReceiveChannel.aclose raises an exception (I guess BrokenResourceError) if the channel wasn't already cleanly closed by the sender. This would be inconsistent with the precedent set by ReceiveStream (which inherited it from BSD sockets), but OTOH, ReceiveStream is a much lower-level API – to use a ReceiveStream you need some non-trivial engineering work to implement a protocol on top, while ReceiveChannel is supposed to be a simple high-level protocol that's usable out-of-the-box. Practically speaking, though, it would be pretty annoying if some code inside a consumer crashes, and then ReceiveChannel.__aexit__ throws away the actual exception and replaces it with BrokenResourceError.

Should memory channels have a sync close() method? They could easily; the only reason not to is that it makes life simpler for hypothetical IPC channels. I have no idea yet how common those will be, or whether it's actually important for in-process channels and cross-process channels to have compatible APIs.

I have mixed feelings about open_memory_channel returning a bare tuple (send_channel, receive_channel). It's simple, and allows for unpacking. But it requires everyone to memorize which order they go in, which is annoying, and it forces you to juggle two objects, as compared to the traditional Queue design that only has one object. I'm convinced that having two objects is important for closure-tracking and IPC, but there is an option that's a kind of hybrid: it could return a ChannelPair object that's a simple container for a .send_channel and .receive_channel object, and, if we want, the ChannelPair could have send and receive methods that simply delegate to its constituent objects. Then people who hate having two objects could treat ChannelPair like a Queue, while still breaking it into pieces if desired, or doing closure tracking like async with channel_pair.send_channel: .... But... I played with this a bit, and found it annoying to type out channel_pair.send_channel all the time. In particular, if you do want to split the objects up, then you lose the option of destructuring assignment, so you have to do cumbersome attribute access instead. (Or we could also support destructuring assignment by implementing __iter__, but that's yet another confusing way to do it...) For now my intuition is that closure tracking is so broadly useful that everyone will want to use it, and that's easiest with destructuring assignment. But experience might easily prove that intuition wrong.

Is clone confusing? Is there anything we can do to make it less confusing? Maybe a better name? So far people have flagged this multiple times, but we didn't have docs yet, so it could just be that...

And btw, the Channel vs Stream distinction is much less obvious than I'd like. No-one would look at those and immediately guess that ah, a Channel carries objects while a Stream carries bytes. It is learnable, and has some weak justification, and I haven't thought of anything better yet, but if someone thinks of something brilliant please speak up.

Should the *_nowait methods be in the ABC, or just in the memory implementation? They don't make a lot of sense for things like inter-process channels, or like, say... websockets. (A websocket can implement the SendChannel and ReceiveChannel interfaces, just with the proviso that objects being sent have to be type str or bytes.) The risk of harm is not large, you can always implement these as unconditional raise WouldBlock, but maybe it will turn out to be more ergonomic to move them out of the ABC.

We should probably have a trio.testing.check_channel, like we do for streams.

njsmith added a commit to njsmith/trio that referenced this issue Oct 5, 2018
@njsmith
Member Author

njsmith commented Oct 5, 2018

Would it be helpful to have a way to explicitly mark a channel as broken, e.g. send_channel.poison()? (This name is taken from the CSP tradition – here's the earliest reference I can find in a quick look. Their version of "poison" also encompasses our send_channel.aclose, though.) Anyway, the point is, if a producer crashes, maybe you want to tell the other side that this wasn't a clean shutdown, so it can do... something.

While I'm here, also check out this mailing list post from 2002 with subject "Concurrency, Exceptions, and Poison", by the same author. It's basically about how exceptions and nurseries should interact.

@smurfix
Contributor

smurfix commented Oct 5, 2018

Yes, poison as an exception-on-the-other-side-raising alternate to closing would be useful.
(Maybe just add an appropriate argument to .close?)

@ziirish
Contributor

ziirish commented Nov 3, 2018

Hello,

Would it be possible to document how we should replace Queue with memory_channel?
Maybe my usage of Queues is wrong, but I don't understand yet how I should use the memory_channel instead.

I'm currently using a Queue as a pool of objects. These objects are basically wrappers around subprocess.Popen so the idea is to have a pre-determined number of processes to parallelize operations up to queue-size.
I know there are CapacityLimiter which achieve something similar, but they only work for actual python code. Here I want to limit external processes operations.

The pseudo code looks like:

class MonitorPool:
    pool_size = 5
    pool = trio.Queue(pool_size)

    async def handle(self, server_stream: trio.StapledStream):
        # do something
        ident = next(CONNECTION_COUNTER)
        data = await server_stream.receive_some(8)
        async with self.get_mon(ident) as mon:
            response = mon.status(data)
        # do other things

    async def launch_monitor(self, id):
        mon = Monitor(ident=id)
        await self.pool.put(mon)

    async def cleanup_monitor(self):
        while not self.pool.empty():
            mon = await self.pool.get()  # noqa
            del mon

    @asynccontextmanager
    async def get_mon(self, ident) -> Monitor:
        mon = await self.pool.get()  # type: Monitor
        yield mon
        await self.pool.put(mon)

    async def run(self):
        async with trio.open_nursery() as nursery:
            for i in range(self.pool_size):
                nursery.start_soon(self.launch_monitor, i + 1)
        try:
            await trio.serve_tcp(self.handle, self.port, host=self.bind)
        except KeyboardInterrupt:
            pass

        async with trio.open_nursery() as nursery:
            nursery.start_soon(self.cleanup_monitor)

@njsmith
Member Author

njsmith commented Nov 5, 2018

@ziirish Oo, that's clever! If I were implementing a pool I probably would have reached for a Semaphore plus a set, but your way is probably simpler... I wonder if we should have a first-class pool object, since this pattern shows up in different places? I don't think I understand the variations well enough to design such a thing right now though, so that's just an idea for later.

Generally, to replace a Queue with a memory channel, you replace queue.put with send_channel.send, and queue.get with receive_channel.receive. It does make this use case a little more cumbersome, because now you have two objects to keep track of instead of just one, but once you do that, most things transfer over directly. So something like:

class MonitorPool:
    def __init__(self, pool_size=5):
        self.pool_size = pool_size
        self.send_channel, self.receive_channel = trio.open_memory_channel(pool_size)

    async def launch_monitor(self, id):
        mon = Monitor(ident=id)
        await self.send_channel.send(mon)

    @asynccontextmanager
    async def get_mon(self, ident) -> Monitor:
        mon = await self.receive_channel.receive()  # type: Monitor
        yield mon
        # Return the monitor to the pool (there is no self.pool any more)
        await self.send_channel.send(mon)

The one thing that doesn't transfer over directly is cleanup_monitor, since channels don't have a .empty() method. But, I'm not quite sure how to translate this code, because I'm not sure what you're trying to do :-). Remember that Python is a garbage collected language; the way you free objects is by simply stopping referring to them. So when your MonitorPool object goes out of scope, then any Monitor objects inside it will be automatically freed. Your cleanup_monitor method does speed this up a bit by emptying out the queue, but the del mon line is effectively a no-op: all it does is undefine the local variable mon, but that will happen anyway as soon as the function exits. So if you do want to empty out the pool immediately, you don't really need to iterate over the individual objects; you just need to clear the internal buffer. And for memory channels, the easy way to do this is just to close the receive channel; this tells Trio that you're never going to call receive again, so it immediately discards all the data in the internal buffer.

Also, this code:

        async with trio.open_nursery() as nursery:
            nursery.start_soon(self.cleanup_monitor)

is basically a more complicated way of writing:

    await self.cleanup_monitor()

And it's generally nice to run cleanup code even if there's an exception... with blocks are good for this. In particular, you can use async with receive_channel to automatically call receive_channel.aclose() when exiting a block. So I'd probably drop the cleanup_monitor method and rewrite run as:

    async def run(self):
        async with self.receive_channel:
            async with trio.open_nursery() as nursery:
                for i in range(self.pool_size):
                    nursery.start_soon(self.launch_monitor, i + 1)
            try:
                await trio.serve_tcp(self.handle, self.port, host=self.bind)
            except KeyboardInterrupt:
                pass

BTW, the type annotation for server_stream should probably be trio.abc.Stream?

@njsmith
Member Author

njsmith commented Nov 5, 2018

Speaking of type annotations, I wonder if we should make the channel ABCs into parametrized types, like:

from typing import Any, Generic, Tuple, TypeVar

T_cov = TypeVar("T_cov", covariant=True)
T_contra = TypeVar("T_contra", contravariant=True)

class ReceiveChannel(Generic[T_cov]):
    async def receive(self) -> T_cov:
        ...

class SendChannel(Generic[T_contra]):
    async def send(self, obj: T_contra) -> None:
        ...

def open_memory_channel(max_buffer_size) -> Tuple[SendChannel[Any], ReceiveChannel[Any]]:
    ...

(For the variance stuff, see: https://mypy.readthedocs.io/en/latest/generics.html#variance-of-generic-types. I always get confused by this, so I might have it wrong...)

It might even be nice to be able to request a type-restricted memory channel. E.g. @ziirish might want to do something like:

s, r = open_memory_channel[Monitor](pool_size)
# or maybe:
s, r = open_memory_channel(pool_size, restrict_type=Monitor)

Ideally this would be enforced both at runtime (the channels would check for isinstance(obj, Monitor) and raise an error if it didn't match) and statically (mypy infers that the return type is Tuple[SendChannel[Monitor], ReceiveChannel[Monitor]]). ...I think?
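
A rough sketch of what the runtime half of that check could look like is below. TypeCheckedSender and its restrict_type parameter are hypothetical names, not part of Trio, and a plain list stands in for the channel's buffer so the snippet runs on the standard library alone. Because it relies on isinstance, it would also accept a tuple of classes.

```python
from typing import Any, Callable, List, Tuple, Type, Union


class TypeCheckedSender:
    """Hypothetical wrapper (not a Trio API): checks the runtime type before forwarding."""

    def __init__(
        self,
        send_nowait: Callable[[Any], None],
        restrict_type: Union[Type[Any], Tuple[Type[Any], ...]] = object,
    ) -> None:
        self._send_nowait = send_nowait
        self._restrict_type = restrict_type

    def send_nowait(self, obj: Any) -> None:
        # isinstance accepts a single class or a tuple of classes
        if not isinstance(obj, self._restrict_type):
            raise TypeError(
                "expected {!r}, got {}".format(self._restrict_type, type(obj).__name__)
            )
        self._send_nowait(obj)


# A list plays the role of the channel buffer in this sketch.
buffer: List[Any] = []
sender = TypeCheckedSender(buffer.append, restrict_type=int)
sender.send_nowait(1)

rejected = None
try:
    sender.send_nowait("two")
except TypeError as exc:
    rejected = str(exc)
```

The default of restrict_type=object makes the unrestricted case fall out of the same code path, since every object passes that check.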

Complications:

Runtime and static types are kind of different things. They overlap a lot, but I don't actually know if there's any way to take an arbitrary static type and check it at runtime? And PEP 563 probably affects this somehow too... It's possible we might want both a way to pass in a runtime type (and have the static type system automatically pick this up when possible), and also a purely static way to tell the static type system what type we want it to enforce for these channels without any runtime overhead?

Even if we ignore the runtime-checking part, I don't actually know whether there's any standard syntax for a function like this in python's static type system. Maybe it would need a mypy plugin regardless? (Of course, we're already talking about maintaining a mypy plugin for Trio, see python/mypy#5650, so this may not be a big deal.)

Of course for a pure static check you can write something ugly like:

s, r = cast(Tuple[SendChannel[Monitor], ReceiveChannel[Monitor]], open_memory_channel(pool_size))

but that's awful.

@njsmith
Member Author

njsmith commented Nov 5, 2018

Actually, it is possible to do something like restrict_type=<Python class object> and have mypy understand it. It's a bit awkward:

from typing import Any, Tuple, Type, TypeVar, overload

T = TypeVar("T")

@overload
def open_memory_channel() -> Tuple[SendChannel[Any], ReceiveChannel[Any]]:
    pass

@overload
def open_memory_channel(*, restrict_type: Type[T]) -> Tuple[SendChannel[T], ReceiveChannel[T]]:
    pass

def open_memory_channel(*, restrict_type=object):
    ...

reveal_type(open_memory_channel())  # Tuple[SendChannel[Any], ReceiveChannel[Any]]
reveal_type(open_memory_channel(restrict_type=int))  # Tuple[SendChannel[int], ReceiveChannel[int]]

You would think you could write:

def open_memory_channel(*, restrict_type: Type[T]=object):
    ...

but if you try then mypy gets confused because it checks the default value against the type annotation before doing generic inferencing (see python/mypy#4236).

One annoying issue with this is that it doesn't support restrict_type=(A, B), which is the standard way to do runtime union type checks.

Also, you can't express types like List[str] or Tuple[int, int, str], or do zero-runtime-overhead type checks. It actually is possible to make both of these work:

open_memory_channel(...)
open_memory_channel[int](...)

BUT AFAICT you can't make them work at the same time.

The way you make open_memory_channel[int](...) work is a gross hack. What it does is exploit the fact that there is a special fn[type](...) syntax specifically for calling the constructors of generic classes. So we can do:

class open_memory_channel(Tuple[SendChannel[T], ReceiveChannel[T]]):
    def __new__(cls, max_buffer_size):
        return (SendChannel[T](), ReceiveChannel[T]())

    # Never called, but must be provided, with the same signature as __new__
    def __init__(self, max_buffer_size):
        assert False

# This is basically a Tuple[SendChannel[int], ReceiveChannel[int]]
reveal_type(open_memory_channel[int](0))
# This is Tuple[SendChannel[<nothing>], ReceiveChannel[<nothing>]], and you have to start throwing
# around manual type annotations to get anything sensible
reveal_type(open_memory_channel(0))

Did I mention it was gross? It's pretty gross. [Edit: I filed an issue on mypy to ask if there's any less gross way to do this: https://github.com/python/mypy/issues/6073]

Now... in this version, a bare open_memory_channel(0) does work fine at runtime, it's just that if you're using mypy, that bare open_memory_channel(0) isn't properly inferred to be a channel that can send any type of object, and you're forced to write open_memory_channel[Any](0) if that's what you want. I guess that might not be too terrible, for folks who are taking the trouble to use mypy?

@ziirish
Contributor

ziirish commented Nov 5, 2018

Thanks @njsmith for the explanations.
I'll rework my Pool then and I could send you a PR to implement such a primitive based on memory_channel.

But the question then will be about this:

For channels that allow buffering, it's theoretically possible for values to get "lost" without there ever being an exception (send puts it in the buffer, then receiver never takes it out).

The purpose of the pool is to be always filled with objects and then to block/pause the execution of the job when it is empty. So at the end of your job, the pool should be "full" resulting in "lost" data.
I guess the Pool primitive should just ignore the BrokenResource exception then.

@belm0
Member

belm0 commented Nov 7, 2018

I see that type annotations are used in a few spots already. @njsmith would you accept a PR to add type annotation to open_memory_channel() return value?

./trio/_subprocess/linux_waitpid.py:async def _task(state: WaitpidState) -> None:
./trio/_subprocess/linux_waitpid.py:async def waitpid(pid: int) -> Any:
./trio/_subprocess/unix_pipes.py:    def __init__(self, pipefd: int) -> None:
./trio/_subprocess/unix_pipes.py:    def fileno(self) -> int:
./trio/_subprocess/unix_pipes.py:    async def wait_send_all_might_not_block(self) -> None:
./trio/_subprocess/unix_pipes.py:    async def receive_some(self, max_bytes: int) -> bytes:
./trio/_subprocess/unix_pipes.py:async def make_pipe() -> Tuple[PipeSendStream, PipeReceiveStream]:
./trio/_util.py:def fspath(path) -> t.Union[str, bytes]:

@njsmith
Member Author

njsmith commented Nov 7, 2018

@belm0 What annotation are you planning to use? :-)

@Zac-HD
Member

Zac-HD commented Dec 9, 2018

I have mixed feelings about open_memory_channel returning a bare tuple (send_channel, receive_channel).

This is a perfect use-case for namedtuple('ChannelPair', 'send_channel receive_channel') - it supports both destructuring assignment (because it's a tuple) and named attribute access (because it's named). I usually find named tuples a poor substitute for proper data (/attrs) classes, but this is great 😄
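
A minimal sketch of that suggestion, using only the standard library. The open_fake_channel helper and the placeholder strings are stand-ins invented for illustration; a real implementation would return actual channel objects.

```python
from collections import namedtuple

# The named tuple proposed above: a tuple with field names.
ChannelPair = namedtuple("ChannelPair", "send_channel receive_channel")


def open_fake_channel():
    # Hypothetical stand-in for open_memory_channel(); strings replace real channels.
    return ChannelPair(send_channel="<send end>", receive_channel="<receive end>")


pair = open_fake_channel()

# Destructuring assignment still works, because it is a tuple...
s, r = pair

# ...and so does attribute access, because the fields are named.
same_send = s is pair.send_channel
same_receive = r is pair.receive_channel
```

The ordering question does not go away entirely (s, r = ... still depends on position), but callers who are unsure can always fall back to the attribute names.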

@belm0
Member

belm0 commented Dec 9, 2018

+1, but suggest unqualified names ("send", "receive")

@njsmith
Member Author

njsmith commented Jan 11, 2019

And btw, the Channel vs Stream distinction is much less obvious than I'd like. No-one would look at those and immediately guess that ah, a Channel carries objects while a Stream carries bytes. It is learnable, and has some weak justification, and I haven't thought of anything better yet, but if someone thinks of something brilliant please speak up.

Not super eager to rename things again, but I'm still mixing them up on a regular basis :-(.

The least-obviously-terrible idea I've had so far: Stream, which carries a continuous flow, vs. Belt, like a conveyor belt carrying discrete objects.

In other words: [image] versus [image]

@smurfix
Contributor

smurfix commented Jan 11, 2019 via email

@njsmith
Member Author

njsmith commented Jan 11, 2019

It's true that if we have SendQueue and ReceiveQueue that doesn't even clash with the old name (though it still clashes conceptually with the stdlib and asyncio).

@njsmith
Member Author

njsmith commented Jan 27, 2019

There's an implementation of poison in #850 (diff). I rejected it for now (rationale), but it may be useful to refer to in discussions.

@oremanj
Member

oremanj commented Feb 7, 2019

Some thoughts from more experience working with not-in-memory channels:

  • send_nowait and receive_nowait seem cumbersome to have as required methods -- do we expect them to ever be useful for non-memory channels? Maybe all that's needed is to provide default implementations in the ABCs that raise WouldBlock?
  • clone is quite annoying to implement due to the need to factor channel state out into a separate object, which for non-memory channels usually wouldn't otherwise be required (memory channels need shared state anyway between the send and receive sides). I feel like maybe it shouldn't be in the base channel ABCs? Lots of use cases for channels don't need it, and it ought to be possible to totally generically implement FooChannel-plus-clone() in terms of a FooChannel-without-clone() implementation.
  • Maybe we want to have a bidirectional trio.abc.Channel which has send, receive, and aclose methods? (I haven't found any use for send_eoc().) It doesn't really make sense to clone these, which is another argument for dropping cloning from the SendChannel and ReceiveChannel ABCs: then we can say class Channel(SendChannel, ReceiveChannel): pass.
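
The first and third bullets could be sketched roughly as follows. This is an illustration under assumptions, not Trio's actual ABCs: WouldBlock is a local stand-in for trio.WouldBlock, the class bodies are reduced to the methods named above, and LoopbackChannel is a toy implementation invented for the example.

```python
import abc


class WouldBlock(Exception):
    """Local stand-in for trio.WouldBlock so the sketch has no dependencies."""


class SendChannel(abc.ABC):
    @abc.abstractmethod
    async def send(self, value): ...

    @abc.abstractmethod
    async def aclose(self): ...

    def send_nowait(self, value):
        # Default for channels that cannot send without blocking
        raise WouldBlock


class ReceiveChannel(abc.ABC):
    @abc.abstractmethod
    async def receive(self): ...

    @abc.abstractmethod
    async def aclose(self): ...

    def receive_nowait(self):
        # Default for channels that cannot receive without blocking
        raise WouldBlock


class Channel(SendChannel, ReceiveChannel):
    """Bidirectional channel: nothing to add once clone() is out of the ABCs."""


class LoopbackChannel(Channel):
    """Toy implementation: only the three async methods have to be written."""

    def __init__(self):
        self._items = []

    async def send(self, value):
        self._items.append(value)

    async def receive(self):
        return self._items.pop(0)

    async def aclose(self):
        self._items.clear()


chan = LoopbackChannel()
try:
    chan.send_nowait("x")
    nowait_supported = True
except WouldBlock:
    nowait_supported = False
```

With the defaults in place, an implementor who never thinks about the _nowait methods still ends up with an object that satisfies the full interface.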

Poison: I've been working locally with a BreakableResource[E] ABC (extends AsyncResource) which defines async def aclose(*, error: Optional[E] = None). If you implement BreakableResource, you're specifying that you're capable of sending errors of type E to a remote peer, and aclose(error=...) behaves like aclose() except that it makes a best-effort attempt to communicate the error. This seems to work better than making poisoning a channel-specific thing; whether a communications resource can communicate errors seems orthogonal to whether it's being used as a channel. For example, an HTTP/2 stream is probably HalfCloseableStream plus BreakableResource[h2.errors.ErrorCodes].
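
A guess at the shape of such an ABC is sketched below. Only the name BreakableResource[E] and the aclose(*, error=...) signature come from the description above; RecordingPeer and ToyConnection are hypothetical, and asyncio is used purely to drive the coroutine so that the snippet runs without Trio installed.

```python
import abc
import asyncio
from typing import Generic, List, Optional, TypeVar

E = TypeVar("E")


class BreakableResource(abc.ABC, Generic[E]):
    """Resource that can tell its peer *why* it is closing."""

    @abc.abstractmethod
    async def aclose(self, *, error: Optional[E] = None) -> None:
        ...


class RecordingPeer:
    """Toy 'remote peer' that just remembers what it was told."""

    def __init__(self) -> None:
        self.events: List[str] = []


class ToyConnection(BreakableResource[str]):
    def __init__(self, peer: RecordingPeer) -> None:
        self._peer = peer
        self._closed = False

    async def aclose(self, *, error: Optional[str] = None) -> None:
        if self._closed:
            return  # aclose stays idempotent
        self._closed = True
        # Best-effort: pass the error along if one was given
        if error is None:
            self._peer.events.append("closed")
        else:
            self._peer.events.append("broken: " + error)


async def demo() -> List[str]:
    peer = RecordingPeer()
    conn = ToyConnection(peer)
    await conn.aclose(error="producer crashed")
    await conn.aclose()  # second call is a no-op
    return peer.events


events = asyncio.run(demo())
```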

clone() being tedious to implement makes me think more generally: Both streams and channels have a bunch of documented requirements on the implementation (new and currently-in-progress operations raise ClosedResourceError when it's closed, various parameter checks, aclose() and send_eof() are idempotent, send_all() and receive_some() do conflict detection, ...) which provide a low-friction experience for users of the interface but require a decent amount of boilerplate to be added by implementors of the interface. One could imagine a converse situation where the requirements are expressed as preconditions rather than guarantees: "you must not call aclose() twice, you must not make simultaneous calls to send_all() in different tasks, you must not call receive() on a closed channel", etc -- which would be low-friction for implementors and annoying for users. If we have to pick, of course it's better to make things low-friction for users, but I wonder if there's a way we can get the best of both worlds? Like, someone writes a channel or stream following the easy-to-implement version of the contract, and then we provide some generic wrapper/decorator/metaclass/something that turns it into one that follows the easy-to-use version of the contract. This could be a good solution for clone() as well.
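
One way the generic wrapper might look, as a sketch only. RawReceiver follows the easy-to-implement contract (it trusts its callers), and Guarded layers the user-facing guarantees on top. All class names here are hypothetical, the two exception classes are local stand-ins for Trio's, and asyncio is used only to run the demo. The sketch covers idempotent aclose(), use-after-close, and conflict detection; it does not wake up operations that are already in progress when aclose() is called.

```python
import asyncio


class ClosedResourceError(Exception):
    """Local stand-in for trio.ClosedResourceError."""


class BusyResourceError(Exception):
    """Local stand-in for trio.BusyResourceError."""


class RawReceiver:
    """Easy-to-implement side: assumes no double close and no receive after close."""

    def __init__(self, items):
        self._items = list(items)
        self.close_calls = 0

    async def receive(self):
        return self._items.pop(0)

    async def aclose(self):
        self.close_calls += 1


class Guarded:
    """Easy-to-use side: enforces the guarantees so RawReceiver does not have to."""

    def __init__(self, raw):
        self._raw = raw
        self._closed = False
        self._receiving = False

    async def receive(self):
        if self._closed:
            raise ClosedResourceError
        if self._receiving:
            raise BusyResourceError  # another task is already in receive()
        self._receiving = True
        try:
            return await self._raw.receive()
        finally:
            self._receiving = False

    async def aclose(self):
        if self._closed:
            return  # idempotent: the raw object sees at most one aclose()
        self._closed = True
        await self._raw.aclose()


async def demo():
    raw = RawReceiver(["a"])
    guarded = Guarded(raw)
    first = await guarded.receive()
    await guarded.aclose()
    await guarded.aclose()
    after_close = "no error"
    try:
        await guarded.receive()
    except ClosedResourceError:
        after_close = "ClosedResourceError"
    return first, raw.close_calls, after_close


result = asyncio.run(demo())
```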

@njsmith
Member Author

njsmith commented Feb 8, 2019

@smurfix

We could just bite the bullet and rename the thing back to "queue". :-P

On further thought... I think I don't like this, because I think we're moving in the direction of having SendChannel/ReceiveChannel/Channel (or SendX/ReceiveX/X), and it'd be super confusing to use Queue to refer to something bidirectional. :-/

Speaking of bidirectional channels... here's a slightly wacky idea. What if open_memory_channel returned a pair of bidirectional channels? That would eliminate the confusion about whether it's (s, r) or (r, s) (!!!), and would let us skip adding a separate open_two_way_memory_channel constructor. A lot of use cases would only use the channel unidirectionally but... if you ignore the direction you're not using, then it doesn't hurt you, right? (And if someone wants to be strict about enforcing unidirectionality they can use mypy, and upcast the channels, like s: SendChannel, r: ReceiveChannel = open_memory_channel(). It's even self-documenting!)

@oremanj

send_nowait and receive_nowait seem cumbersome to have as required methods -- do we expect them to ever be useful for non-memory channels? Maybe all that's needed is to provide default implementations in the ABCs that raise WouldBlock?

The _nowait variants are pretty specialized, yeah... The two big reasons I can think of to use them are: (1) with _nowait you can fill or drain the buffer without invoking a checkpoint, (2) if you have an infinite buffer, send_nowait is guaranteed to succeed and can be called from synchronous context, which is useful in some cases. (These are the same cases that @smurfix pointed out when I was wondering whether we should support _nowait at all here.) But checkpoints are invisible outside the process, so (1) only applies to memory channels. And for (2), well... I guess you could attach an infinite buffer to a cross-process-channel, but it seems unlikely to be very useful, and even if you did then you can always call send from an async context...

I guess in many cases it won't be too hard to support some kind of _nowait variants on cross-process channels, because if you're designing your cross-process channels to be cancellation-safe, then you're probably going to implement them as some background tasks that handle the I/O, plus some memory channels to talk to the background tasks. So _nowait on your channel can simply be _nowait on the internal memory channel it uses to talk to the background task. But... it still doesn't seem like it'd be useful very often.

One possibility would be to move the _nowait variants to be part of the memory channel classes, rather than part of the abstract channel interface.

clone is quite annoying to implement due to the need to factor channel state out into a separate object

Hmm, I see what you mean. It would be pretty trivial if you allowed def clone(self): return self, but if you want to track the closure state separately for each object then that forces you to add substantial boilerplate.

I can certainly imagine cases though where you have a single cross-process channel, and you want to share it among multiple tasks. Like it's easy to imagine a program with multiple tasks all sending stuff occasionally on the same websocket. So it seems like clone might be useful here too?

Though... then again, maybe not! In this case, you almost certainly need a background task to manage the underlying transport. And that background task needs to live in a nursery. And probably all your tasks that use it live inside that nursery. So probably, when those tasks exit, your channel will automatically be shut down too, because the nursery exits. Basically this is doing the same thing as clone does.

I guess the advantage of clone is specifically when you want to put some senders and receivers into the same nursery. Which doesn't make much sense for cross-process channels, since if the senders and receivers are in different processes they probably aren't in the same nursery :-).

...this also makes me wonder if we should get rid of clone altogether, on the grounds that it's duplicative of what nurseries do? I'm not sure: clone is pretty elegant, and can lead to shorter code than creating extra nurseries, but it's also hard to explain, and emphasizing nurseries more is pretty "trionic".

Let's check the multi-producer/multi-consumer example in the docs. The version with clone looks like (copy-pasted from here):

import trio
import random

async def main():
    async with trio.open_nursery() as nursery:
        send_channel, receive_channel = trio.open_memory_channel(0)
        async with send_channel, receive_channel:
            # Start two producers, giving each its own private clone
            nursery.start_soon(producer, "A", send_channel.clone())
            nursery.start_soon(producer, "B", send_channel.clone())
            # And two consumers, giving each its own private clone
            nursery.start_soon(consumer, "X", receive_channel.clone())
            nursery.start_soon(consumer, "Y", receive_channel.clone())

async def producer(name, send_channel):
    async with send_channel:
        for i in range(3):
            await send_channel.send("{} from producer {}".format(i, name))
            # Random sleeps help trigger the problem more reliably
            await trio.sleep(random.random())

async def consumer(name, receive_channel):
    async with receive_channel:
        async for value in receive_channel:
            print("consumer {} got value {!r}".format(name, value))
            # Random sleeps help trigger the problem more reliably
            await trio.sleep(random.random())

trio.run(main)

And then, here's the version where instead of clone, we create separate nurseries for the producers and consumers:

import trio
import random

async def main():
    async with trio.open_nursery() as nursery:
        send_channel, receive_channel = trio.open_memory_channel(0)
        nursery.start_soon(producers, send_channel)
        nursery.start_soon(consumers, receive_channel)

async def producers(send_channel):
    async with send_channel:
        async with trio.open_nursery() as nursery:
            nursery.start_soon(producer, "A", send_channel)
            nursery.start_soon(producer, "B", send_channel)

async def producer(name, send_channel):
    for i in range(3):
        await send_channel.send("{} from producer {}".format(i, name))
        # Random sleeps help trigger the problem more reliably
        await trio.sleep(random.random())

async def consumers(receive_channel):
    async with receive_channel:
        async with trio.open_nursery() as nursery:
            nursery.start_soon(consumer, "X", receive_channel)
            nursery.start_soon(consumer, "Y", receive_channel)

async def consumer(name, receive_channel):
    # No "async with" here: consumers() owns the shared channel and closes it.
    # Closing it in one consumer would break the other with ClosedResourceError.
    async for value in receive_channel:
        print("consumer {} got value {!r}".format(name, value))
        # Random sleeps help trigger the problem more reliably
        await trio.sleep(random.random())

trio.run(main)

(They both do work the same way, for once I actually tested before posting a comment.)

I dunno. The second version is longer and a bit more tiresome to type. I'm not 100% convinced clone should be removed. But, I'm definitely giving it a skeptical look.

Poison: I've been working locally with a BreakableResource[E] ABC (extends AsyncResource) which defines async def aclose(*, error: Optional[E] = None).

Interesting! I don't think I have anything more useful to say than that right now and there's probably enough in this post already :-). But interesting!

@oremanj
Member

oremanj commented Feb 8, 2019

What if open_memory_channel returned a pair of bidirectional channels?

Intriguing! If we expect most applications to still be unidirectional, we run into the problem that different users might pick different conventions for which return value is send and which is receive, which feels like it's not great for mutual intelligibility of the ecosystem? Also, the main selling point of Trio channels over asyncio/threading/etc queues is the closure tracking, which is pretty easy to reason about for unidirectional channels but (IMO) much subtler for bidirectional. As you pointed out on the other thread about send_eof(), half-close is not the most intuitive thing in the world.

_nowait variants

I definitely see the value of these for memory channels, and I see your point that they could have a reasonable implementation for cross-process channels that use a background task. But -- if a channel is implemented using a background task, I'm pretty sure its user-visible component is just going to be a memory channel. The background task holds onto one end of a memory channel and shovels objects between it and the transport, and the user sends or receives using the other end of the memory channel. There's no need for a different type of Channel subclass at all.

I feel like the main reason to write a new Channel subclass is if you don't want to use background tasks for some reason. Like, in an HTTP/2 implementation it's probably true that you don't want two background tasks per stream, which calls for a "push" rather than a "pull" sort of interface on the sending side, which you can't really do using memory channels.

Maybe there are only ever two families of channels -- the existing memory channels, plus one that supports the opposite-polarity interface using an ABC of some kind. (In memory channels, calling send() makes receive() on the other side return; in the opposite-polarity interface, calling send() on the channel would call send() on some interface that was passed when the channel was created.) Or people who need the opposite polarity could just write their own full Channel implementations, but then we're back to clone() being annoying.
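A minimal sketch of the "opposite polarity" idea, under stated assumptions: `PushSendChannel` and its `sink` parameter are hypothetical names invented for illustration, not Trio API, and plain asyncio is used only so the snippet runs with the standard library. The point is that `send()` calls straight into an interface supplied at construction time, with no buffer and no background task in between.

```python
import asyncio
from typing import Awaitable, Callable, Generic, List, TypeVar

T = TypeVar("T")


class PushSendChannel(Generic[T]):
    """Hypothetical "opposite polarity" send side: send() pushes into a sink.

    Nothing here is Trio API; the class and its sink parameter are made up
    for illustration.
    """

    def __init__(self, sink: Callable[[T], Awaitable[None]]) -> None:
        self._sink = sink
        self._closed = False

    async def send(self, value: T) -> None:
        if self._closed:
            raise RuntimeError("channel is closed")
        # No buffer and no background task: the value goes straight to the sink.
        await self._sink(value)

    async def aclose(self) -> None:
        self._closed = True


async def demo() -> List[bytes]:
    frames: List[bytes] = []

    async def write_frame(payload: bytes) -> None:
        # Stand-in for something like writing one frame to a transport.
        frames.append(payload)

    channel = PushSendChannel(write_frame)
    await channel.send(b"hello")
    await channel.send(b"world")
    await channel.aclose()
    return frames


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In a memory channel the roles are reversed: `send()` parks the value until some task calls `receive()`, which is why a transport built on memory channels needs a task to pull values out.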

[should we get rid of clone()?]

IMO, clone() is basically reference counting for an async resource. Which is useful! The concept isn't really specific to channels, but channels are the type of async resource that's friendliest to concurrent utilization, so I don't think there's necessarily anything wrong with only supporting it for channels. The main downside of reference counting here is that it becomes unclear when the resource actually goes away. Clarity about scoping is kind of Trio's thing, but with memory channels being such a foundational synchronization primitive, this might be a case where practicality beats purity?

@njsmith
Member Author

njsmith commented Feb 10, 2019

Intriguing! If we expect most applications to still be unidirectional, we run into the problem that different users might pick different conventions for which return value is send and which is receive, which feels like it's not great for mutual intelligibility of the ecosystem?

I dunno, I feel like if I saw one library do:

producer, consumer = open_memory_channel(...)

and another one do

consumer, producer = open_memory_channel(...)

then... well, I'd think the second library author was a bit odd, I guess, because no-one ever talks about "consumer/producer architectures" :-). But I wouldn't be confused or anything.

Also, the main selling point of Trio channels over asyncio/threading/etc queues is the closure tracking, which is pretty easy to reason about for unidirectional channels but (IMO) much subtler for bidirectional. As you pointed out on the other thread about send_eof(), half-close is not the most intuitive thing in the world.

If producers only call send, and consumers only call receive, then I think the closure tracking ends up working exactly the same way? You don't need half-close.

But -- if a channel is implemented using a background task, I'm pretty sure its user-visible component is just going to be a memory channel.

Eh, not necessarily. A websocket is conceptually a Channel[Union[bytes,str]], but trio-websocket WebSocket objects also have other attributes, like ping() and pong() and some way to query the peer's address.

opposite polarity

Sorry, I didn't follow this part :-)

IMO, clone() is basically reference counting for an async resource. Which is useful!

It is useful, but I'm not sure whether it's so useful that it belongs on every channel. Especially since as you note, it's a hassle to implement :-). You can certainly implement it generically if you want:

from typing import Generic, Optional, TypeVar

import trio
from trio.abc import AsyncResource

T = TypeVar("T", bound=AsyncResource)

class shared_ptr(AsyncResource, Generic[T]):  # name stolen from c++
    def __init__(self, resource: T):
        self.resource: Optional[T] = resource
        # One-element list so that every clone shares the same mutable count
        self._counter = [1]

    def clone(self) -> "shared_ptr[T]":
        if self.resource is None:
            raise trio.ClosedResourceError
        new = shared_ptr(self.resource)
        new._counter = self._counter
        self._counter[0] += 1
        return new

    async def aclose(self) -> None:
        if self.resource is None:
            return
        # Mark this handle closed first, so a repeated aclose() on the same
        # handle cannot decrement the shared count twice
        resource = self.resource
        self.resource = None
        self._counter[0] -= 1
        if not self._counter[0]:
            await resource.aclose()

async def producer_consumer_example():
    # producer and consumer are user-defined tasks that take a shared_ptr
    p, c = trio.open_memory_channel(...)
    async with trio.open_nursery() as nursery:
        async with shared_ptr(p) as p_shared, shared_ptr(c) as c_shared:
            nursery.start_soon(producer, p_shared.clone())
            nursery.start_soon(producer, p_shared.clone())
            nursery.start_soon(consumer, c_shared.clone())
            nursery.start_soon(consumer, c_shared.clone())

Either way though this seems way harder to explain than the version with separate nurseries for the producers and consumers.

@oremanj
Member

oremanj commented Mar 9, 2019

If producers only call send, and consumers only call receive, then I think the closure tracking ends up working exactly the same way? You don't need half-close.

If you have a bidirectional memory channel type, it's pretty reasonable to want to send and receive on it using the same object. This is how Go's channels work, for example. If we don't expose methods to close just the send side or just the receive side, then we'll always have the same number of senders as receivers, so it won't ever be possible to get BrokenResourceError from a send or EndOfChannel from a receive -- because someone could come along later and receive/send on the same object you're sending/receiving on.

Here's a concrete proposal to elicit opinions:

  • Define a public type MemoryChannel which is a bidirectional Channel object. Deprecate open_memory_channel() in favor of this. Benefits:
    • Solves the what-order-does-it-go-in-again? problem.
    • Solves the gotta-come-up-with-twice-as-many-names-as-you-did-with-Queue problem.
    • Familiar to users who have experience with asyncio.Queue, Go channels, etc.
  • To maintain the closure tracking capabilities, give MemoryChannel the methods new_sender and new_receiver which return a SendChannel or ReceiveChannel, respectively, that shares the same internal buffer. (Could alternatively call this "clone" instead of "new", and/or use "producer/consumer" rather than "sender/receiver".)
  • For people who want to keep using different names for different sides, provide a convenience function:
    def memory_channel_pair(max_buffer_size):
        with MemoryChannel(max_buffer_size) as channel:
            return (channel.new_sender(), channel.new_receiver())
    
    that performs the same action as open_memory_channel but is not deprecated.
  • For people who actually want bidirectional communication where you can't accidentally receive the value you just sent -- which I think is substantially rarer than the previous use cases -- maybe add StapledChannel and
    def memory_channel_twoway_pair(max_buffer_size):
        left_send, right_receive = memory_channel_pair(max_buffer_size)
        right_send, left_receive = memory_channel_pair(max_buffer_size)
        return StapledChannel(left_send, left_receive), StapledChannel(right_send, right_receive)
    

@njsmith
Member Author

njsmith commented Mar 11, 2019

I think we had a miscommunication :-). When I say "bidirectional" I mean "full duplex" – handle A can send/receive to handle B, and handle B can send/receive to handle A, but with a separate buffer for each direction. Think socketpair().

I'm not suggesting having a "loopback" channel where foo.send(...) transmits data to foo.receive(...). That would indeed mess up closure tracking (and that's the main reason we switched away from the Queue interface in the first place!). I also think it would be confusing to have some objects whose send and receive refer to the same underlying transmission stream, and others where they refer to independent transmission streams.
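To illustrate the "full duplex" reading, here is a rough sketch with one independent buffer per direction. `DuplexEndpoint` and `open_duplex_pair` are hypothetical names, and `asyncio.Queue` is only a standard-library stand-in for a memory channel buffer; closure tracking is deliberately left out.

```python
import asyncio
from typing import Any, Tuple


class DuplexEndpoint:
    """One handle of a hypothetical full-duplex pair (not a Trio class)."""

    def __init__(self, outgoing: asyncio.Queue, incoming: asyncio.Queue) -> None:
        self._outgoing = outgoing
        self._incoming = incoming

    async def send(self, value: Any) -> None:
        await self._outgoing.put(value)

    async def receive(self) -> Any:
        return await self._incoming.get()


def open_duplex_pair(max_buffer_size: int) -> Tuple[DuplexEndpoint, DuplexEndpoint]:
    # One independent buffer per direction, like the two directions of
    # socketpair(). Note: asyncio.Queue treats 0 as "unbounded", which differs
    # from open_memory_channel(0), so pass a positive size here.
    a_to_b: asyncio.Queue = asyncio.Queue(max_buffer_size)
    b_to_a: asyncio.Queue = asyncio.Queue(max_buffer_size)
    return DuplexEndpoint(a_to_b, b_to_a), DuplexEndpoint(b_to_a, a_to_b)


async def demo() -> Tuple[Any, Any]:
    a, b = open_duplex_pair(1)
    await a.send("ping")   # lands in the a -> b buffer
    await b.send("pong")   # lands in the separate b -> a buffer
    return await b.receive(), await a.receive()


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Neither endpoint can receive its own message here, which is the difference from the "loopback" design.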

@goodboy
Member

goodboy commented Mar 12, 2019

As a follow up to the discussion around cloned channels versus separate nurseries, I had the use case (from a test) where the consumer task is the lone task inside the nursery block.

Before adding an await send_chan.aclose() I was running into a deadlock, which I found quite surprising. It wasn't clear that clones are no different from the parent channel reference, and additionally that the parent can (and must) be closed as soon as it is no longer used in that task/scope, even if closure tracking is used in other tasks, in order to avoid blocking the other end.
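The counting behind that surprise can be shown with a toy model. This is a simplified sketch, not Trio's implementation: `ToyChannelState` and `ToySendHandle` are hypothetical names, and buffering is ignored. It shows only that the parent handle counts as an open sender just like its clones do.

```python
class ToyChannelState:
    """Shared bookkeeping for a toy channel (not Trio's real implementation)."""

    def __init__(self) -> None:
        self.open_send_handles = 0


class ToySendHandle:
    def __init__(self, state: ToyChannelState) -> None:
        self._state = state
        self._closed = False
        state.open_send_handles += 1

    def clone(self) -> "ToySendHandle":
        # A clone is just another handle on the same state; the original
        # handle keeps counting as open until it is closed too.
        return ToySendHandle(self._state)

    def close(self) -> None:
        if not self._closed:
            self._closed = True
            self._state.open_send_handles -= 1


def receiver_sees_end_of_channel(state: ToyChannelState) -> bool:
    # The receiving loop can only finish once every send handle is closed.
    return state.open_send_handles == 0


state = ToyChannelState()
parent = ToySendHandle(state)
workers = [parent.clone() for _ in range(2)]

for worker in workers:
    worker.close()  # each producer task finishes and closes its clone
still_waiting = not receiver_sees_end_of_channel(state)  # parent is still open

parent.close()  # the step that is easy to forget
finished = receiver_sees_end_of_channel(state)
```

While `still_waiting` is true, a real receiver would keep blocking in its `async for` loop, which is the deadlock described above.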

It seems to me that .clone() is pretty nice for avoiding extra nurseries since in this case I'd be spawning a task just to spawn tasks in a loop and I wonder what gets more confusing for a reader, more functions with nurseries or keeping track of all the channel refs?

I was wondering whether it might be possible to create a clone implicitly, and only when async with chan: is used from a different task, so that instead of managing clones manually, one is created for every new task that uses the async with?

It seems to me the main purpose of clones is for making channels sort of task safe in terms of closure tracking. So maybe clones can just be something implicit and only activated if at least one task actually indicates that it expects closure tracking?

.aclose() seems to be the only thing called in a channel's .__aexit__() (according to AsyncResource) and each channel's aclose() implementation seems to be aware of all clones anyway via ._state.receive_tasks? I guess what I was thinking in terms of implementation is what if there is no notion of a clone but instead whenever __aenter__() is called on the channel you activate closure tracking for all tasks which have called __aenter__()?

I'm probably missing something, but couldn't it be as simple as:

async def __aenter__(self):
    self._state.open_receive_channels += 1
    return self

And then removing the increment from __attrs_post_init__?

As far as I grok it, the first closing example in the docs doesn't really need 2 tasks spawned, since you can do what I did above (one task is the nursery body):

import trio

async def main():
    async with trio.open_nursery() as nursery:
        send_channel, receive_channel = trio.open_memory_channel(0)
        nursery.start_soon(producer, send_channel)
        async with receive_channel:
            async for value in receive_channel:
                print("got value {!r}".format(value))

async def producer(send_channel):
    async with send_channel:
        for i in range(3):
            await send_channel.send("message {}".format(i))

trio.run(main)

To make this more like my test you need more than one producer task (see below).

So what I was thinking is, would it be crazy to suggest not expecting send_channel to be aclose()-ed in the main task since we already can count all tasks that have cloned it from the number of async with send_channel: statements in use (which should be at most one per task)? If none are used then closure tracking simply isn't activated, but if at least one is used then we only count each respective per-task-use in the reference counting for an async resource set? In other words is it possible for this to just work?

async def main():
    async with trio.open_nursery() as nursery:
        send_channel, receive_channel = trio.open_memory_channel(0)
        for _ in range(2):
            nursery.start_soon(producer, send_channel)
        async with receive_channel:
            async for value in receive_channel:
                print("got value {!r}".format(value))

async def producer(send_channel):
    async with send_channel:
        for i in range(3):
            await send_channel.send("message {}".format(i))

trio.run(main)

If that's the case then can't we just forget about .clone() and just make channels task-safe for closure tracking once activated by at least one task using async with: on the channel?

@oremanj
Member

oremanj commented Mar 12, 2019

I think we had a miscommunication :-). When I say "bidirectional" I mean "full duplex" – handle A can send/receive to handle B, and handle B can send/receive to handle A, but with a separate buffer for each direction. Think socketpair().

Aha, ok. I agree that that would work fine with closure tracking, but I worry that it introduces complexity that most people won't use and that adds additional chances to hide errors (if you call send/receive on (a clone of?) the wrong object, you get a hang instead of a TypeError; if you're using static type checking, the type checker can't tell you you got mixed up). It also makes it much harder for us to later support open_memory_channel() returning a single object instead of two. (Coming up with two names, and needing either a third name for the pair or a destructuring assignment that adds line length pressure, has been a huge source of friction for me in practice when using channels for simple things that don't care about closure tracking.)

I also think it would be confusing to have some objects whose send and receive refer to the same underlying transmission stream, and others where they refer to independent transmission streams.

We could call it a LoopbackChannel to make it explicit? I guess that's kind of an unusual name for something intended as a major primitive of inter-task communication, though. What if the "loopback channel" object (what I called MemoryChannel above) had methods called put and get instead of methods called send and receive? We could even call it Queue...

If we added such a "loopback channel" or "queue" that let you fork off new send and/or receive channels that share ownership of its internal buffer, and that could be closed without affecting those forked-off channels, then I would be a lot happier with making open_memory_channel return a connected pair of bidirectional channels. And, as you point out, it would be backwards-compatible. I know "there should be only one obvious way to do it" but I think making simple cases simple has overriding benefits for such a fundamental primitive.

@oremanj
Member

oremanj commented Mar 12, 2019

@goodboy - interesting ideas! It sounds like you're not proposing anything that introspects "what task am I running from?" per se, but rather suggesting a different way of tracking whether the channel is still in use: instead of using clones, close the channel when the last async with channel: block exits. I can see the advantages in such an approach, but also have some concerns:

  • Do we add these semantics to every AsyncResource, or just to channels? Neither option seems great to me. If AsyncResource needs to be reference-counted, it's either no longer a pure ABC or adds implementation burden to lots of users; if only channels work this way, it becomes a possible source of confusion for people who've internalized "async with on an AsyncResource works just like calling aclose() when exiting the context".
  • With clones, we can detect whether you're using this clone after you've closed it, and raise ClosedResourceError even if the underlying transport is still in use by other clones. With your proposal that's not possible -- maybe the channel is closed after the async with exits, maybe it's not. This isn't just a bug-detection capability, because closing one clone also causes anyone blocked on it to be woken up with ClosedResourceError, which is a useful synchronization technique.
  • If you write
    async def some_task(channel):
        async with channel:
            ...
    async with trio.open_nursery() as nursery:
        async with my_channel:
            nursery.start_soon(some_task, my_channel)
            nursery.start_soon(some_other_task, my_channel)
    
    it looks pretty OK, but in fact the channel gets closed prematurely, because the child tasks don't start (and enter their own async with blocks) until the async with my_channel: block in the parent has been exited. If we try to solve this by allowing async with blocks to "reopen" a channel that has previously been closed, I think it becomes very hard to reason about correctness in some ways.

@oremanj
Member

oremanj commented Mar 12, 2019

Other thoughts on the questions in the initial post:

In open_memory_channel, should we make max_buffer_size=0 default? Is this really a good choice across a broad range of settings? The docs recommend it right now, but that's not really based on lots of practical experience or anything. And it's way easier to add defaults later than it is to remove or change them.

I think we should. It's not the correct default for all applications, but it's the correct default for more applications than any other single number we could choose, and it makes it immediately obvious that sending on a channel can block. If we have a small but nonzero default buffer size, people who use it might write code that misbehaves when send() blocks (or blocks for too long), which will seem to work fine when testing using small amounts of data and then fall apart under load when the backpressure starts to build up. Also, using zero as a default means the default case doesn't have the possibility of "lost" values. Speaking of which:

For channels that allow buffering, it's theoretically possible for values to get "lost" without there ever being an exception (send puts it in the buffer, then receiver never takes it out). This even happens if both sides are being careful, and closing their endpoint objects when they're done with them. We could potentially make it so that ReceiveChannel.aclose raises an exception (I guess BrokenResourceError) if the channel wasn't already cleanly closed by the sender. This would be inconsistent with the precedent set by ReceiveStream (which inherited it from BSD sockets), but OTOH, ReceiveStream is a much lower-level API – to use a ReceiveStream you need some non-trivial engineering work to implement a protocol on top, while ReceiveChannel is supposed to be a simple high-level protocol that's usable out-of-the-box. Practically speaking, though, it would be pretty annoying if some code inside a consumer crashes, and then ReceiveChannel.__aexit__ throws away the actual exception and replaces it with BrokenChannelError.

Exceptions thrown from __aexit__ chain with any exception that propagated out of the context, don't they? So the consumer exception isn't lost, it's just further up in the traceback.
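A small standard-library check of that chaining behaviour. `ToyReceiver` and `LeftoverItemsError` are hypothetical stand-ins for a receive channel and for whatever error it might raise on close. The sketch relies only on Python's implicit exception chaining, so plain asyncio is enough to run it.

```python
import asyncio


class LeftoverItemsError(Exception):
    """Hypothetical error raised on close; stands in for BrokenResourceError."""


class ToyReceiver:
    async def __aenter__(self) -> "ToyReceiver":
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        # Raising here happens while the body's exception is being handled,
        # so Python records the body's exception as __context__.
        raise LeftoverItemsError("receiver closed with items still buffered")


async def consumer() -> None:
    async with ToyReceiver():
        raise ValueError("consumer crashed")


def run_demo() -> BaseException:
    try:
        asyncio.run(consumer())
    except LeftoverItemsError as err:
        return err
    raise AssertionError("expected LeftoverItemsError")


error = run_demo()
print(type(error).__name__, "<-", type(error.__context__).__name__)
```

The original `ValueError` is still reachable through `__context__` and appears in the printed traceback, but the exception a caller has to catch is the one raised from `__aexit__`.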

I don't think we should necessarily enforce senders-must-close-before-receivers in all cases. Maybe something like:

  • If the last receiver closes with objects left in the buffer, but there are still senders open, raise BrokenResourceError at the next sender action -- whether that's a send or a close. This mirrors what happens when close() on UNIX encounters an I/O error flushing buffers, and makes it more likely that a prematurely crashing receiver won't have its traceback muddled.
  • If the last receiver closes with objects left in the buffer, and all the senders are closed, then raise BrokenResourceError for the receiver.
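The two rules above can be written down as a small decision function. This is only a sketch of the proposal: the names are hypothetical, and the case of an empty buffer is an assumption (no error), since the proposal does not address it.

```python
from enum import Enum


class ErrorTarget(Enum):
    NOBODY = "nobody"
    NEXT_SENDER_ACTION = "next sender action (send or close)"
    RECEIVER = "the closing receiver"


def on_last_receiver_close(buffered_items: int, open_senders: int) -> ErrorTarget:
    """Decide who should get BrokenResourceError when the last receiver closes."""
    if buffered_items == 0:
        # Assumption: nothing was lost, so nobody needs to hear about it.
        return ErrorTarget.NOBODY
    if open_senders > 0:
        # Rule 1: senders are still around, so report at their next action.
        return ErrorTarget.NEXT_SENDER_ACTION
    # Rule 2: every sender closed cleanly, so only the receiver can be told.
    return ErrorTarget.RECEIVER
```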

Should memory channels have a sync close() method? They could easily, the only reason not to is that it makes life simpler for hypothetical IPC channels. I have no idea yet how common those will be, or whether it's actually important for in-process channels and cross-process channels to have compatible APIs.

I've run into a few cases where a sync close() would be useful. I think it should be supported for memory channels.

@smurfix
Contributor

smurfix commented Mar 12, 2019

Explicit is better than implicit: If you want to use a channel in more than one task independently, clone it.

Yes, "async with:" should close its resource. Files also get closed when their context ends, so this is not going to surprise anybody. No, you should not be able to re-open it (you can't do that with files either); if you want to use the channel in another context manager, clone() it. Or use a single encapsulating context. Or just, you know, use it without a context and call aclose manually.

The only problem I see with channels, in fact, is that you can call clone on a channel and then not call aclose on the thing, which means you'll get a deadlock. We probably should add a __del__ method that emits an error in that case (and sets the channel to being unusable so that the error doesn't get swallowed).
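A rough sketch of the reporting half of that idea, using a hypothetical `ToyEndpoint` class rather than a real channel. Marking the channel unusable is left out, and the choice of `ResourceWarning` is an assumption borrowed from how unclosed sockets are reported.

```python
import warnings


class ToyEndpoint:
    """Hypothetical endpoint that complains if it is garbage-collected unclosed."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.closed = False

    def close(self) -> None:
        self.closed = True

    def __del__(self) -> None:
        if not self.closed:
            # Only report; do not rely on the GC to do the actual closing.
            warnings.warn(f"endpoint {self.name!r} was never closed", ResourceWarning)


def collect_warnings(close_first: bool) -> list:
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")  # ResourceWarning is hidden by default
        endpoint = ToyEndpoint("clone-1")
        if close_first:
            endpoint.close()
        # CPython runs __del__ as soon as the last reference goes away;
        # other runtimes may defer it.
        del endpoint
    return [str(w.message) for w in caught if "was never closed" in str(w.message)]
```

Because the timing of `__del__` depends on the garbage collector, a warning like this is a debugging aid rather than something program logic should depend on.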

@njsmith
Member Author

njsmith commented May 25, 2019

Another possible name for channels: Mailbox (stolen from Erlang).

It has the nice property that it obviously refers to an endpoint, not the connection itself. Referring to "channel endpoints" all the time is a bit tiresome. One issue is that Erlang purists might complain because mailboxes have features that our channels don't, in particular they're unbounded and support a "selective receive" operation (you don't say "give me the next message", you say "give me the next message that matches the following predicate: ..."). Another issue is that it feels a bit weird to send and receive from the same mailbox. (In Erlang mailboxes are receive-only.)

We probably should add a __del__ method that emits an error in that case (and sets the channel to being unusable so that the error doesn't get swallowed).

Yeah, __del__ is tricky. I don't think we should just close from __del__, because that's not possible for all channels (e.g. if you have a channel that's tunneled over TLS, then __del__ can't do a clean shutdown because that requires async operations). And, for channels where it is possible, like memory channels, it would make it easy to accidentally write code that seems to work but really is dependent on fragile details of the GC.

OTOH, if someone does have a bug where a channel endpoint gets lost, then ideally they should get some better feedback than just a deadlock.

I guess the two cases where __del__ is particularly suspicious are:

  • The only reason to use clone is if you're planning to explicitly close each of the clones one-by-one. So yeah, I agree, if a clone gets GC'ed without being closed, that's super suspicious. I'm not quite 100% sure about the case where someone calls clone and then lets the original endpoint get GC'ed... is it possible to have code that takes in a channel, uses clones internally, closes all the clones, and then lets the caller use the channel further? In this case the fact that there used to be clones would be a hidden implementation detail...

    (Of course if we get rid of clone then this issue goes away.)

  • If an endpoint gets GC'ed, and then someone tries to use one of its connected endpoints. It would be better not to deadlock here. I guess for memory channels, this should set some kind of "taint" flag, and then make sure that any operations on a tainted channel do something appropriate, like AssertionError?

    For channels that go over sockets, I guess what will happen in this case is (a) a ResourceWarning (hidden by default, but can be found if you want to check for it), (b) a BrokenResourceError. Because when a socket gets GC'ed, it does a ResourceWarning and then closes the socket. Copying that for memory channels would also be a plausible approach. Though

@njsmith
Member Author

njsmith commented May 31, 2019

Another strike against clone: it makes Channel.send_eof semantics substantially more complicated to define/explain.

njsmith added a commit to njsmith/trio that referenced this issue Jun 19, 2019
Make MemorySendChannel and MemoryReceiveChannel into public classes,
and move a number of the methods that used to be on the abstract
SendChannel/ReceiveChannel interfaces into the Memory*Channel concrete
classes. Also add a Channel type, analogous to Stream.

See: python-triogh-719

Still to do:

- Merge MemorySendChannel and MemoryReceiveChannel into a single
  MemoryChannel
- decide what to do about clone
- decide whether to add some kind of half-close on Channel
- refactor this PR's one-off solution to python-triogh-1092 into something more
  general.
@takluyver
Contributor

Should memory channels have a sync close() method? They could easily, the only reason not to is that it makes life simpler for hypothetical IPC channels. I have no idea yet how common those will be, or whether it's actually important for in-process channels and cross-process channels to have compatible APIs.

I'll second @oremanj's yes for this. It seems like a minor wart that you can use a memory channel from sync-coloured code (with the _nowait methods) but you need async-colour to close it. A sync close method would also be nice for cancellation handling - you could close memory channels without having to think about shielding the aclose call.

There are already extra public methods on memory channels that aren't part of the channel ABC, so code written for memory channels won't just work with arbitrary other channels.

@njsmith
Member Author

njsmith commented Sep 30, 2020

@lordmauve made another vote for sync-close privately, discussing an IRC-like system where new messages get broadcast to all consumers through a bunch of finite-buffer memory channels, and if a memory channel buffer fills up then that consumer gets booted off:

Hmm, channel can't be close()d, they can only be await aclose()d
That's a problem for my broadcast() function, which is deliberately sync, because I was of the opinion that if broadcasting can block the sender then I'm doing something wrong
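A minimal sketch of that broadcast pattern, with `asyncio.Queue` as a standard-library stand-in for a finite-buffer memory channel. The `broadcast` function and the subscriber names are hypothetical. With memory channels the analogous calls would be `send_nowait()` and a synchronous close, which is the method being asked for here.

```python
import asyncio
from typing import Dict, List


def broadcast(subscribers: Dict[str, asyncio.Queue], message: str) -> List[str]:
    """Deliver message to every subscriber without ever blocking.

    Subscribers whose bounded buffer is full are dropped; their names are returned.
    """
    booted = []
    for name, queue in list(subscribers.items()):
        try:
            queue.put_nowait(message)
        except asyncio.QueueFull:
            # With a memory channel, this is where a sync close() would go.
            del subscribers[name]
            booted.append(name)
    return booted


async def demo() -> tuple:
    subscribers = {"fast": asyncio.Queue(maxsize=2), "slow": asyncio.Queue(maxsize=1)}
    first = broadcast(subscribers, "hello")   # both buffers have room
    subscribers["fast"].get_nowait()          # only "fast" keeps up
    second = broadcast(subscribers, "world")  # "slow" is full and gets booted
    return first, second, sorted(subscribers)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Since `broadcast` is a plain function, it never yields to the scheduler, so a slow consumer cannot stall the sender.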

belm0 added a commit to belm0/trio that referenced this issue Oct 26, 2020
belm0 added a commit to belm0/trio that referenced this issue Nov 1, 2020
@takluyver
Contributor

I've made PR #1797 with a sync .close() method for memory channels, as discussed in this thread.

@pquentin
Member

Now that #1797 was merged, what is left to decide here?

@takluyver
Contributor

takluyver commented Nov 10, 2020

  • max_buffer_size=0 by default in open_memory_channel()? Still no default at present.
  • Errors on closing a channel with buffered data, so it's harder to accidentally lose data?
    • @njsmith speculated about throwing an error when the last receiver closes.
    • @oremanj suggested a more complex scheme where either the receiver or the sender may error
  • Improving on the bare (send, receive) return tuple for open_memory_channel():
  • Get rid of .clone()?
    • Fair bit of discussion - I won't try to summarise it.
    • Some of the objections are that it's hard to implement - but as things stand, .clone() is part of memory channels, not required for other kinds of channel.
  • Making the channel/stream distinction clearer
  • Should *_nowait methods be on the ABC, instead of only on memory channels?
  • .poison() or passing an exception to .aclose() as a way for one end to indicate that something has gone wrong, leading to an error in the other end.
  • Should __del__ do something if a channel object goes out of scope without being explicitly closed?
    • It's not always possible to close it from __del__, but it could warn
    • some types of channels (inc. memory channels) could be broken in some way that avoids a deadlock when you try to send/receive on the other side of the channel.

Whew. Did I miss anything?

9 participants