Skip to content

Commit 47c9a35

Browse files
committed
auto merge of #10830 : alexcrichton/rust/spsc-queue, r=brson
This pull request completely rewrites std::comm and all associated users. Some major bullet points * Everything now works natively * oneshots have been removed * shared ports have been removed * try_recv no longer blocks (recv_opt blocks) * constructors are now Chan::new and SharedChan::new * failure is propagated on send * stream channels are 3x faster I have acquired the following measurements on this patch. I compared against Go, but remember that Go's channels are fundamentally different than ours in that sends are by-default blocking. This means that it's not really a totally fair comparison, but it's good to see ballpark numbers for anyway ``` oneshot stream shared1 std 2.111 3.073 1.730 my 6.639 1.037 1.238 native 5.748 1.017 1.250 go8 1.774 3.575 2.948 go8-inf slow 0.837 1.376 go8-128 4.832 1.430 1.504 go1 1.528 1.439 1.251 go2 1.753 3.845 3.166 ``` I had three benchmarks: * oneshot - N times, create a "oneshot channel", send on it, then receive on it (no task spawning) * stream - N times, send from one task to another task, wait for both to complete * shared1 - create N threads, each of which sends M times, and a port receives N*M times. The rows are as follows: * `std` - the current libstd implementation (before this pull request) * `my` - this pull request's implementation (in M:N mode) * `native` - this pull request's implementation (in 1:1 mode) * `goN` - go's implementation with GOMAXPROCS=N. The only relevant value is 8 (I had 8 cores on this machine) * `goN-X` - go's implementation where the channels in question were created with buffers of size `X` to behave more similarly to rust's channels.
2 parents 2c41a82 + 39a6c9d commit 47c9a35

File tree

104 files changed

+3519
-3130
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

104 files changed

+3519
-3130
lines changed

doc/tutorial-tasks.md

+19-27
Original file line numberDiff line numberDiff line change
@@ -121,17 +121,16 @@ receiving messages. Pipes are low-level communication building-blocks and so
121121
come in a variety of forms, each one appropriate for a different use case. In
122122
what follows, we cover the most commonly used varieties.
123123

124-
The simplest way to create a pipe is to use the `comm::stream`
124+
The simplest way to create a pipe is to use `Chan::new`
125125
function to create a `(Port, Chan)` pair. In Rust parlance, a *channel*
126126
is a sending endpoint of a pipe, and a *port* is the receiving
127127
endpoint. Consider the following example of calculating two results
128128
concurrently:
129129

