diff --git a/text/2017-11-09-channel.md b/text/2017-11-09-channel.md new file mode 100644 index 0000000..c888172 --- /dev/null +++ b/text/2017-11-09-channel.md @@ -0,0 +1,973 @@ +# Summary + +Introduce crate `crossbeam-channel`, which aims to be an upgrade over +`std::sync::mpsc` from the standard library in pretty much all aspects: +features, convenience, and performance. + +The prototype implementation can be found [here](https://github.com/stjepang/channel). +Also, check out [benchmarks](https://github.com/stjepang/channel/tree/master/benchmarks) +to see how it fares against `std::sync::mpsc` and other crates offering +different kinds of channels and queues. + +# Motivation + +The design of the new channel is driven by an attempt to answer three questions: + +1. How to fix the shortcomings of `std::sync::mpsc`? +2. How to achieve the flexibility of Go's channels, and especially its *select*? +3. How to consolidate the variety of Java's concurrent queues into a unified channel interface? + +## Shortcomings of `std::sync::mpsc` + +I've compiled a list of some of the problems our fellow Rust users have been complaining about: + +1. StackOverflow user [asking](https://stackoverflow.com/questions/40384274/rust-mpscsender-cannot-be-shared-between-threads) + why [`Sender`](https://doc.rust-lang.org/std/sync/mpsc/struct.Sender.html) + doesn't implement `Sync`. +2. In Rust-related channels on irc.mozilla.org, people regularly + come in asking for [MPMC](https://botbot.me/mozilla/rust/search/?q=mpmc) or + [SPMC](https://botbot.me/mozilla/rust/search/?q=spmc) channels and queues. +3. A Reddit user is [asking](https://www.reddit.com/r/rust/comments/6remch/thoughts_on_avoiding_busy_waiting_for_threads/) + for help with multithreading. Complains about "pretty confusing" API and "apparently mediocre" + implementation of [`Select`](https://doc.rust-lang.org/nightly/std/sync/mpsc/struct.Select.html). + There is also some talk about MPMC channels. +4. Another Reddit user [wants](https://www.reddit.com/r/rust/comments/3mpke2/does_rust_has_something_like_channels_in_go/cvhs5kk/) + SPMC channels. The original post is asking about Go-like channels in Rust. +5. An annoyance/wart [reported](https://github.com/rust-lang/rust/issues/12902) + on Rust's issue tracker: The `select!` macro doesn't work if you try to reference + a `Receiver` in a struct. +6. `Sender` not being `Sync` is a common stumbling block for users of Hyper, as + [explained](https://github.com/rust-lang/rfcs/pull/1299#issuecomment-146361678) + by [**@seanmonstar**](https://github.com/seanmonstar). +7. [**@jonhoo**](https://github.com/jonhoo) + [points out](https://github.com/rust-lang/rust/issues/27800#issuecomment-313455851) + that channel selection in Rust is not fair like in Go. The list of cases + in a `select!` should be randomly permuted each time. +8. [**@alexcrichton**](https://github.com/alexcrichton) + [says](https://github.com/rust-lang/rust/issues/12902#issuecomment-319510050) + the `select!` macro is defacto deprecated nowadays. It's probably alive only + because Servo is still using it. +9. Judging by upvotes given to a [comment](https://github.com/rust-lang/rust/issues/27800#issuecomment-140966321) + on GitHub, people really really want Go-like channels with easy selection. +10. A [plea](https://github.com/rust-lang/rust/issues/27800#issuecomment-270583295) + for the ability to select across dynamic lists of channels. +11. 
The selection interface [doesn't support](https://github.com/rust-lang/rust/issues/27800#issuecomment-300880631) + specifying a timeout. +12. A Rocket user [wants](https://www.reddit.com/r/rust/comments/6t8sdl/help_communicating_bidirectionally_with_rocket_in/) + to pass a `Receiver` to `.manage(...)`, but this doesn't work because `Receiver` is not `Sync`. +13. [**@alexcrichton**](https://github.com/alexcrichton) + [lays out](https://github.com/rust-lang/rust/pull/42397#issuecomment-315867774) + a list of regrets about the current design of `std::sync::mpsc`. +14. Since `Receiver` is not `Sync`, people [work around](https://github.com/pingcap/tikv/pull/637/files) + the problem by wrapping it in a `Mutex`; for example: `Arc>>>`. +15. Yet another [example](https://github.com/servo/servo/blob/38f4ae80c4b456b89ee33543c8c6699501696c9c/components/script/dom/paintworkletglobalscope.rs#L234) + in Servo: wrapping `WorkletExecutor` in a `Mutex` because `Sender` is not `Sync`. +16. [**@jdm**](https://github.com/jdm) + [says](https://mozilla.logbot.info/servo/20170719#c716258) + having a `.len()` method on channels would be useful for profiling Servo. +17. In an effort to get rid of unstable features, Servo is + [looking for](https://github.com/servo/servo/issues/5286) + an alternative to `mpsc_select`. +18. [`ipc-channel`](https://github.com/servo/ipc-channel) + depends on `mpsc_select`, so it + [cannot be used](https://github.com/servo/ipc-channel/issues/118) + with stable Rust. +19. [Rust severely disappoints](http://esr.ibiblio.org/?p=7294) ESR: + *Not only that, but it seems the CSP implementation in Rust – the + most tractable concurrency primitive – has limits that make it unusable + for NTPsec’s purposes (only selection over a static set of channels is possible) + and there is some danger that it might be removed entirely!* + +I also wrote a lengthy critique on `std::sync::mpsc` in a +[blog post](https://stjepang.github.io/2017/08/13/designing-a-channel.html), +with suggestions for how to design a new, better channel. + +Here's a summary of all the shortcomings of `std::sync::mpsc`: + +1. Channels are not MPMC (they don't support multiple `Receiver`s). +2. The bounded variant (`sync_channel`) is slow and scales poorly due to internal lock contention. +3. `Sender`s and `Receiver`s are not `Sync`. +4. There is no `send_timeout` method (useful when sending data into a full bounded channel). +5. [`Sender`](https://doc.rust-lang.org/std/sync/mpsc/struct.Sender.html) + and [`SyncSender`](https://doc.rust-lang.org/std/sync/mpsc/struct.SyncSender.html) + are distinct types, which duplicates the interface somewhat. +6. `select!` can only select over *receive* operations, but not over *send* operations. +7. `select!` does not have a clear path towards stabilization. +8. Dynamic selection (using [`Select`](https://doc.rust-lang.org/nightly/std/sync/mpsc/struct.Select.html)) + is unsafe and unergonomic, as is evident by + [code](https://github.com/servo/servo/blob/fa319170ebb34afcdfc120b7c3e47fe5b1c21210/components/script/script_thread.rs#L930) + in Servo. +9. Channel selection is not fair (select cases are always fired in the same order). +10. The `select!` interface comes with some surprising warts. +11. It would be nice to have `len` method on `Sender`s and `Receiver`s. + +So far, the most popular brand of band-aid for patching some of these problems has been +the [chan](https://github.com/BurntSushi/chan) crate, which comes with MPMC channels. 
+Unfortunately, it also brings some problems of its own, poor performance being the +most pronounced one. + +## A look at Go's channels + +Channels and selection are the core building blocks of Go programs, and a huge +reason for the success of language. + +Go's channels come in two flavors: unbuffered (zero capacity) and buffered (positive, +fixed capacity) channels. There are no unbounded channels built in. + +Sometimes they get [criticized](http://www.jtolds.com/writing/2016/03/go-channels-are-bad-and-you-should-feel-bad/) +for being slow, which is probably a bit too harsh. If we +[compare](https://github.com/stjepang/channel/tree/master/benchmarks) +them to Rust's `std::sync::mpsc`, they actually perform quite well. + +Where Go really shines, however, is the versatility of *select*: + +```go +// Select over two operations on a zero-capacity channel named `match`. +select { + +// Receiving message `peer` from channel `match`. +case peer := <-match: + fmt.Printf("%s received a message from %s.\n", name, peer) + +// Sending message `name` into channel `match`. +case match <- name: + fmt.Printf("Someone has matched with %s.\n", name) +} +``` + +Here we are declaring a receive and a send operation, and blocking until +exactly one of them succeeds. This is something `std::sync::mpsc` simply +cannot do. In fact, it's difficult to find a language (or a library) +that does offer something equivalent. + +On the other hand, dynamic selection (declaring select cases dynamically +rather than statically) can be [a bit tricky](https://stackoverflow.com/questions/19992334/how-to-listen-to-n-channels-dynamic-select-statement) +to do (and probably slow), but Go's *select* is already a high bar to meet. + +There has been some effort by [Dmitry Vyukov](http://www.1024cores.net/) to improve the performance of Go's +channels by avoiding contention on mutexes (often referred to as *lock-free* channels, +although they aren't exactly, [strictly speaking](https://en.wikipedia.org/wiki/Non-blocking_algorithm#Lock-freedom)): + +* (2013/08 - present) [Implementation](https://codereview.appspot.com/12544043/) submitted for review. +* (2014/02) Initial [proposal](https://groups.google.com/forum/#!msg/golang-dev/5zcEng3yvaU/wfosgbaKuEgJ) for Go 1.3. +* (2014/02) The design [document](https://docs.google.com/document/d/1yIAYmbvL3JxOKOjuCyon7JhW4cSv1wy5hC0ApeGMV9s/pub). +* (2014/07) An [update](https://groups.google.com/forum/#!searchin/golang-dev/lock-free|sort:relevance/golang-dev/0IElw_BbTrk/FtV82Tr3S00J). +* (2014/10) The [issue](https://github.com/golang/go/issues/8899) on GitHub. +* (2016/03) [Discussed](https://groups.google.com/forum/#!searchin/golang-nuts/lock-free|sort:relevance/golang-nuts/LM648yrPpck/o1rR6AUhAwAJ) + as a possible answer to the aforementioned [critique](http://www.jtolds.com/writing/2016/03/go-channels-are-bad-and-you-should-feel-bad/). +* (2017/04) Another [update](https://github.com/golang/go/issues/8899#issuecomment-292491545). +* (2017/05) [Brought up](https://github.com/golang/go/issues/20351#issuecomment-301895677) again as a possible solution to performance regressions in Go 1.8. + +This new implementation has been in a limbo state for several years already. It never +fully materialized, but it was also neither abandoned nor given a red light. +It might still end up in Go's mainline at some point in the future. We'll see. 
+ +## A look at Java's concurrent queues + +Java has a quite extensive list of data structures in [`java.util.concurrent`](https://docs.oracle.com/javase/8/docs/api/index.html?java/util/concurrent/package-summary.html). +We're aspiring to have many of those implemented in Rust as part of the Crossbeam project. + +Java's concurrent queues: + +1. [`ArrayBlockingQueue`](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ArrayBlockingQueue.html) + is a bounded blocking queue backed by an array. This is similar to `std::sync::mpsc::sync_channel`. +2. [`ConcurrentLinkedQueue`](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ConcurrentLinkedQueue.html) + is an unbounded queue backed by a linked list. This is similar to `std::sync::mpsc::channel`. +3. [`DelayQueue`](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/DelayQueue.html) + is an unbounded blocking queue of delayed elements. An element can only be taken after + its delay has expired. This is a special kind of priority queue. +4. [`LinkedBlockingQueue`](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/LinkedBlockingQueue.html) + is an optionally-bounded blocking queue based on a linked list. In bounded form, it's + rarely useful because `ArrayBlockingQueue` is probably a better choice anyway. +5. [`LinkedTransferQueue`](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/LinkedTransferQueue.html) + is similar to `ConcurrentLinkedQueue`, except it also has the ability to block on + consumers. +6. [`PriorityBlockingQueue`](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/PriorityBlockingQueue.html) + is an unbounded blocking priority queue. This is just a thin layer on top of a + typical priority queue. +7. [`SynchronousQueue`](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/SynchronousQueue.html) + is a zero-capacity (also known as *rendezvous*) queue. This is equivalent to + `std::sync::mpmsc::sync_channel(0)`. + +Rust's `std::sync::mpsc` already encompasses the feature sets of (1), (2), (5), and (7), +except for the ability to have multiple consumers and things that don't fit Rust's +ownership model. For example, all Java's concurrent queues have the `peek()` method, +which returns a reference to an element but doesn't remove it from the queue. This +is obviously a red flag in the Rust world. + +Queues (3) and (6) are a bit special and depart from the domain of typical channels. +Queue (4) is not difficult to implement, but at the same time, it also doesn't seem +particularly useful. Therefore, those three queues are not the focus of `crossbeam-channel`. + +# Detailed design + +## Design goals + +All shortcomings of `std::sync::mpsc` enumerated in the *Motivation* section must be fixed. +`crossbeam-channel` strives to look simple and stupid on the outside, +but be ambitiously powerful inside. The channel will: + +1. Be an MPMC channel (allow multiple `Sender`s and multiple `Receiver`s). +2. Avoid locks and be much faster than `std::sync::mpsc` in the bounded case (the `sync_channel` case). +3. Not be significantly slower than `std::sync::mpsc` in the unbounded case. +4. Have the full spectrum of `try_send`/`send`/`send_timeout`/`try_recv`/`recv`/`recv_timeout` methods. +5. Have convenience methods like `capacity`, `len`, and `is_empty`. +6. Have `Senders` and `Receivers` that implement `Sync`. +7. Have a simple interface without special types like `SyncSender`. +8. Support selection over both send and receive operations. +9. 
Allow selection with a timeout.
10. Have easy and safe dynamic selection.
11. Have selection without the warts of `select!`.
12. Have fair(er) selection.

## The interface

Constructors:

```rust
pub fn unbounded<T>() -> (Sender<T>, Receiver<T>);
pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>);
```

Sender:

```rust
pub struct Sender<T> { ... }

unsafe impl<T: Send> Send for Sender<T> {}
unsafe impl<T: Send> Sync for Sender<T> {}

impl<T> Sender<T> {
    pub fn try_send(&self, value: T) -> Result<(), TrySendError<T>>;
    pub fn send(&self, value: T) -> Result<(), SendError<T>>;
    pub fn send_timeout(&self, value: T, dur: Duration) -> Result<(), SendTimeoutError<T>>;

    pub fn is_empty(&self) -> bool;
    pub fn len(&self) -> usize;
    pub fn capacity(&self) -> Option<usize>;
    pub fn is_disconnected(&self) -> bool;
}

impl<T> Clone for Sender<T> { ... }
```

Receiver:

```rust
pub struct Receiver<T> { ... }

unsafe impl<T: Send> Send for Receiver<T> {}
unsafe impl<T: Send> Sync for Receiver<T> {}

impl<T> Receiver<T> {
    pub fn try_recv(&self) -> Result<T, TryRecvError>;
    pub fn recv(&self) -> Result<T, RecvError>;
    pub fn recv_timeout(&self, dur: Duration) -> Result<T, RecvTimeoutError>;

    pub fn is_empty(&self) -> bool;
    pub fn len(&self) -> usize;
    pub fn capacity(&self) -> Option<usize>;
    pub fn is_disconnected(&self) -> bool;

    pub fn iter(&self) -> Iter<T>;
    pub fn try_iter(&self) -> TryIter<T>;
}

impl<T> Clone for Receiver<T> { ... }
```

Iterators:

```rust
impl<'a, T> IntoIterator for &'a Receiver<T> { ... }
impl<T> IntoIterator for Receiver<T> { ... }

pub struct Iter<'a, T: 'a> { ... }
impl<'a, T> Iterator for Iter<'a, T> { ... }

pub struct TryIter<'a, T: 'a> { ... }
impl<'a, T> Iterator for TryIter<'a, T> { ... }

pub struct IntoIter<T> { ... }
impl<T> Iterator for IntoIter<T> { ... }
```

Selection:

```rust
impl Select {
    pub fn new() -> Select;
    pub fn with_timeout(dur: Duration) -> Select;

    pub fn send<T>(&mut self, tx: &Sender<T>, value: T) -> Result<(), SelectSendError<T>>;
    pub fn recv<T>(&mut self, rx: &Receiver<T>) -> Result<T, SelectRecvError>;

    pub fn disconnected(&mut self) -> bool;
    pub fn would_block(&mut self) -> bool;
    pub fn timed_out(&mut self) -> bool;
}
```

Error types are the same as in `std::sync::mpsc`, so their definitions are
omitted here for brevity. Here's a full list of those error types:

```
RecvError
RecvTimeoutError
TryRecvError
SendError
SendTimeoutError
TrySendError
```

There are two additional error types used in `Select::send` and `Select::recv`:

```rust
pub struct SelectRecvError;

pub struct SelectSendError<T>(pub T);

impl<T> SelectSendError<T> {
    pub fn into_inner(self) -> T;
}
```

## The three flavors

Internally, there are three flavors (implementations) of channels:

1. Constructor `unbounded()` creates a channel of the list-based flavor.
2. Constructor `bounded(cap)` creates a channel of the array-based flavor when `cap > 0`.
3. Constructor `bounded(cap)` creates a channel of the zero-capacity flavor when `cap == 0`.

Each of the flavors is a completely independent channel implementation and is optimized
for its own use case. In theory, it would be possible to have fewer flavors (or just one),
but at the cost of worse performance.
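For example, here is a minimal usage sketch of the interface above, with each constructor
backed by a different flavor (assuming the crate is imported as `crossbeam_channel`; error
handling is simplified):

```rust
extern crate crossbeam_channel;

use std::thread;
use crossbeam_channel::{bounded, unbounded};

fn main() {
    // List-based flavor: an unbounded channel.
    let (tx, rx) = unbounded();
    tx.send("hello").unwrap();
    assert_eq!(rx.recv().unwrap(), "hello");

    // Array-based flavor: a bounded channel with capacity 2.
    let (tx, rx) = bounded(2);
    tx.send(1).unwrap();
    tx.send(2).unwrap();
    assert!(tx.try_send(3).is_err()); // The buffer is full.
    assert_eq!(rx.len(), 2);

    // Zero-capacity flavor: a send blocks until a receive picks it up.
    let (tx, rx) = bounded(0);
    let t = thread::spawn(move || tx.send("ping").unwrap());
    assert_eq!(rx.recv().unwrap(), "ping");
    t.join().unwrap();
}
```

Note that all three channels are used through the exact same `Sender`/`Receiver` pair;
the flavor is an internal detail.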
More concretely, flavors look like this behind the scenes:

```rust
struct Channel<T> {
    senders: AtomicUsize,
    receivers: AtomicUsize,
    flavor: Flavor<T>,
}

enum Flavor<T> {
    Array(flavors::array::Channel<T>),
    List(flavors::list::Channel<T>),
    Zero(flavors::zero::Channel<T>),
}
```

`Sender`s and `Receiver`s are just wrappers around `Channel`:

```rust
pub struct Sender<T>(Arc<Channel<T>>);
pub struct Receiver<T>(Arc<Channel<T>>);
```

To try sending a value into the channel, `Sender` provides the method `try_send`, which simply
matches over the three flavors at runtime and calls a method on the appropriate one:

```rust
impl<T> Sender<T> {
    pub fn try_send(&self, value: T) -> Result<(), TrySendError<T>> {
        match self.0.flavor {
            Flavor::Array(ref chan) => chan.try_send(value),
            Flavor::List(ref chan) => chan.try_send(value),
            Flavor::Zero(ref chan) => chan.try_send(value, self.case_id()),
        }
    }
}
```

All other methods on `Sender`s and `Receiver`s work very similarly.

As a side note, `std::sync::mpsc` also uses the same idea of matching
over different flavors at runtime. It actually has four flavors:

```rust
enum Flavor<T> {
    Oneshot(Arc<oneshot::Packet<T>>),
    Stream(Arc<stream::Packet<T>>),
    Shared(Arc<shared::Packet<T>>),
    Sync(Arc<sync::Packet<T>>),
}
```

### List-based flavor

This flavor implements an unbounded channel.

I experimented with three different queues here:

1. The Michael-Scott queue.
2. A modification of Dmitry Vyukov's [MPSC queue](http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue) that supports MPMC.
3. A queue that allocates nodes in blocks (similar to [`SegQueue`](https://docs.rs/crossbeam/0.3.0/crossbeam/sync/struct.SegQueue.html)).

Implementations (1) and (2) allocate too frequently (one allocation per
message), which affects performance negatively. Implementation (3)
has shown better numbers in benchmarks, so I went with that one.

The actual code is very similar to the one in [`SegQueue`](https://docs.rs/crossbeam/0.3.0/crossbeam/sync/struct.SegQueue.html).
It's worth pointing out that the algorithm is not exactly lock-free, but it
scales well nonetheless. This is not too bad, especially considering that
`std::sync::mpsc` is not lock-free either.

It would be possible to make it lock-free by choosing a different queue, though.
The Michael-Scott queue is lock-free, and so is
[FAAArrayQueue](http://concurrencyfreaks.blogspot.hr/2016/11/faaarrayqueue-mpmc-lock-free-queue-part.html),
which allocates nodes in blocks and is most probably the best choice here.
However, FAAArrayQueue is quite a bit more difficult to implement, so
I didn't bother with it, at least not for now. The possibility of implementing
it in the future is still on the table.

Note that since allocations occur in blocks (32 slots per block),
if a channel of this flavor transfers very few messages,
more memory will be allocated than is actually necessary.
That is especially true if the channel is used as a *oneshot channel*
(one that transfers a single message).

Memory reclamation relies on Crossbeam's new epoch-based GC
([`crossbeam-epoch`](https://github.com/crossbeam-rs/crossbeam-epoch)).
We could use other schemes - in particular, hazard pointers would be
a good alternative, possibly even a better one.
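To give a rough picture of the block-based layout, here is a simplified sketch
(the names `Block`, `Slot`, and `ListChannel` are made up for illustration and do
not match the actual implementation):

```rust
use std::cell::UnsafeCell;
use std::mem::MaybeUninit;
use std::sync::atomic::{AtomicBool, AtomicPtr, AtomicUsize};

// Messages are stored in blocks of 32 slots, so there is roughly one
// allocation per 32 sends rather than one allocation per message.
const BLOCK_CAP: usize = 32;

// A single message slot within a block.
struct Slot<T> {
    value: UnsafeCell<MaybeUninit<T>>,
    // Set by the sender once `value` has been written, read by receivers.
    ready: AtomicBool,
}

// One node in the linked list of blocks.
struct Block<T> {
    // Index of the first message in this block (within the whole channel).
    start_index: usize,
    slots: [Slot<T>; BLOCK_CAP],
    // Pointer to the next block, installed by whichever sender fills this one.
    next: AtomicPtr<Block<T>>,
}

// The channel itself keeps atomic head/tail positions into the block list.
// Fully consumed blocks are reclaimed through `crossbeam-epoch`.
struct ListChannel<T> {
    head_index: AtomicUsize,
    tail_index: AtomicUsize,
    head_block: AtomicPtr<Block<T>>,
    tail_block: AtomicPtr<Block<T>>,
}
```

One allocation covers `BLOCK_CAP` messages, which is why this flavor allocates far
less often than implementations (1) and (2).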
### Array-based flavor

This flavor implements a bounded channel of positive capacity.

The implementation is based on Dmitry Vyukov's well-known
[bounded MPMC queue](http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue),
which is very fast and relatively simple to implement.

The queue allocates a fixed-capacity buffer of type `[(AtomicUsize, T)]`, which
means there is an overhead of `mem::size_of::<AtomicUsize>() * capacity` bytes.

An interesting property of Dmitry Vyukov's original implementation
is that the queue is not linearizable. For example, the original implementation
fails on the following test:

```rust
#[test]
fn linearizable() {
    const COUNT: usize = 25_000;
    const THREADS: usize = 4;

    let (tx, rx) = bounded(THREADS);

    crossbeam::scope(|s| {
        for _ in 0..THREADS {
            s.spawn(|| {
                for _ in 0..COUNT {
                    tx.send(0).unwrap();
                    rx.try_recv().unwrap();
                }
            });
        }
    });
}
```

The test fails because `rx.try_recv()` sometimes returns `Err(TryRecvError::Empty)`.
The problem is in the `dequeue` function: if `dif < 0`, then `false` is
returned immediately:

```cpp
if (dif == 0)
{
    if (dequeue_pos_.compare_exchange_weak
        (pos, pos + 1, std::memory_order_relaxed))
        break;
}
else if (dif < 0)
    return false;
else
    pos = dequeue_pos_.load(std::memory_order_relaxed);
```

However, the fact that `cell->sequence_` is lagging behind `dequeue_pos_` (meaning `dif < 0`)
does not necessarily imply that the queue is empty. It might be the case that
there is an ongoing enqueue operation: `enqueue_pos_` was incremented, but the
corresponding `cell->sequence_` was not updated yet.

Fortunately, the problem is easy to fix. We just need an additional check that
compares the current values of `dequeue_pos_` and `enqueue_pos_`. If those values
match, then the queue is indeed empty. A similar fix should be applied to the
`enqueue` function as well.

After applying those small tweaks, the queue passes the test.

### Zero-capacity flavor

This flavor implements a zero-capacity channel, also known as a *rendezvous* channel.

[Go](https://github.com/golang/go/blob/1125fae989a3016d509c23fee15793b231e5e8e1/src/runtime/chan.go)
and [`std::sync::mpsc`](https://github.com/rust-lang/rust/blob/49bee9d09a8f8c2baf4aff7d6a46cebff0c64594/src/libstd/sync/mpsc/sync.rs)
don't have dedicated implementations for the zero-capacity
variant - it is simply a special case within the array-based (bounded) flavor.

Due to a completely different implementation of the array-based flavor,
`crossbeam-channel` cannot take the same approach. The zero-capacity flavor
has to be written from scratch.

It's interesting to view zero-capacity channels as a variation on Java's
[`Exchanger`](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Exchanger.html).
The crucial difference between the two is that an `Exchanger` allows any
participating threads to pair up and exchange any data, while channels are
a bit more restrictive. A zero-capacity channel has two sides, and threads
on the sending side always exchange a `Some(data)` for a `None` coming from
the receiving side.

The implementation itself is quite complicated and boring, so I won't go into
the hairy details here. I'll just mention that, internally, it really is
represented as a special, two-sided `Exchanger`.

## Blocking operations

Implementing concurrent queues is not easy, but adding support for blocking
operations and selection on top of them is even more challenging, especially
if we want to preserve good performance.
+ +The basis for blocking and selection in `crossbeam-channel` is Dmitry Vyukov's +[design document](https://docs.google.com/document/d/1yIAYmbvL3JxOKOjuCyon7JhW4cSv1wy5hC0ApeGMV9s/pub) +for faster channels in Go. + +In the document, blocking send operation is implemented as follows: + +```c +void asyncchansend(Hchan* c, T val) { + for(;;) { + if(asyncchansend_nonblock(c, val)) { + // Send succeeded, see if we need to unblock a receiver. + if(c->recvq != nil) { + lock(c); + sg = removewaiter(&c->recvq); + unlock(c); + if(sg != nil) + unblock(sg->g); + } + return; + } else { + // The channel is full. + lock(c); + sg->g = g; + addwaiter(&c->sendq, sg); + if(notfull(c)) { + removewaiter(&c->sendq, sg); + unlock(c); + continue; + } + unlock(c); + block(); + // Retry send. + } + } +} +``` + +Note that the whole operation is implemented as a loop, in which +the send operation is possibly attempted multiple times. + +A channel has two wait lists: one for blocked senders and one for blocked receivers. +When a send or receive operation succeeds, it removes one blocked receiver or sender +(respectively) from the wait list and wakes it up. + +In `crossbeam-channel`, blocking send and receive operations work very similarly: + +1. Check whether the channel is closed. If so, return. +2. Try performing the operation. If succeeded, return. +3. Add this thread to the wait list. +4. Check if the operation may succeed now. If so, skip to step (6). +5. Park this thread (block) until someone unparks it (wakes it up). +6. Remove this thread from the wait list. +7. If timed out, return. +8. Go to step (1). + +Consider what happens when a receive operation blocks, and then another thread +sends a message. The sender will push the message into the queue and wake up +the receiver, which then attempts to pop a message from the queue again. + +Note that Go works a bit differently. +Go's channel would not push the message into the queue. Instead, it +would pass the message directly to the sleeping receiver and then wake it up. +The receiver would simply pick up the message when removing itself from the +wait list. There would be no looping nor retrying involved. + +While the non-looping mechanism might seem more efficient, it doesn't work well +with concurrent queues. Go can get away with it because its queue relies on +heavy locking. + +## Selection + +Selection is, in fact, similar to blocking send and receive operations, except +it appears in a more general form. As the +[*Designing a channel*](https://stjepang.github.io/2017/08/13/designing-a-channel.html) +blog post explains, there exists a nice solution for fully general select that: + +1. Supports both send and receive operations. +2. Allows specifying timeouts. +3. Does not use macros. +4. Has a completely safe API. +5. Can have dynamic lists of cases. + +The idea is that we expose the *looping guts* of the blocking mechanism. The user +declares a loop and enlists selection cases within the loop. The loop is broken +as soon as any of the cases succeed. + +For example: + +```rust +// Select over two operations on a zero-capacity channel `(tx, rx)`. +let mut sel = Select::new(); +loop { + // Receiving message `peer` from the channel. + if let Ok(peer) = sel.recv(&rx) { + // Success! A message was received. + println!("{} received a message from {}.\n", name, peer); + break; + } + + // Sending message `name` into the channel. + if let Err(err) = sel.send(&tx, name) { + // Sending failed. Regain ownership of the message. 
        name = err.into_inner();
    } else {
        // Success! The message was sent.
        break;
    }
}
```

Each call to `sel.{send,recv}` (either with a `Sender` or a `Receiver`) accesses
a state machine within `sel`. The loop executes as follows:

1. The machine is uninitialized in the beginning.
2. The first call to `sel.{send,recv}` activates the machine. It transitions into the
   *counting* state and remembers the first selection case by the `Sender`
   or `Receiver` ID.
3. Each call to `sel.{send,recv}` simply increments a case counter.
4. When the machine notices a repeated call to `sel.{send,recv}` on the first
   selection case, it transitions into the *attempt* state. Also, it
   chooses a random number between 0 and the number of cases - that is the
   `start` index.
5. In the following iteration of the loop, all calls to `sel.{send,recv}`
   on cases with index less than `start` simply fail. On all other cases,
   a send or receive operation is attempted.
6. The following iteration of the loop is similar to (5), except this time
   only cases with indices less than `start` are attempted.
7. The following iteration of the loop adds each case to its corresponding
   wait list. It also checks whether any of the cases may succeed
   just after adding to the wait list.
8. If any of the cases may now possibly succeed, we skip to (10).
9. The current thread is parked until someone wakes it up (or we time out).
10. The following iteration removes each case from its corresponding wait list.
11. Go to step (5).

When any of the cases succeeds, the state machine becomes uninitialized, and that
is when the loop must be broken. The loop is broken at steps (5) or (6).

There are also several special selection cases:

```rust
impl Select {
    pub fn disconnected(&mut self) -> bool;
    pub fn would_block(&mut self) -> bool;
    pub fn timed_out(&mut self) -> bool;
}
```

These are used very similarly to `sel.{send,recv}`. Here's an example:

```rust
let mut sel = Select::new();
loop {
    if let Ok(msg) = sel.recv(&rx1) {
        println!("{}", msg);
        break;
    }
    if let Ok(msg) = sel.recv(&rx2) {
        println!("{}", msg);
        break;
    }
    if sel.disconnected() {
        // Activated if all send/recv operations fail with `Disconnected` error.
        break;
    }
    if sel.would_block() {
        // Activated if all send/recv cases would block.
        break;
    }
}
```

Another example:

```rust
let mut sel = Select::with_timeout(Duration::from_millis(100));
loop {
    if let Ok(msg) = sel.recv(&rx1) {
        println!("{}", msg);
        break;
    }
    if let Ok(msg) = sel.recv(&rx2) {
        println!("{}", msg);
        break;
    }
    if sel.timed_out() {
        // Activated if 100 milliseconds have passed from the beginning of selection.
        break;
    }
}
```

There are important rules one must follow in order to use selection correctly:

1. Each iteration of the loop must fire the same set of cases in the same order.
2. No two cases may use the same end of the same channel.
3. As soon as a case succeeds, the loop must be broken.

Violating these rules will result in misbehaving selection, possibly causing
a deadlock.

Finally, let's try dynamic selection over all receivers stored in a vector:

```rust
let receivers: Vec<Receiver<String>> = ...;

let mut sel = Select::new();
let msg = 'select: loop {
    for rx in &receivers {
        if let Ok(msg) = sel.recv(rx) {
            break 'select msg;
        }
    }
};

println!("Received message: {}", msg);
```

In each iteration of the `'select` loop, a case is fired for each `Receiver`
in the vector.
As soon as a message is received from one of them, the loop is +broken with the message as the result. + +## Fairness + +### Fair selection + +The Go [language specification](https://golang.org/ref/spec#Select_statements) +explains the workings of its `select` statement. It's interesting how the successful +case is chosen if more than one case can proceed at the same time: + +> If one or more of the communications can proceed, a single one +> that can proceed is chosen via a uniform pseudo-random selection. +> Otherwise, if there is a default case, that case is chosen. +> If there is no default case, the "select" statement blocks until at +> least one of the communications can proceed. + +This bit of randomness makes sure that if a `select` statement is executed many +times in a row, no case will have preferential treatment over others. + +Unfortunately, the `select!` macro in Rust doesn't have similar guarantees - if the first +case can proceed immediately, it will be the successful case. This is unfair to +other cases declared below it. + +`crossbeam-channel` deals with the problem by randomly rotating the circular +list of cases each time, and then proceeding as if the first case had +preferential treatment. Note that this doesn't make selection perfectly fair. +For example, if there are three cases and only the first two can proceed, +then the first case has higher chances of succeeding. + +### The *drive-by* problem + +In 2015, Russ Cox opened a GitHub [issue](https://github.com/golang/go/issues/11506) +on how blocked operations get (re)ordered within the wait list (this was +an old implementation of channels in Go): + +> A send into a buffered channel with blocked receivers stores the value in the buffer, +> wakes up a receiver, and continues executing. When the receiver is eventually scheduled, +> it checks the channel, and maybe it gets lucky and the value is still there. +> But maybe not, in which case it goes to the end of the queue. +> +> A receive out of a buffered channel copies a value out of the buffer, +> wakes a blocked sender, and continues. When the sender is eventually scheduled, +> it checks the channel, and maybe it gets lucky and there is still room +> in the buffer for the send. But maybe not, in which case it goes to the +> end of the queue. + +This is pretty much how blocking in `crossbeam-channel` works. + +He argues that this behavior is bad and, in theory, it is possible for a blocked +operation to be starved forever, even though other send and receive operations +keep chugging along. The proposal is to change the behavior so that +the thread waking up other thread completes the full protocol with it, entirely +side-stepping the queue. + +Dmitry Vyukov [points out](https://github.com/golang/go/issues/11506#issuecomment-118007672) +that the issue is not as clear-cut. Changing the behavior would basically +trade performance (throughput) for fairness (latency). + +In the end, the behavior [was changed](https://go-review.googlesource.com/c/go/+/16740) +several months later so Go channels now have fairer behavior. Quoting the pull request: + +> This change removes the retry mechanism we use for buffered channels. +> Instead, any sender waking up a receiver or vice versa completes the +> full protocol with its counterpart. This means the counterpart does +> not need to relock the channel when it wakes up. (Currently +> buffered channels need to relock on wakeup.) 
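To tie this back to *Fair selection* above, here is a minimal sketch (not the actual
implementation) of how a randomly chosen starting index turns a fixed, repeated list of
cases into a fair(er) scan. The `rand` crate and the `try_fire` closure are stand-ins
used purely for illustration:

```rust
extern crate rand;

/// Scans `num_cases` selection cases starting from a random index, wrapping
/// around, and returns the index of the first case that fires. `try_fire`
/// stands in for attempting a single send or receive case.
fn scan_cases(num_cases: usize, mut try_fire: impl FnMut(usize) -> bool) -> Option<usize> {
    if num_cases == 0 {
        return None;
    }

    // Like Go, choose the starting case via a uniform pseudo-random pick,
    // so no case is favored just for being declared first.
    let start = rand::random::<usize>() % num_cases;

    for offset in 0..num_cases {
        let index = (start + offset) % num_cases;
        if try_fire(index) {
            return Some(index);
        }
    }

    // None of the cases could proceed in this pass.
    None
}
```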
## Benchmarks

The following benchmarks are not by any means extensive, but they should at least
give us *some* idea of how `crossbeam-channel` fares against other channels and queues. As always,
benchmarks are best served with a grain of salt.

There are 6 different tests:

* `seq`: A single thread sends `N` messages. Then it receives `N` messages.
* `spsc`: One thread sends `N` messages. Another thread receives `N` messages.
* `mpsc`: `T` threads send `N / T` messages each. One thread receives `N` messages.
* `mpmc`: `T` threads send `N / T` messages each. `T` other threads receive `N / T` messages each.
* `select_rx`: `T` threads send `N / T` messages each into a separate channel. Another thread receives `N` messages by selecting over the `T` channels.
* `select_both`: `T` threads send `N / T` messages each by selecting over `T` channels. `T` other threads receive `N / T` messages each by selecting over the `T` channels.

Some of those tests are not applicable to some channels or queues. For example, the `mpmc`
test doesn't apply to `std::sync::mpsc`. Similarly, `seq` doesn't apply to zero-capacity
channels because they cannot contain `N` messages.

Constants:

* `N = 5_000_000`
* `T = 4`

All benchmark sources and additional scripts can be found
[here](https://github.com/stjepang/channel/tree/master/benchmarks).

Results:

![Graphs](https://github.com/stjepang/channel/raw/master/benchmarks/plot.png)

`crossbeam-channel` is in almost all benchmarks either the fastest or
close to the fastest channel. However, there is one interesting case where it loses
to `std::sync::mpsc`: the `spsc` benchmark with unbounded channels.

[**@JLockerman**](https://github.com/JLockerman) has recently
[improved](https://github.com/rust-lang/rust/pull/44963) the performance
of `std::sync::mpsc` in SPSC mode, which now makes it quite a bit faster than
`crossbeam-channel`.

`std::sync::mpsc` switches dynamically from SPSC flavor to MPSC flavor when
the first `Sender` is cloned. `crossbeam-channel` has `Sender`s and `Receiver`s
that implement `Sync`, so it cannot switch between flavors dynamically in a
similar manner.

### Integration into Servo

I've managed to successfully switch Servo from `std::sync::mpsc`
to `crossbeam-channel` and run it. The code can be found
[here](https://github.com/servo/servo/compare/master...stjepang:crossbeam-channel).
Please note that it is quite messy and written just for the sake of the experiment.

On [**@SimonSapin**](https://github.com/SimonSapin)'s
recommendation, I ran the [RoboHornet](http://www.robohornet.org/)
benchmark to see what impact `crossbeam-channel` has on performance. There
weren't any tangible differences in the benchmark results.

# Drawbacks

1. The unbounded variant allocates nodes in blocks. A full block will be
   allocated every time, even if just a few messages are sent through the channel.

2. The selection interface is brittle - there are several rules that must be upheld
   by the user of the API. Go's `select` is much more robust in this regard, but
   that is because it has first-class support in the language.

3. The unbounded variant is slower than `std::sync::mpsc` in SPSC scenarios.

4. Compilation is slow. Even if you just write `unbounded().0.send(())`, three
   `send` implementations will be compiled, one for each flavor. This is because
   each call to `send` matches over flavors at runtime.
Similarly, even if you use + the bounded variant only, `crossbeam-epoch` will be pulled in as a dependency. + * Case in point: + the [CSP example](https://github.com/stjepang/channel/blob/master/examples/csp.rs) + uses one bounded channel of capacity 1 (array-based flavor) and builds in 2.36 seconds + with optimizations. However, if all method bodies in list-based and zero-capacity + flavors are replaced with `unimplemented!()`, then the program builds in 1.65 + seconds. + +5. There is no *oneshot* optimization like in `std::sync::mpsc`. + +6. One `AtomicUsize` is allocated per slot in the bounded case (array-based flavor), + thus increasing memory consumption. + +7. Selection is still not perfectly fair. + +# Alternatives + +1. Use HP-based GC instead of epoch-based GC for the list-based flavor. + +2. Instead of just one universal `Sender`/`Receiver` pair, there could be more types + for different, better-optimized kinds of channels. For example, we could have + `unbounded::Sender`/`unbounded::Receiver`, + or `spsc::Sender`/`spsc::Receiver`, + or maybe even `oneshot::Sender`/`oneshot::Receiver`. + +3. Instead of matching over flavors at runtime, we could use dynamic dispatch to + call the right method. While this would decrease code size and improve compilation + time, benchmarks show negative effects on performance. + +4. We could implement a `select!` macro, even though it'd surely come with small annoyances. + The majority of cases where selection is needed are static lists of receive + operations, so perhaps a macro would be useful anyway? + +5. Have just a single struct `Channel` without the `Sender`/`Receiver` separation. + Such a channel would have to be closed explicitly by calling `.close()` on it. + +### Bikeshedding + +1. Instead of two global constructors `unbounded` and `bounded`, we could have just one + constructor named `new` that takes capacity as `Option`. Passing `Some(n)` would + create a bounded channel of capacity `n`, and passing `None` would create an + unbounded channel. + +# Unresolved questions + +### Basic queues + +While `crossbeam-channel` is essentially a beefed-up queue, it'd still be useful +to have slimmer, more specialized, and faster queues. Perhaps we could +have separate crates like: + +- `crossbeam-spsc` +- `crossbeam-spmc` +- `crossbeam-mpsc` +- `crossbeam-mpmc` + +Another very interesting crate that is in the works is +[**@jeehoonkang**](https://github.com/jeehoonkang)'s +[wait-free queue](https://github.com/jeehoonkang/crossbeam-queue). + +We'll have to think how to consolidate a bunch of different specialized +queues into a sensible set of Crossbeam crates. + +### Broadcast channel + +Another commonly requested data structure is a broadcast channel. +This is a special kind of channel that works with cloneable types only (`T: Clone`). +Each of the `Receiver`s receives a copy of *all* messages that were sent into the channel. +A popular broadcast channel crate is [`bus`](https://docs.rs/bus/1.3.2/bus/). + +Unfortunately, it's not possible to fit a broadcast flavor into +`crossbeam-channel`'s interface because of the `T: Clone` bound. + +### Improving `std::sync::mpsc` + +Since `crossbeam-channel` is such a big improvement over `std::sync::mpsc`, +perhaps some parts could be ported over into the standard library. + +For example, the poor performance of `sync_channel` really stands out. Perhaps it could be +rewritten using Dmitry Vyukov's bounded MPMC queue? 
Earlier this year I opened an +[issue](https://github.com/rust-lang/rust/issues/41021) suggesting this idea. + +Another possibility is to take inspiration from `crossbeam-channel`'s selection +mechanism to make [`Select`](https://doc.rust-lang.org/nightly/std/sync/mpsc/struct.Select.html) +safe.
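For reference, dynamic selection with today's unstable `Select` looks roughly like this
(a sketch of the nightly-only `mpsc_select` API; details may differ slightly), which is
the kind of unsafe, unergonomic code a safe redesign would eliminate:

```rust
#![feature(mpsc_select)]

use std::sync::mpsc::{channel, Select};

fn main() {
    let (tx1, rx1) = channel();
    let (tx2, rx2) = channel();
    tx1.send("foo").unwrap();
    tx2.send("bar").unwrap();

    // Selecting over two receive operations requires manual handle
    // management and an `unsafe` block.
    let sel = Select::new();
    let mut h1 = sel.handle(&rx1);
    let mut h2 = sel.handle(&rx2);
    unsafe {
        // `add` is unsafe because the handle must not be moved while it
        // is registered with the `Select`.
        h1.add();
        h2.add();
    }

    // `wait` returns the id of a handle that is ready to receive.
    let ready = sel.wait();
    if ready == h1.id() {
        println!("{}", h1.recv().unwrap());
    } else {
        println!("{}", h2.recv().unwrap());
    }
}
```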