
Design of async channels #212

Closed
ghost opened this issue Sep 18, 2019 · 19 comments · Fixed by #380
Labels
api design Open design questions

Comments

ghost commented Sep 18, 2019

It's time to port crossbeam-channel to futures.

Previous discussions:

cc @matklad @BurntSushi @glaebhoerl

Instead of copying crossbeam-channel's API directly, I'm thinking perhaps we should design async channels a bit differently.

In our previous discussions, we figured that dropped receivers should disconnect the channel and make send operations fail, for the following reason: if a receiver thread panics, the sending side needs a way to stop producing messages and terminate. If dropped receivers disconnect the channel, the sending side will usually panic when it attempts to send a message into the disconnected channel.

In Go, sending into a channel is not a fallible operation even if there are no more receivers. That is because Go only uses bounded channels, so they eventually fill up and the sending side then blocks, attempting to send another message while the channel is full. Fortunately, Go's runtime has a deadlock detection mechanism, so it will realize the sending side is deadlocked and make the goroutine fail.

In async-std, we could implement a similar kind of deadlock detection: a task is deadlocked if it is sleeping and there are no more wakers associated with it, or if all tasks are suddenly put to sleep. Therefore, channel disconnection from the receiver side is not such a crucial feature, and dropping it can simplify the API a lot.

If we were to have only bounded channels and infallible send operations, the API could look like this:

fn new(cap: usize) -> (Sender<T>, Receiver<T>);

struct Sender<T>;
struct Receiver<T>;

impl<T> Sender<T> {
    async fn send(&self, msg: T);
}

impl<T> Receiver<T> {
    fn try_recv(&self) -> Option<T>;
    async fn recv(&self) -> Option<T>;
}

impl<T> Clone for Sender<T> {}
impl<T> Clone for Receiver<T> {}

impl<T> Stream for Receiver<T> {
    type Item = T;
}

This is a very simple and ergonomic API that is easy to learn.
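
To illustrate, here is a minimal usage sketch of the proposal (it assumes the constructor above is exposed as a free channel function; the names are illustrative, not a final API):

use async_std::task;

fn main() {
    // hypothetical bounded constructor from the proposal above
    let (tx, rx) = channel::<i32>(8);

    task::spawn(async move {
        for i in 0..10 {
            tx.send(i).await; // infallible: simply waits while the channel is full
        }
        // `tx` is dropped here, so `recv()` below eventually returns `None`
    });

    task::block_on(async {
        while let Some(i) = rx.recv().await {
            println!("got {}", i);
        }
    });
}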

In our previous discussions, we also realized that bounded channels are typically more suitable for CSP-based concurrency models, while unbounded channels are a better fit for actor-based concurrency models. Even futures and tokio expose the mpsc::channel() constructor for bounded channels as the "default" and most ergonomic one, while unbounded channels are discouraged with a more verbose API and presented in the docs as the kind of channel to reach for only in exceptional situations.

Another benefit of the API as presented above is that it is relatively easy to implement and we could have a working implementation very soon.

As for selection, I can imagine having a select macro similar to the one in the futures crate that could be used as follows (this example is adapted from our a-chat tutorial):

loop {
    select! {
        msg = rx.recv() => stream.write_all(msg.unwrap().as_bytes()).await?,
        _ = shutdown.recv() => break,
    }
}

What does everyone think?

matklad commented Sep 18, 2019

LGTM! What is the design for bridging the sync/async worlds via channels? Is this explicitly out of scope? Should the sync side just use block_on?

ghost commented Sep 18, 2019

Should the sync side just use block_on?

Yes. I guess in theory we could add send_async() and recv_async() methods to crossbeam-channel, but it's complicated and would require a lot of work... :(
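
For example, a plain producer thread could push into the async channel with block_on (a sketch, again assuming the proposed bounded constructor, here called channel):

use async_std::task;
use std::io::BufRead;
use std::thread;

let (tx, rx) = channel::<String>(16);
// `rx` stays on the async side and is consumed by tasks as usual.

thread::spawn(move || {
    let stdin = std::io::stdin();
    for line in stdin.lock().lines() {
        // block_on drives the async send; the OS thread simply blocks
        // until there is room in the bounded channel.
        task::block_on(tx.send(line.unwrap()));
    }
});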

yoshuawuyts commented Sep 18, 2019

I really like this proposal! Though I think we can do without select! blocks and use stream::join instead:

enum Event<T> {
    Message(T),
    Shutdown,
}

let events = rx.recv().map(|ev| Event::Message(ev));
let shutdown = shutdown.recv().map(|_| Event::Shutdown);
let mut s = stream::join!(events, shutdown);

while let Some(ev) = s.next().await {
    match ev {
        Event::Message(msg) => stream.write_all(msg.unwrap().as_bytes()).await?,
        Event::Shutdown => break,
    }
}
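
For comparison, roughly the same pattern is already expressible with futures 0.3 today by merging the two receivers as streams (a sketch; it uses futures::stream::select in place of the proposed stream::join!, and assumes the Receiver's Stream impl yields plain T):

use futures::stream::StreamExt;

enum Event<T> {
    Message(T),
    Shutdown,
}

let events = rx.map(Event::Message);
let shutdown = shutdown.map(|_| Event::Shutdown);
let mut merged = futures::stream::select(events, shutdown);

while let Some(ev) = merged.next().await {
    match ev {
        Event::Message(msg) => stream.write_all(msg.as_bytes()).await?,
        Event::Shutdown => break,
    }
}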

matklad commented Sep 18, 2019

Bridging sync & async seems like a relatively important use-case though. I guess we can later add

impl<T> Sender<T> {
    fn into_blocking(self) -> blocking::Sender<T>;
}
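
A hedged sketch of how that could bottom out: blocking::Sender as a thin wrapper that drives the async send with block_on (the module and method names here are made up for illustration):

mod blocking {
    pub struct Sender<T>(pub(crate) super::Sender<T>);

    impl<T> Sender<T> {
        // Block the current (non-async) thread until the message is accepted.
        pub fn send(&self, msg: T) {
            async_std::task::block_on(self.0.send(msg));
        }
    }
}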

I also like yosh's select-less approach to selection: putting large expressions into macros (which select! encourages) is not super ergonomic, because those lazy IDE writers can't properly support code inside macros.

ghost commented Sep 18, 2019

I'm just a bit sad that the stream::join! version is quite a bit longer than the select! version :(

The Event variants Message and Shutdown appear three times. I wonder if there's a trick we could pull off to simplify that?

yoshuawuyts commented Sep 18, 2019

@stjepang using Either or a tuple of Options, it's quite straightforward to build quick type casts. If we get async iterators, the join line can be merged with the iterator line, at which point they're about equivalent.

The version I showed is mostly meant as an example of how one could define something suitable even for a public API.

edit: with some future language support it could probably become a bit more concise too:

enum Event<T> {
    Message(T),
    Shutdown,
}

let events = rx.recv().map(|ev| Event::Message(ev));
let shutdown = shutdown.recv().map(|_| Event::Shutdown);

for match ev.await in stream::join!(events, shutdown) {
    Event::Message(msg) => stream.write_all(msg.into()).await?,
    Event::Shutdown => break,
}

dignifiedquire commented Sep 18, 2019

I still find the select macro a lot better in terms of distraction vs. the essence of what is being accomplished, even if my IDE doesn't format it well.

alecmocatta commented Sep 19, 2019

Nice work! I really like this simplified API and I happen to have reached similar conclusions about bounded channels and infallible send operations.

I'm out of the loop on whether the Sink design has progressed, and whether channel capacity is intended to be bounded at n or n + num_senders (I last saw it discussed in rust-lang/futures-rs#800 and rust-lang/futures-rs#984). These thoughts may thus be outdated, but here goes in case it's useful!

Sink is missing from your proposed API, and I think for good reason. I found Sink very difficult to implement correctly on channels that don't have cap + num_senders capacity. There's a race between multiple tasks/threads calling poll_ready and start_send that seems very hard to resolve without mirroring the design choices of futures-rs's channels. See also rust-lang/futures-rs#1312. I believe there are cleaner and more general possible definitions of Sink, and possible implementations of channels that can impl Sink.

I think this is relevant here because it's a closely related design choice, and it affects whether try_send, try_send_with, send_with and Sink are implemented (or even implementable) for Sender – and I think they should be (or at least be implementable by the user).

Here's a sketch of a proposed revamped Sink trait to demonstrate:

trait Sink<Item> {
    fn poll_send(
        self: Pin<&mut Self>,
        with: &mut dyn FnMut() -> Item,
        cx: &mut Context<'_>,
    ) -> Poll<()>;
}

This would enable Sender to then implement Sink, and its own methods to be implemented trivially:

struct Sender<T> { ... }
impl<T> Sender<T> {
    fn try_send(&self, t: T) -> Option<T> {
        let mut t = Some(t);
        self.send_with(|| t.take().unwrap()).nonblocking();
        t
    }
    fn try_send_with(&self, f: impl FnOnce() -> T) -> bool {
        self.send_with(f).nonblocking().is_some()
    }
    async fn send(&self, t: T) {
        SinkExt::send(self, t).await
    }
    async fn send_with(&self, f: impl FnOnce() -> T) {
        let mut f = FnWrapper::new(f);
        self.with(|()| f()).send(()).await
    }
}

impl<T> Sink<T> for &Sender<T> {
    fn poll_send(
        self: Pin<&mut Self>,
        with: &mut dyn FnMut() -> T,
        cx: &mut Context<'_>,
    ) -> Poll<()> {
        ...
    }
}
impl<T> Sink<T> for Sender<T> {
    fn poll_send(
        self: Pin<&mut Self>,
        with: &mut dyn FnMut() -> T,
        cx: &mut Context<'_>,
    ) -> Poll<()> {
        Pin::new(&mut &*self).poll_send(with, cx)
    }
}

See here for a more developed, compiling version of this: https://play.rust-lang.org/?version=nightly&edition=2018&gist=4a11fe30cc3ea8d685b42400b88365c5

On another note, should fn try_recv's return value discriminate between "would block" and "disconnected"? i.e.

impl<T> Receiver<T> {
    fn try_recv(&self) -> Result<T, TryRecvError>;
    async fn recv(&self) -> Option<T>;
}

enum TryRecvError {
    Empty,
    Disconnected,
}

And regarding bridging sync/async, I've found the following to be useful:

trait FutureExt: Future {
    fn block(self) -> Self::Output where Self: Sized;
    fn nonblocking(self) -> Option<Self::Output> where Self: Sized;
}
impl<T: ?Sized> FutureExt for T where T: Future {}

This could replace a call to rx.try_recv() with rx.recv().nonblocking(), for example. These are also implemented in https://play.rust-lang.org/?version=nightly&edition=2018&gist=4a11fe30cc3ea8d685b42400b88365c5.
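
One way those two adapters could be implemented (a sketch using futures::executor::block_on and futures::task::noop_waker; not necessarily what the playground version does): block drives the future to completion on the current thread, and nonblocking polls it exactly once with a no-op waker:

use std::future::Future;
use std::task::{Context, Poll};

trait FutureExt: Future {
    // Drive the future to completion on the current thread.
    fn block(self) -> Self::Output
    where
        Self: Sized,
    {
        futures::executor::block_on(self)
    }

    // Poll once with a no-op waker; `None` means the future "would block".
    fn nonblocking(self) -> Option<Self::Output>
    where
        Self: Sized,
    {
        let waker = futures::task::noop_waker();
        let mut cx = Context::from_waker(&waker);
        let mut fut = Box::pin(self);
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => Some(out),
            Poll::Pending => None,
        }
    }
}

impl<T: ?Sized> FutureExt for T where T: Future {}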

I'm also keen to nail down select-like operations with stream::join! rather than a specialised select! for the time being. The select!s I've seen thus far have been too magical, in my opinion – ideally, if select! exists at all, it should be dead-simple sugar over stream::join!.

ghost commented Sep 20, 2019

On another note, should fn try_recv's return value discriminate between "would block" and "disconnected"?

Instead of returning a Result<T, TryRecvError> from try_recv() like this:

match rx.try_recv() {
    Ok(msg) => println!("message: {}", msg),
    Err(TryRecvError::Disconnected) => println!("disconnected"),
    Err(TryRecvError::Empty) => println!("empty"),
}

we could also take the approach from crossbeam-channel v0.2:

select! {
    msg = rx.recv() => match msg {
        Some(msg) => println!("message: {}", msg),
        None => println!("disconnected"),
    }
    default => println!("empty"),
}

Go doesn't even have a try_recv() equivalent; you always need to use the select construct :)

yoshuawuyts commented Sep 20, 2019

@stjepang ohh, I like that first variant a lot!

@alecmocatta thanks for the detailed writeup. We've observed similar problems with the Sink trait too. Our current position is that since there's no real counterpart to it in the stdlib, we don't necessarily want to go out and commit to a new trait. But if we did, I think the design you've laid out would be pretty close to what we want!

Perhaps it would be worthwhile to spin it out into its own crate so we can revisit this later? It seems like a general interface for writing data, beyond just Write, is something people have been thinking about for a while now.

@BurntSushi

I think the proposed API looks good! I appreciate the small API surface area.

siriux commented Sep 24, 2019

Are you planning to introduce optimized oneshot channels?

It's an important use case that crossbeam doesn't handle very well now.
crossbeam-rs/crossbeam#199

As oneshot and normal channels have different use cases, maybe async-std could provide a separate API/type for optimized oneshot channels if it's not possible to make them as fast as the mpsc channel with the current design. That might even be more ergonomic for this use case.
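
For reference, the futures crate already ships a dedicated oneshot channel whose shape could be a starting point for a separate type (this example uses futures 0.3 as it exists today):

use futures::channel::oneshot;

let (tx, rx) = oneshot::channel::<i32>();

// `send` consumes the Sender and fails only if the Receiver was dropped.
tx.send(42).ok();

// The Receiver itself is a Future; it resolves to Err(Canceled) if the
// Sender was dropped without sending.
assert_eq!(rx.await.unwrap(), 42);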

What do you think?

@BurntSushi

Oneshot channels with the fastest possible performance seem like a pretty niche use case. Why not prototype them in a separate crate first?

@yoshuawuyts

As an aside, another interesting kind of channel is the MPMC broadcaster, where each message sent is delivered to all receivers (not just the first). The broadcaster crate provides an implementation of this that's used in #240.

use broadcaster::BroadcastChannel;

let mut chan = BroadcastChannel::new();
chan.send(&5i32).await?;
assert_eq!(chan.recv().await, Some(5));

let mut chan2 = chan.clone();
chan2.send(&6i32).await?;
assert_eq!(chan.recv().await, Some(6));
assert_eq!(chan2.recv().await, Some(6));

This reminds me a lot of Node.js's EventEmitter API. Because we already have different, more specialized channels available, I think we cover a lot of the cases it's used for. But for fully reactive systems, being able to broadcast is very useful. An example of this is Choo's store abstraction.

I think after the initial channels are done, it may be interesting to have an "unstable" broadcaster variant available too.

thedodd commented Oct 15, 2019

@yoshuawuyts I'm glad you brought up the MPMC use case. That is definitely one area that still needs some love in the ecosystem, IMHO. I've had a lot of good experiences with https://github.com/ReactiveCocoa/ReactiveSwift building iOS apps and such. Rx/Reactive patterns are extremely useful when building large event-driven systems (data stores, apps, etc.); however, without MPMC/SPMC it is difficult to implement some of these patterns.

ghost commented Nov 1, 2019

Related follow-up thread: #436

agausmann commented Nov 28, 2019

I'm skeptical that we'd want send to be infallible. Being able to detect that the channel has closed and to explicitly handle that condition is useful in many programs.

As an example, I tried to use the unstable channels to implement a naive version of a broadcast queue. It uses one async_std::sync::channel per receiver, and the public Sender holds a Vec<async_std::sync::Sender> of the individual senders.

This is built with the intention that many receivers are constantly added and removed over time, since subscriptions are only needed temporarily. It is therefore important to prune the collection when receivers stop subscribing. With a fallible send, I could intuitively do this while sending, without requiring any extra channels to indicate shutdown:

pub async fn send(&mut self, data: T)
where
    T: Clone,
{
    self.receivers.retain(|data_tx| data_tx.send(data.clone()).await.is_ok())
}

but this is not possible with the current API.
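
Since retain can't take an async closure, a working version of that pruning loop under a fallible send might look like this (a sketch only; it assumes send returns a Result that is Err once the matching receiver is gone):

pub async fn send(&mut self, data: T)
where
    T: Clone,
{
    let mut alive = Vec::with_capacity(self.receivers.len());
    for data_tx in self.receivers.drain(..) {
        // Keep only the senders whose receiving side still exists.
        if data_tx.send(data.clone()).await.is_ok() {
            alive.push(data_tx);
        }
    }
    self.receivers = alive;
}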

NyxCode commented Feb 17, 2020

I 100% agree with @agausmann! I just built something using async_std::sync::channel and then ported it to futures::channel::mpsc, because I need to know whether send succeeded. That's why I searched for this issue: to let you know that for my use case (which seems quite straightforward), async_std::sync::channel is unfortunately absolutely useless.
If some people, for inexplicable reasons, don't care whether send succeeded or not, they can still do a let _ = tx.send().

@yoshuawuyts

There is an open PR addressing the need for fallible senders / receivers - #585.

And for a different use, sending messages to all active receivers, there's an open issue as well: #486.

This issue was closed.