130130
~~~~
131131
# use std::task::spawn;
132-
# use std::comm::{stream, Port, Chan};
133132
134-
let (port, chan): (Port<int>, Chan<int>) = stream();
133+
let (port, chan): (Port<int>, Chan<int>) = Chan::new();
135134
136135
do spawn || {
137136
let result = some_expensive_computation();
@@ -150,8 +149,7 @@ stream for sending and receiving integers (the left-hand side of the `let`,
150149
a tuple into its component parts).
151150

152151
~~~~
153-
# use std::comm::{stream, Chan, Port};
154-
let (port, chan): (Port<int>, Chan<int>) = stream();
152+
let (port, chan): (Port<int>, Chan<int>) = Chan::new();
155153
~~~~
156154

157155
The child task will use the channel to send data to the parent task,
@@ -160,9 +158,8 @@ spawns the child task.
160158

161159
~~~~
162160
# use std::task::spawn;
163-
# use std::comm::stream;
164161
# fn some_expensive_computation() -> int { 42 }
165-
# let (port, chan) = stream();
162+
# let (port, chan) = Chan::new();
166163
do spawn || {
167164
let result = some_expensive_computation();
168165
chan.send(result);
@@ -180,25 +177,23 @@ computation, then waits for the child's result to arrive on the
180177
port:
181178

182179
~~~~
183-
# use std::comm::{stream};
184180
# fn some_other_expensive_computation() {}
185-
# let (port, chan) = stream::<int>();
181+
# let (port, chan) = Chan::<int>::new();
186182
# chan.send(0);
187183
some_other_expensive_computation();
188184
let result = port.recv();
189185
~~~~
190186

191-
The `Port` and `Chan` pair created by `stream` enables efficient communication
192-
between a single sender and a single receiver, but multiple senders cannot use
193-
a single `Chan`, and multiple receivers cannot use a single `Port`. What if our
194-
example needed to compute multiple results across a number of tasks? The
195-
following program is ill-typed:
187+
The `Port` and `Chan` pair created by `Chan::new` enables efficient
188+
communication between a single sender and a single receiver, but multiple
189+
senders cannot use a single `Chan`, and multiple receivers cannot use a single
190+
`Port`. What if our example needed to compute multiple results across a number
191+
of tasks? The following program is ill-typed:
196192

197193
~~~ {.xfail-test}
198194
# use std::task::{spawn};
199-
# use std::comm::{stream, Port, Chan};
200195
# fn some_expensive_computation() -> int { 42 }
201-
let (port, chan) = stream();
196+
let (port, chan) = Chan::new();
202197
203198
do spawn {
204199
chan.send(some_expensive_computation());
@@ -216,10 +211,8 @@ Instead we can use a `SharedChan`, a type that allows a single
216211

217212
~~~
218213
# use std::task::spawn;
219-
# use std::comm::{stream, SharedChan};
220214
221-
let (port, chan) = stream();
222-
let chan = SharedChan::new(chan);
215+
let (port, chan) = SharedChan::new();
223216
224217
for init_val in range(0u, 3) {
225218
// Create a new channel handle to distribute to the child task
@@ -238,23 +231,22 @@ Here we transfer ownership of the channel into a new `SharedChan` value. Like
238231
as an *affine* or *linear* type). Unlike with `Chan`, though, the programmer
239232
may duplicate a `SharedChan`, with the `clone()` method. A cloned
240233
`SharedChan` produces a new handle to the same channel, allowing multiple
241-
tasks to send data to a single port. Between `spawn`, `stream` and
234+
tasks to send data to a single port. Between `spawn`, `Chan` and
242235
`SharedChan`, we have enough tools to implement many useful concurrency
243236
patterns.
244237

245238
Note that the above `SharedChan` example is somewhat contrived since
246-
you could also simply use three `stream` pairs, but it serves to
239+
you could also simply use three `Chan` pairs, but it serves to
247240
illustrate the point. For reference, written with multiple streams, it
248241
might look like the example below.
249242

250243
~~~
251244
# use std::task::spawn;
252-
# use std::comm::stream;
253245
# use std::vec;
254246
255247
// Create a vector of ports, one for each child task
256248
let ports = vec::from_fn(3, |init_val| {
257-
let (port, chan) = stream();
249+
let (port, chan) = Chan::new();
258250
do spawn {
259251
chan.send(some_expensive_computation(init_val));
260252
}
@@ -341,7 +333,7 @@ fn main() {
341333
let numbers_arc = Arc::new(numbers);
342334
343335
for num in range(1u, 10) {
344-
let (port, chan) = stream();
336+
let (port, chan) = Chan::new();
345337
chan.send(numbers_arc.clone());
346338
347339
do spawn {
@@ -370,7 +362,7 @@ and a clone of it is sent to each task
370362
# use std::rand;
371363
# let numbers=vec::from_fn(1000000, |_| rand::random::<f64>());
372364
# let numbers_arc = Arc::new(numbers);
373-
# let (port, chan) = stream();
365+
# let (port, chan) = Chan::new();
374366
chan.send(numbers_arc.clone());
375367
~~~
376368
copying only the wrapper and not its contents.
@@ -382,7 +374,7 @@ Each task recovers the underlying data by
382374
# use std::rand;
383375
# let numbers=vec::from_fn(1000000, |_| rand::random::<f64>());
384376
# let numbers_arc=Arc::new(numbers);
385-
# let (port, chan) = stream();
377+
# let (port, chan) = Chan::new();
386378
# chan.send(numbers_arc.clone());
387379
# let local_arc : Arc<~[f64]> = port.recv();
388380
let task_numbers = local_arc.get();
@@ -499,7 +491,7 @@ Here is the code for the parent task:
499491
# }
500492
# fn main() {
501493
502-
let (from_child, to_child) = DuplexStream();
494+
let (from_child, to_child) = DuplexStream::new();
503495
504496
do spawn {
505497
stringifier(&to_child);

src/etc/licenseck.py

+1
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@
7777
"rt/isaac/rand.h", # public domain
7878
"rt/isaac/standard.h", # public domain
7979
"libstd/rt/mpsc_queue.rs", # BSD
80+
"libstd/rt/spsc_queue.rs", # BSD
8081
"libstd/rt/mpmc_bounded_queue.rs", # BSD
8182
]
8283

src/libextra/arc.rs

+13-15
Original file line numberDiff line numberDiff line change
@@ -597,15 +597,14 @@ mod tests {
597597

598598
use arc::*;
599599

600-
use std::comm;
601600
use std::task;
602601

603602
#[test]
604603
fn manually_share_arc() {
605604
let v = ~[1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
606605
let arc_v = Arc::new(v);
607606

608-
let (p, c) = comm::stream();
607+
let (p, c) = Chan::new();
609608

610609
do task::spawn {
611610
let arc_v: Arc<~[int]> = p.recv();
@@ -626,7 +625,7 @@ mod tests {
626625
fn test_mutex_arc_condvar() {
627626
let arc = ~MutexArc::new(false);
628627
let arc2 = ~arc.clone();
629-
let (p,c) = comm::oneshot();
628+
let (p,c) = Chan::new();
630629
do task::spawn {
631630
// wait until parent gets in
632631
p.recv();
@@ -636,9 +635,8 @@ mod tests {
636635
})
637636
}
638637

639-
let mut c = Some(c);
640638
arc.access_cond(|state, cond| {
641-
c.take_unwrap().send(());
639+
c.send(());
642640
assert!(!*state);
643641
while !*state {
644642
cond.wait();
@@ -650,7 +648,7 @@ mod tests {
650648
fn test_arc_condvar_poison() {
651649
let arc = ~MutexArc::new(1);
652650
let arc2 = ~arc.clone();
653-
let (p, c) = comm::stream();
651+
let (p, c) = Chan::new();
654652

655653
do spawn {
656654
let _ = p.recv();
@@ -687,7 +685,7 @@ mod tests {
687685
pub fn test_mutex_arc_unwrap_poison() {
688686
let arc = MutexArc::new(1);
689687
let arc2 = ~(&arc).clone();
690-
let (p, c) = comm::stream();
688+
let (p, c) = Chan::new();
691689
do task::spawn {
692690
arc2.access(|one| {
693691
c.send(());
@@ -804,7 +802,7 @@ mod tests {
804802
fn test_rw_arc() {
805803
let arc = RWArc::new(0);
806804
let arc2 = arc.clone();
807-
let (p, c) = comm::stream();
805+
let (p, c) = Chan::new();
808806

809807
do task::spawn {
810808
arc2.write(|num| {
@@ -832,7 +830,7 @@ mod tests {
832830
});
833831

834832
// Wait for children to pass their asserts
835-
for r in children.iter() {
833+
for r in children.mut_iter() {
836834
r.recv();
837835
}
838836

@@ -855,7 +853,7 @@ mod tests {
855853
// Reader tasks
856854
let mut reader_convos = ~[];
857855
10.times(|| {
858-
let ((rp1, rc1), (rp2, rc2)) = (comm::stream(), comm::stream());
856+
let ((rp1, rc1), (rp2, rc2)) = (Chan::new(), Chan::new());
859857
reader_convos.push((rc1, rp2));
860858
let arcn = arc.clone();
861859
do task::spawn {
@@ -869,7 +867,7 @@ mod tests {
869867

870868
// Writer task
871869
let arc2 = arc.clone();
872-
let ((wp1, wc1), (wp2, wc2)) = (comm::stream(), comm::stream());
870+
let ((wp1, wc1), (wp2, wc2)) = (Chan::new(), Chan::new());
873871
do task::spawn || {
874872
wp1.recv();
875873
arc2.write_cond(|state, cond| {
@@ -897,14 +895,14 @@ mod tests {
897895
assert_eq!(*state, 42);
898896
*state = 31337;
899897
// send to other readers
900-
for &(ref rc, _) in reader_convos.iter() {
898+
for &(ref mut rc, _) in reader_convos.mut_iter() {
901899
rc.send(())
902900
}
903901
});
904902
let read_mode = arc.downgrade(write_mode);
905903
read_mode.read(|state| {
906904
// complete handshake with other readers
907-
for &(_, ref rp) in reader_convos.iter() {
905+
for &(_, ref mut rp) in reader_convos.mut_iter() {
908906
rp.recv()
909907
}
910908
wc1.send(()); // tell writer to try again
@@ -926,7 +924,7 @@ mod tests {
926924
// "blk(&Condvar { order: opt_lock, ..*cond })"
927925
// with just "blk(cond)".
928926
let x = RWArc::new(true);
929-
let (wp, wc) = comm::stream();
927+
let (wp, wc) = Chan::new();
930928

931929
// writer task
932930
let xw = x.clone();
@@ -951,7 +949,7 @@ mod tests {
951949
});
952950
// make a reader task to trigger the "reader cloud lock" handoff
953951
let xr = x.clone();
954-
let (rp, rc) = comm::stream();
952+
let (rp, rc) = Chan::new();
955953
do task::spawn {
956954
rc.send(());
957955
xr.read(|_state| { })

0 commit comments

Comments
 (0)