Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rewrite std::comm #10830

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 19 additions & 27 deletions doc/tutorial-tasks.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,17 +121,16 @@ receiving messages. Pipes are low-level communication building-blocks and so
come in a variety of forms, each one appropriate for a different use case. In
what follows, we cover the most commonly used varieties.

The simplest way to create a pipe is to use the `comm::stream`
The simplest way to create a pipe is to use `Chan::new`
function to create a `(Port, Chan)` pair. In Rust parlance, a *channel*
is a sending endpoint of a pipe, and a *port* is the receiving
endpoint. Consider the following example of calculating two results
concurrently:

~~~~
# use std::task::spawn;
# use std::comm::{stream, Port, Chan};

let (port, chan): (Port<int>, Chan<int>) = stream();
let (port, chan): (Port<int>, Chan<int>) = Chan::new();

do spawn || {
let result = some_expensive_computation();
Expand All @@ -150,8 +149,7 @@ stream for sending and receiving integers (the left-hand side of the `let`,
a tuple into its component parts).

~~~~
# use std::comm::{stream, Chan, Port};
let (port, chan): (Port<int>, Chan<int>) = stream();
let (port, chan): (Port<int>, Chan<int>) = Chan::new();
~~~~

The child task will use the channel to send data to the parent task,
Expand All @@ -160,9 +158,8 @@ spawns the child task.

~~~~
# use std::task::spawn;
# use std::comm::stream;
# fn some_expensive_computation() -> int { 42 }
# let (port, chan) = stream();
# let (port, chan) = Chan::new();
do spawn || {
let result = some_expensive_computation();
chan.send(result);
Expand All @@ -180,25 +177,23 @@ computation, then waits for the child's result to arrive on the
port:

~~~~
# use std::comm::{stream};
# fn some_other_expensive_computation() {}
# let (port, chan) = stream::<int>();
# let (port, chan) = Chan::<int>::new();
# chan.send(0);
some_other_expensive_computation();
let result = port.recv();
~~~~

The `Port` and `Chan` pair created by `stream` enables efficient communication
between a single sender and a single receiver, but multiple senders cannot use
a single `Chan`, and multiple receivers cannot use a single `Port`. What if our
example needed to compute multiple results across a number of tasks? The
following program is ill-typed:
The `Port` and `Chan` pair created by `Chan::new` enables efficient
communication between a single sender and a single receiver, but multiple
senders cannot use a single `Chan`, and multiple receivers cannot use a single
`Port`. What if our example needed to compute multiple results across a number
of tasks? The following program is ill-typed:

~~~ {.xfail-test}
# use std::task::{spawn};
# use std::comm::{stream, Port, Chan};
# fn some_expensive_computation() -> int { 42 }
let (port, chan) = stream();
let (port, chan) = Chan::new();

do spawn {
chan.send(some_expensive_computation());
Expand All @@ -216,10 +211,8 @@ Instead we can use a `SharedChan`, a type that allows a single

~~~
# use std::task::spawn;
# use std::comm::{stream, SharedChan};

let (port, chan) = stream();
let chan = SharedChan::new(chan);
let (port, chan) = SharedChan::new();

for init_val in range(0u, 3) {
// Create a new channel handle to distribute to the child task
Expand All @@ -238,23 +231,22 @@ Here we transfer ownership of the channel into a new `SharedChan` value. Like
as an *affine* or *linear* type). Unlike with `Chan`, though, the programmer
may duplicate a `SharedChan`, with the `clone()` method. A cloned
`SharedChan` produces a new handle to the same channel, allowing multiple
tasks to send data to a single port. Between `spawn`, `stream` and
tasks to send data to a single port. Between `spawn`, `Chan` and
`SharedChan`, we have enough tools to implement many useful concurrency
patterns.

Note that the above `SharedChan` example is somewhat contrived since
you could also simply use three `stream` pairs, but it serves to
you could also simply use three `Chan` pairs, but it serves to
illustrate the point. For reference, written with multiple streams, it
might look like the example below.

~~~
# use std::task::spawn;
# use std::comm::stream;
# use std::vec;

// Create a vector of ports, one for each child task
let ports = vec::from_fn(3, |init_val| {
let (port, chan) = stream();
let (port, chan) = Chan::new();
do spawn {
chan.send(some_expensive_computation(init_val));
}
Expand Down Expand Up @@ -341,7 +333,7 @@ fn main() {
let numbers_arc = Arc::new(numbers);

for num in range(1u, 10) {
let (port, chan) = stream();
let (port, chan) = Chan::new();
chan.send(numbers_arc.clone());

do spawn {
Expand Down Expand Up @@ -370,7 +362,7 @@ and a clone of it is sent to each task
# use std::rand;
# let numbers=vec::from_fn(1000000, |_| rand::random::<f64>());
# let numbers_arc = Arc::new(numbers);
# let (port, chan) = stream();
# let (port, chan) = Chan::new();
chan.send(numbers_arc.clone());
~~~
copying only the wrapper and not its contents.
Expand All @@ -382,7 +374,7 @@ Each task recovers the underlying data by
# use std::rand;
# let numbers=vec::from_fn(1000000, |_| rand::random::<f64>());
# let numbers_arc=Arc::new(numbers);
# let (port, chan) = stream();
# let (port, chan) = Chan::new();
# chan.send(numbers_arc.clone());
# let local_arc : Arc<~[f64]> = port.recv();
let task_numbers = local_arc.get();
Expand Down Expand Up @@ -499,7 +491,7 @@ Here is the code for the parent task:
# }
# fn main() {

let (from_child, to_child) = DuplexStream();
let (from_child, to_child) = DuplexStream::new();

do spawn {
stringifier(&to_child);
Expand Down
1 change: 1 addition & 0 deletions src/etc/licenseck.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
"rt/isaac/rand.h", # public domain
"rt/isaac/standard.h", # public domain
"libstd/rt/mpsc_queue.rs", # BSD
"libstd/rt/spsc_queue.rs", # BSD
"libstd/rt/mpmc_bounded_queue.rs", # BSD
]

Expand Down
28 changes: 13 additions & 15 deletions src/libextra/arc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -597,15 +597,14 @@ mod tests {

use arc::*;

use std::comm;
use std::task;

#[test]
fn manually_share_arc() {
let v = ~[1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
let arc_v = Arc::new(v);

let (p, c) = comm::stream();
let (p, c) = Chan::new();

do task::spawn {
let arc_v: Arc<~[int]> = p.recv();
Expand All @@ -626,7 +625,7 @@ mod tests {
fn test_mutex_arc_condvar() {
let arc = ~MutexArc::new(false);
let arc2 = ~arc.clone();
let (p,c) = comm::oneshot();
let (p,c) = Chan::new();
do task::spawn {
// wait until parent gets in
p.recv();
Expand All @@ -636,9 +635,8 @@ mod tests {
})
}

let mut c = Some(c);
arc.access_cond(|state, cond| {
c.take_unwrap().send(());
c.send(());
assert!(!*state);
while !*state {
cond.wait();
Expand All @@ -650,7 +648,7 @@ mod tests {
fn test_arc_condvar_poison() {
let arc = ~MutexArc::new(1);
let arc2 = ~arc.clone();
let (p, c) = comm::stream();
let (p, c) = Chan::new();

do spawn {
let _ = p.recv();
Expand Down Expand Up @@ -687,7 +685,7 @@ mod tests {
pub fn test_mutex_arc_unwrap_poison() {
let arc = MutexArc::new(1);
let arc2 = ~(&arc).clone();
let (p, c) = comm::stream();
let (p, c) = Chan::new();
do task::spawn {
arc2.access(|one| {
c.send(());
Expand Down Expand Up @@ -804,7 +802,7 @@ mod tests {
fn test_rw_arc() {
let arc = RWArc::new(0);
let arc2 = arc.clone();
let (p, c) = comm::stream();
let (p, c) = Chan::new();

do task::spawn {
arc2.write(|num| {
Expand Down Expand Up @@ -832,7 +830,7 @@ mod tests {
});

// Wait for children to pass their asserts
for r in children.iter() {
for r in children.mut_iter() {
r.recv();
}

Expand All @@ -855,7 +853,7 @@ mod tests {
// Reader tasks
let mut reader_convos = ~[];
10.times(|| {
let ((rp1, rc1), (rp2, rc2)) = (comm::stream(), comm::stream());
let ((rp1, rc1), (rp2, rc2)) = (Chan::new(), Chan::new());
reader_convos.push((rc1, rp2));
let arcn = arc.clone();
do task::spawn {
Expand All @@ -869,7 +867,7 @@ mod tests {

// Writer task
let arc2 = arc.clone();
let ((wp1, wc1), (wp2, wc2)) = (comm::stream(), comm::stream());
let ((wp1, wc1), (wp2, wc2)) = (Chan::new(), Chan::new());
do task::spawn || {
wp1.recv();
arc2.write_cond(|state, cond| {
Expand Down Expand Up @@ -897,14 +895,14 @@ mod tests {
assert_eq!(*state, 42);
*state = 31337;
// send to other readers
for &(ref rc, _) in reader_convos.iter() {
for &(ref mut rc, _) in reader_convos.mut_iter() {
rc.send(())
}
});
let read_mode = arc.downgrade(write_mode);
read_mode.read(|state| {
// complete handshake with other readers
for &(_, ref rp) in reader_convos.iter() {
for &(_, ref mut rp) in reader_convos.mut_iter() {
rp.recv()
}
wc1.send(()); // tell writer to try again
Expand All @@ -926,7 +924,7 @@ mod tests {
// "blk(&Condvar { order: opt_lock, ..*cond })"
// with just "blk(cond)".
let x = RWArc::new(true);
let (wp, wc) = comm::stream();
let (wp, wc) = Chan::new();

// writer task
let xw = x.clone();
Expand All @@ -951,7 +949,7 @@ mod tests {
});
// make a reader task to trigger the "reader cloud lock" handoff
let xr = x.clone();
let (rp, rc) = comm::stream();
let (rp, rc) = Chan::new();
do task::spawn {
rc.send(());
xr.read(|_state| { })
Expand Down
Loading