
Future/Stream implementation for channel #264

Closed
rivertam opened this issue Dec 13, 2018 · 26 comments

Comments

@rivertam

I suppose I'm reopening an issue on the archived dedicated channels repo (otherwise I'd just make a comment).

@stjepang mentioned here that it would be easier to reimplement channels from scratch with Future support in mind ahead of time than to add Future support to the extant library.

I have two questions:

  1. Is this on any public roadmap? I don't believe I know nearly enough about async Rust at the moment to contribute meaningfully.
  2. If one were to implement a Stream naively using a crossbeam channel, would it not be efficient? I see an implementation exists as a benchmark, at least for some usages of the channel.

The primary reason I ask this for my own needs is that there appear to be no stable, working MPMC channels that also implement Stream. My application essentially wants a stream of events that occur at a rate of approximately 5 events per millisecond with a couple dozen listeners to that same stream of events. I've created a mini benchmark (not good; no need to share in my opinion but if anyone is curious I can make it public) for using a bunch of futures::sync::mpsc::channels, one for each listener, and publishing on each of those Senders. It seems, given the benchmarks, that that might actually not be terribly inefficient (I can add 100 listeners then publish 10 messages per millisecond for 10 milliseconds in about 12 ms), but I'd rather do things the right way with a Receiver: Clone + Stream, if I can.

@rivertam
Author

In an attempt to benchmark a naive conversion to Stream, I ran into a SendError that I could use help debugging:

```rust
#[macro_use]
extern crate criterion;
extern crate crossbeam_channel;
extern crate futures;
extern crate tokio;

use criterion::Criterion;
use crossbeam_channel::bounded;
use futures::future::lazy;
use futures::stream::Stream;
use futures::{Async, Future, Poll};
use std::sync::{Arc, Mutex};
use tokio::timer::Interval;

// Shared accumulator that every subscriber task adds into.
struct Handler {
    total: Arc<Mutex<u32>>,
}

impl Handler {
    pub fn add_to(&self, num: u32) {
        let mut total = self.total.lock().unwrap();
        *total += num;
    }
}

// Naive futures 0.1 Stream wrapper around a crossbeam-channel Receiver.
#[derive(Clone)]
struct FutureReceiver<T> {
    receiver: crossbeam_channel::Receiver<T>,
}

impl<T> Stream for FutureReceiver<T> {
    type Item = T;
    type Error = crossbeam_channel::TryRecvError;

    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        match self.receiver.try_recv() {
            Ok(i) => Ok(Async::Ready(Some(i))),
            // Returns NotReady without registering the current task for a
            // wakeup when a message arrives.
            Err(crossbeam_channel::TryRecvError::Empty) => Ok(Async::NotReady),
            Err(e) => Err(e),
        }
    }
}

fn go_crossbeam() {
    let handler = Arc::new(Handler {
        total: Arc::new(Mutex::new(0)),
    });

    let handler_clone = handler.clone();
    tokio::run(lazy(move || {
        let (tx, rx) = bounded(1_024);
        let future_rx = FutureReceiver { receiver: rx };

        // 100 subscriber tasks, each polling its own clone of the Receiver.
        for _ in 0..100 {
            let handler = handler_clone.clone();
            let rx = future_rx.clone();
            tokio::spawn(
                rx.for_each(move |num| -> Result<(), crossbeam_channel::TryRecvError> {
                    handler.add_to(num);
                    Ok(())
                })
                .map_err(|_| ()),
            );
        }

        // Publisher: every 100 microseconds, for 100 ticks, send the value 100.
        let tx = Arc::new(tx);
        tokio::spawn(
            Interval::new(
                std::time::Instant::now(),
                std::time::Duration::from_micros(100),
            )
            .take(100)
            .map(|_| 100)
            .map_err(|e| {
                eprintln!("Got interval error = {:?}", e);
            })
            .for_each(move |num| {
                // Fails with SendError once every Receiver has been dropped.
                tx.send(num).map_err(|e| {
                    eprintln!("send err = {:?}", e);
                })
            })
            .map(|_| ()),
        );

        Ok(())
    }));
}
```

I'm getting a SendError on the tx.send(num) line, which doesn't make a lot of sense to me, as I don't believe the Receivers should ever get dropped. It's unrelated to the main issue, but I'd love an explanation.

@ghost

ghost commented Dec 13, 2018

Is this on any public roadmap?

Yes, there's a prototype of futures-based channels that take inspiration from crossbeam-channel, but it's in a private repo. The prototype will be completed, though.

If one were to implement a Stream naively using a crossbeam channel, would it not be efficient? I see an implementation exists as a benchmark, at least for some usages of the channel.

That's a benchmark for the futures-channel crate, which has almost nothing in common with crossbeam-channel.

It should be efficient, but this naive implementation is not complete because we don't wake up sleeping receiver tasks when a message is sent into the channel.
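To illustrate the missing step, here is a minimal std-only sketch (not crossbeam-channel's or futures' API; WakerChannel, send, and poll_recv are hypothetical names) of a queue that stores the receiving task's Waker when it finds nothing, and wakes that task on the next send. It uses std::task rather than futures 0.1's task::current(), and it only tracks a single waiting receiver.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};

// Hypothetical queue that remembers which task is waiting for a message.
pub struct WakerChannel<T> {
    inner: Mutex<Inner<T>>,
}

struct Inner<T> {
    queue: VecDeque<T>,
    // Waker of the receiver task that found the queue empty, if any.
    waiting: Option<Waker>,
}

impl<T> WakerChannel<T> {
    pub fn new() -> Arc<Self> {
        Arc::new(WakerChannel {
            inner: Mutex::new(Inner { queue: VecDeque::new(), waiting: None }),
        })
    }

    // Push a message and wake the receiver task if it registered itself.
    pub fn send(&self, msg: T) {
        let waker = {
            let mut inner = self.inner.lock().unwrap();
            inner.queue.push_back(msg);
            inner.waiting.take()
        };
        // Wake outside the lock so the woken task can lock immediately.
        if let Some(w) = waker {
            w.wake();
        }
    }

    // The step the naive try_recv-based Stream skips: store cx.waker()
    // before returning Pending, so send() knows whom to wake.
    pub fn poll_recv(&self, cx: &mut Context<'_>) -> Poll<T> {
        let mut inner = self.inner.lock().unwrap();
        match inner.queue.pop_front() {
            Some(msg) => Poll::Ready(msg),
            None => {
                inner.waiting = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }
}
```

A multi-consumer version would need a list of waiting wakers and a policy for which one to wake on each send.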

In an attempt to benchmark a naive conversion to Stream, I ran into a SendError that I could use help debugging:

The problem is that in poll(), if the receiver is not ready, we're not registering the task anywhere for notification. Since no handle to the current task is stored anywhere else, the only reference to it is dropped; this way all receivers get dropped, thus disconnecting the channel.
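A simplified model of that failure, assuming an executor that keeps no reference to a task except the one in its run queue and the ones held by Wakers. Task, Queue, spawn, run, and NeverWakes are hypothetical names and this is not tokio's code. A future that returns Pending without saving cx.waker() is dropped as soon as run finishes polling it, mirroring how the subscriber tasks (and their Receivers) disappear in the explanation above.

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

type BoxFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
type Queue = Arc<Mutex<VecDeque<Arc<Task>>>>;

// A spawned future plus the run queue it re-enters when woken.
struct Task {
    future: Mutex<Option<BoxFuture>>,
    queue: Queue,
}

impl Wake for Task {
    // Waking pushes the task back on the queue: the only way it runs again.
    fn wake(self: Arc<Self>) {
        let queue = self.queue.clone();
        queue.lock().unwrap().push_back(self);
    }
}

fn spawn(queue: &Queue, fut: impl Future<Output = ()> + Send + 'static) {
    let task = Arc::new(Task {
        future: Mutex::new(Some(Box::pin(fut))),
        queue: queue.clone(),
    });
    queue.lock().unwrap().push_back(task);
}

// Poll queued tasks until none are left. Each popped Arc<Task> is dropped at
// the end of its iteration; if the future returned Pending without cloning
// the Waker, that was the last reference and the future is dropped with it.
fn run(queue: &Queue) {
    loop {
        let next = queue.lock().unwrap().pop_front();
        let task = match next {
            Some(task) => task,
            None => break,
        };
        let waker = Waker::from(task.clone());
        let mut cx = Context::from_waker(&waker);
        let mut slot = task.future.lock().unwrap();
        if let Some(mut fut) = slot.take() {
            if fut.as_mut().poll(&mut cx).is_pending() {
                *slot = Some(fut);
            }
        }
    }
}

// Stand-in for a subscriber task that owns a Receiver; it records being dropped.
struct NeverWakes {
    dropped: Arc<AtomicBool>,
}

impl Future for NeverWakes {
    type Output = ();
    // Like the naive Stream: Pending, but cx.waker() is never stored.
    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        Poll::Pending
    }
}

impl Drop for NeverWakes {
    fn drop(&mut self) {
        self.dropped.store(true, Ordering::SeqCst);
    }
}
```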

@rivertam
Author

@stjepang Thanks! This explains a lot not just about the issue at hand, but about how Tokio and Futures work in Rust!

In the meantime, I would appreciate some quick advice: do you think using multiple futures::sync::mpsc channels, with the Receivers handed out as the "subscribers" and the matching Senders kept as the "publishers" in a data structure such as a Vec, is a valid approach? It's about the only way I can think of to write an event emitter efficiently and relatively easily (in this case, the event emitter is not generic and the production occurs on one thread).
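A minimal sketch of that design, using std::sync::mpsc as a stand-in for futures::sync::mpsc so it runs without dependencies (Broadcaster, subscribe, and publish are hypothetical names): the publisher keeps one Sender per subscriber in a Vec, sends a clone of each event to all of them, and prunes subscribers whose Receiver has been dropped.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

// Hypothetical event emitter: one channel per subscriber, publisher keeps the Senders.
pub struct Broadcaster<T: Clone> {
    subscribers: Vec<Sender<T>>,
}

impl<T: Clone> Broadcaster<T> {
    pub fn new() -> Self {
        Broadcaster { subscribers: Vec::new() }
    }

    // Hand out a new Receiver; keep its Sender for publishing.
    pub fn subscribe(&mut self) -> Receiver<T> {
        let (tx, rx) = channel();
        self.subscribers.push(tx);
        rx
    }

    // Send a clone of the event to every subscriber and forget any whose
    // Receiver is gone (send returns Err in that case).
    pub fn publish(&mut self, event: T) {
        self.subscribers.retain(|tx| tx.send(event.clone()).is_ok());
    }

    pub fn len(&self) -> usize {
        self.subscribers.len()
    }
}
```

With a small Copy payload the per-subscriber clone is cheap; the cost grows with the number of subscribers rather than with message size.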

@ghost

ghost commented Dec 13, 2018

Yes, that sounds like a good idea!

I'm curious: when sending an event message, do you pick one of the Senders randomly, or do you send a clone of the event to each Sender, or something else?

@rivertam
Author

In this case, each event is actually a type that is typically encoded by 8 bytes (it's a CAN v1 message), so right now I'm envisioning Clone but I think even Copy would be fair.

However, as the messages are const across all time, I think a reference approach would work if they were bigger.

@ghost

ghost commented Dec 13, 2018

So you're actually looking for a broadcast channel (like the bus crate)? A typical MPMC channel delivers each message to only one receiver.
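To make the difference concrete: in a sketch where several consumer threads share one queue (std::sync::mpsc behind an Arc<Mutex<_>>, standing in for an MPMC channel; shared_queue_counts is a hypothetical helper), every message goes to exactly one consumer, so the per-consumer counts add up to the number of messages sent, not to messages times consumers.

```rust
use std::sync::mpsc::channel;
use std::sync::{Arc, Mutex};
use std::thread;

// Several consumers share one queue (an MPMC-style setup built from std's
// mpsc + Mutex). Returns how many messages each consumer received.
pub fn shared_queue_counts(messages: usize, consumers: usize) -> Vec<usize> {
    let (tx, rx) = channel::<usize>();
    let rx = Arc::new(Mutex::new(rx));
    let handles: Vec<_> = (0..consumers)
        .map(|_| {
            let rx = Arc::clone(&rx);
            thread::spawn(move || {
                let mut got = 0;
                // recv() returns Err once the Sender is gone and the queue is drained.
                loop {
                    let msg = rx.lock().unwrap().recv();
                    match msg {
                        Ok(_) => got += 1,
                        Err(_) => break,
                    }
                }
                got
            })
        })
        .collect();
    for i in 0..messages {
        tx.send(i).unwrap();
    }
    drop(tx);
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}
```

A broadcast channel such as bus would instead deliver every message to every receiver.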

@rivertam
Author

Ah, yes, I am. Sorry, I forgot the terms meant different things. I hadn't seen the bus crate, but as it doesn't integrate with Futures, I don't know that I'll be able to use it efficiently.

@ghost

ghost commented Feb 21, 2019

I believe we can close this. A future-based version of crossbeam-channel is on my todo list.

@ghost ghost closed this as completed Feb 21, 2019
@najamelan

@stjepang Has any progress been made on a high-performance async channel?

@cynecx
Contributor

cynecx commented Aug 14, 2019

@najamelan There are other implementations you can already use, like the one included in tokio: https://github.com/tokio-rs/tokio/tree/master/tokio-sync (though the one in tokio-sync is 'just' MPSC, not MPMC).

@najamelan

@cynecx I didn't know about tokio-sync. I'll check it out soon! Thanks for pointing me to it.

@najamelan

A new crate just got published last night: https://crates.io/crates/channel-async

@jnicholls

That new crate is a nice, thin layer on top of crossbeam-channel if you ask me. It was almost perfect until I saw the errors use failure instead of just std::error::Error or something like snafu :)

@vulpyne

vulpyne commented Aug 26, 2019

@jnicholls It seems like channel-async just runs a loop polling try_recv or try_send in the underlying channel and sleeping if the call fails. That will chew CPU in each "blocked" thread if the sleep delay is low or introduce latency if it's higher.
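Roughly the pattern being described, as a std-only sketch (recv_by_polling is a hypothetical helper, not channel-async's actual code): the loop counts how many times it checks the channel, which makes the repeated wakeups of a short sleep delay visible.

```rust
use std::sync::mpsc::{Receiver, TryRecvError};
use std::thread;
use std::time::Duration;

// Poll try_recv and sleep between attempts. Returns the message and how
// many times the loop checked the channel.
pub fn recv_by_polling<T>(rx: &Receiver<T>, delay: Duration) -> Option<(T, usize)> {
    let mut polls = 0;
    loop {
        polls += 1;
        match rx.try_recv() {
            Ok(msg) => return Some((msg, polls)),
            // Every empty check costs a wakeup; a longer delay trades that
            // CPU for up to `delay` of extra latency per message.
            Err(TryRecvError::Empty) => thread::sleep(delay),
            Err(TryRecvError::Disconnected) => return None,
        }
    }
}
```

By contrast, Receiver::recv() blocks the thread until a message arrives, without repeated wakeups.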

@jnicholls

jnicholls commented Aug 26, 2019

@vulpyne Agreed. It would take a fundamental update to crossbeam-channel to have threads park and wait for events to be signaled in order to have the best support for futures. This is why this issue exists, and why that crate is an okay stop-gap until an efficient implementation can be done.
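For comparison with the polling loop, here is the park/unpark style of notification commonly used for blocking on threads, as a one-slot std-only sketch (Mailbox, put, and take are hypothetical names): the waiting thread records its Thread handle before parking, and the sender unparks it after storing the value. A futures-aware version would record a task Waker instead of a Thread handle.

```rust
use std::sync::{Arc, Mutex};
use std::thread::{self, Thread};

// Hypothetical one-slot mailbox: (stored value, thread waiting for it).
pub struct Mailbox<T> {
    state: Mutex<(Option<T>, Option<Thread>)>,
}

impl<T> Mailbox<T> {
    pub fn new() -> Arc<Self> {
        Arc::new(Mailbox { state: Mutex::new((None, None)) })
    }

    // Store the value, then unpark the waiting thread if one registered.
    pub fn put(&self, value: T) {
        let waiter = {
            let mut state = self.state.lock().unwrap();
            state.0 = Some(value);
            state.1.take()
        };
        if let Some(t) = waiter {
            t.unpark();
        }
    }

    // Block until a value is available, without busy polling.
    pub fn take(&self) -> T {
        loop {
            {
                let mut state = self.state.lock().unwrap();
                if let Some(v) = state.0.take() {
                    return v;
                }
                // Register before parking so a concurrent put() can find us.
                state.1 = Some(thread::current());
            }
            // park() may return spuriously, so the loop re-checks the slot.
            thread::park();
        }
    }
}
```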

@jnicholls

@stjepang Where did this land on your priority list? If there is work underway but unfinished, making it available on a branch for someone else to take up the mantle would be awesome.

@taiki-e
Member

taiki-e commented Aug 27, 2019

@ghost

ghost commented Oct 25, 2019

There is now a channel implementation in the master branch of async-std, behind the unstable flag:
async-rs/async-std#380

@Thomasdezeeuw
Contributor

@stjepang I noticed that the implementation in async-std is not based on Crossbeam, is there any reason for this? Or was it simply not a good fit?

@cynecx
Contributor

cynecx commented Oct 26, 2019

@Thomasdezeeuw Actually, having looked at the code (async-rs/async-std#380), it looks like the same algorithm is being used; however, it doesn't really make sense to reuse crossbeam here, since the notification model is simply different.

@ghost

ghost commented Oct 26, 2019

@Thomasdezeeuw Rather than reimplementing literally the whole crate based on futures, I chose to do some things differently:

  • Zero-capacity channels are fundamentally incompatible with the futures model and (AFAIK) there's no way to implement them correctly. They are only possible with stackful coroutines (or regular stackful OS threads).

  • Unbounded channels are totally doable, but sort of an antipattern in CSP-based programming, which is why Go doesn't support them and why they're not the default in futures and tokio.

  • Closing channels when receivers get dropped is not necessary since panics in async tasks crash the whole process and the async runtimes will eventually be able to detect deadlocks (like in Go). This is how channels in Go work too, and it simplifies the API quite a bit.

  • Selection is not necessary because the futures crate offers the select! macro.

  • Timers akin to after() and tick() are not necessary because they're supported well by async runtimes.

The channel in async-std is basically a reimplementation of crossbeam_channel::bounded(), except it doesn't support zero-capacity channels.
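As a rough illustration of bounded-channel backpressure, here is a std-only sketch using std::sync::mpsc::sync_channel (std's bounded channel; in std, channel() is the unbounded one). fill_until_full is a hypothetical helper that counts how many try_send calls succeed before the channel reports Full. With capacity 0, std's channel is a rendezvous channel for threads, so try_send fails immediately unless a receiver is already waiting.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

// Counts how many messages fit before try_send reports Full.
pub fn fill_until_full(capacity: usize) -> usize {
    // Keep the receiver alive (but never read from it) so sends can't disconnect.
    let (tx, _rx) = sync_channel::<usize>(capacity);
    let mut sent = 0;
    loop {
        match tx.try_send(sent) {
            Ok(()) => sent += 1,
            // Full: a blocking send() would now wait for the receiver.
            Err(TrySendError::Full(_)) => return sent,
            Err(TrySendError::Disconnected(_)) => unreachable!("_rx is still alive"),
        }
    }
}
```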

@glaebhoerl

Zero-capacity channels are fundamentally incompatible with the futures model and (AFAIK) there's no way to implement them correctly. They are only possible with stackful coroutines (or regular stackful OS threads).

Hmm, can you explain why? Haven't encountered this connection before (and two minutes' thinking didn't reveal the answer).

Unbounded channels are totally doable, but sort of an antipattern in CSP-based programming, which is why Go doesn't support them, or why they're the default in futures and tokio.

(This sentence is a bit ambiguous; do you mean futures and tokio are not CSP-based, which is why they're the default there...? Or is there a missing word?)

@ghost

ghost commented Oct 27, 2019

Hmm, can you explain why? Haven't encountered this connection before (and two minutes' thinking didn't reveal the answer).

Perhaps my statement was a bit too strong, but it still stands, I think.

Both Go and crossbeam-channel have the important guarantee that exactly one operation (or the default case) in a select block succeeds. I don't know how to make zero-capacity channels work like that inside futures::select. We could make a special select macro that only works with async channels to make that possible, but not with a generic futures-based select macro.

Anyways, I don't know how to explain this well right now and maybe should give it all some more thought. Sorry. :)

(This sentence is a bit ambiguous; do you mean futures and tokio are not CSP-based, which is why they're the default there...? Or is there a missing word?)

Sorry, that was a typo - I wanted to say unbounded channels are not the default :)

So they have constructors called channel() and unbounded_channel(), which implies the former is the default and should be used often, whereas the latter is special and should be used more rarely.

Go takes an even stronger stance and intentionally offers no unbounded channels for "philosophical" reasons.

@glaebhoerl

Both Go and crossbeam-channel have the important guarantee that exactly one operation (or the default case) in a select block succeeds. I don't know how to make zero-capacity channels work like that inside futures::select.

Sorry for being dense, I'm still not familiar enough with the details to infer the answer here. 😅

Is it something like: positive-capacity channels let you 'rollback' and push the elements from any extra channels back into their buffers in case more than one operation wanted to complete at the same time (vaguely similar to the logic in python-trio/trio#242 (comment))? (If that happens to be near the mark, then why is it harder for futures::select than for crossbeam-channel? With the latter you can block a whole thread, rather than just a "task", but this is significant because...?)

@ghost

ghost commented Oct 28, 2019

With the latter you can block a whole thread, rather than just a "task", but this is significant because...?)

Because with threads, paired send and receive operations can really happen simultaneously, whereas with tasks this is not possible.
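A std-only sketch of that pairing between two threads, using a zero-capacity std::sync::mpsc::sync_channel (rendezvous_send_wait is a hypothetical helper): send() returns only after the receiving thread has taken the value, so both sides know the transfer happened. The function returns how long the sender was blocked.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;
use std::time::{Duration, Instant};

// Zero-capacity (rendezvous) channel between two threads: send() returns only
// once the receiver has taken the value. Returns how long the sender blocked.
pub fn rendezvous_send_wait(receiver_delay: Duration) -> Duration {
    let (tx, rx) = sync_channel::<&'static str>(0);
    let receiver = thread::spawn(move || {
        thread::sleep(receiver_delay);
        rx.recv().unwrap()
    });
    let start = Instant::now();
    tx.send("hello").unwrap(); // blocks until recv() pairs with it
    let waited = start.elapsed();
    assert_eq!(receiver.join().unwrap(), "hello");
    waited
}
```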

With tasks, the sending task can "offer" a send to the receiving task and yield with Poll::Pending; the receiving side can then accept the offer, "commit" the operation by receiving the message, and return Poll::Ready. But the sending task can still drop the send future and thus cancel it.

What I'm getting at is, the sending and receiving side can't agree on whether an operation has been executed or was canceled with certainty. With threads, we can do that.
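The race described above can be modeled on one thread with plain std futures. This is a hypothetical sketch (Slot, SendFut, and try_accept are illustrative names, not async-std's implementation): the sender's first poll offers the value and returns Pending, the receiver accepts it, and then the sender future is dropped before it is polled again. The receiver holds the message, but the sender's caller never saw Ready and has to treat the send as cancelled.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};

// Hypothetical zero-capacity slot shared by one sending and one receiving task.
#[derive(Default)]
pub struct Slot {
    pub offered: Option<u32>,
    pub taken: bool,
    pub cancelled_after_commit: bool,
}

// Sender future: the first poll offers the value and yields Pending; a later
// poll returns Ready once the receiver has taken it.
pub struct SendFut {
    pub slot: Arc<Mutex<Slot>>,
    pub value: Option<u32>,
    pub done: bool,
}

impl Future for SendFut {
    type Output = ();
    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        let this = self.get_mut();
        let mut slot = this.slot.lock().unwrap();
        if slot.taken {
            this.done = true;
            return Poll::Ready(());
        }
        if let Some(v) = this.value.take() {
            // A real channel would also store _cx.waker() here.
            slot.offered = Some(v);
        }
        Poll::Pending
    }
}

impl Drop for SendFut {
    // Dropping the future is how a task cancels the send.
    fn drop(&mut self) {
        let mut slot = self.slot.lock().unwrap();
        if slot.taken && !self.done {
            // The receiver already committed, but this sender never saw Ready.
            slot.cancelled_after_commit = true;
        } else if !slot.taken {
            // Nobody took the offer yet, so withdrawing it is still safe.
            slot.offered = None;
        }
    }
}

// Receiver side: accepting the offer commits the transfer.
pub fn try_accept(slot: &Mutex<Slot>) -> Option<u32> {
    let mut slot = slot.lock().unwrap();
    let value = slot.offered.take();
    if value.is_some() {
        slot.taken = true;
    }
    value
}
```

Withdrawing the offer in Drop only helps while the receiver has not committed; once it has, nothing can roll the transfer back.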

@glaebhoerl

Ohh. Interesting... thanks for explaining!

This issue was closed.

8 participants