Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move futures to std, etc #3842

Merged
merged 7 commits into from
Oct 25, 2012
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/libcore/core.rc
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,6 @@ pub mod task {
pub mod spawn;
pub mod rt;
}
pub mod future;
pub mod pipes;

// Runtime and language-primitive support
Expand Down
14 changes: 9 additions & 5 deletions src/libcore/private.rs
Original file line number Diff line number Diff line change
Expand Up @@ -581,16 +581,20 @@ pub mod tests {

for uint::range(0, num_tasks) |_i| {
let total = total.clone();
futures.push(future::spawn(|move total| {
let (chan, port) = pipes::stream();
futures.push(move port);

do task::spawn |move total, move chan| {
for uint::range(0, count) |_i| {
do total.with |count| {
**count += 1;
}
}
}));
chan.send(());
}
};

for futures.each |f| { f.get() }
for futures.each |f| { f.recv() }

do total.with |total| {
assert **total == num_tasks * count
Expand Down Expand Up @@ -642,7 +646,7 @@ pub mod tests {
// Have to get rid of our reference before blocking.
{ let _x = move x; } // FIXME(#3161) util::ignore doesn't work here
let res = option::swap_unwrap(&mut res);
future::get(&res);
res.recv();
}

#[test] #[should_fail] #[ignore(cfg(windows))]
Expand All @@ -657,7 +661,7 @@ pub mod tests {
}
assert unwrap_exclusive(move x) == ~~"hello";
let res = option::swap_unwrap(&mut res);
future::get(&res);
res.recv();
}

#[test] #[ignore(cfg(windows))]
Expand Down
82 changes: 15 additions & 67 deletions src/libcore/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use cmp::Eq;
use result::Result;
use pipes::{stream, Chan, Port};
use local_data_priv::{local_get, local_set};
use util::replace;

use rt::task_id;
use rt::rust_task;
Expand Down Expand Up @@ -72,25 +73,6 @@ impl TaskResult : Eq {
pure fn ne(other: &TaskResult) -> bool { !self.eq(other) }
}

/// A message type for notifying of task lifecycle events
pub enum Notification {
/// Sent when a task exits with the task handle and result
Exit(Task, TaskResult)
}

impl Notification : cmp::Eq {
pure fn eq(other: &Notification) -> bool {
match self {
Exit(e0a, e1a) => {
match (*other) {
Exit(e0b, e1b) => e0a == e0b && e1a == e1b
}
}
}
}
pure fn ne(other: &Notification) -> bool { !self.eq(other) }
}

/// Scheduler modes
pub enum SchedMode {
/// All tasks run in the same OS thread
Expand Down Expand Up @@ -200,7 +182,7 @@ pub type SchedOpts = {
pub type TaskOpts = {
linked: bool,
supervised: bool,
mut notify_chan: Option<Chan<Notification>>,
mut notify_chan: Option<Chan<TaskResult>>,
sched: Option<SchedOpts>,
};

Expand Down Expand Up @@ -246,11 +228,7 @@ priv impl TaskBuilder {
fail ~"Cannot copy a task_builder"; // Fake move mode on self
}
self.consumed = true;
let notify_chan = if self.opts.notify_chan.is_none() {
None
} else {
Some(option::swap_unwrap(&mut self.opts.notify_chan))
};
let notify_chan = replace(&mut self.opts.notify_chan, None);
TaskBuilder({
opts: {
linked: self.opts.linked,
Expand All @@ -271,11 +249,7 @@ impl TaskBuilder {
* the other will not be killed.
*/
fn unlinked() -> TaskBuilder {
let notify_chan = if self.opts.notify_chan.is_none() {
None
} else {
Some(option::swap_unwrap(&mut self.opts.notify_chan))
};
let notify_chan = replace(&mut self.opts.notify_chan, None);
TaskBuilder({
opts: {
linked: false,
Expand All @@ -293,11 +267,7 @@ impl TaskBuilder {
* the child.
*/
fn supervised() -> TaskBuilder {
let notify_chan = if self.opts.notify_chan.is_none() {
None
} else {
Some(option::swap_unwrap(&mut self.opts.notify_chan))
};
let notify_chan = replace(&mut self.opts.notify_chan, None);
TaskBuilder({
opts: {
linked: false,
Expand All @@ -314,11 +284,7 @@ impl TaskBuilder {
* other will be killed.
*/
fn linked() -> TaskBuilder {
let notify_chan = if self.opts.notify_chan.is_none() {
None
} else {
Some(option::swap_unwrap(&mut self.opts.notify_chan))
};
let notify_chan = replace(&mut self.opts.notify_chan, None);
TaskBuilder({
opts: {
linked: true,
Expand Down Expand Up @@ -348,7 +314,7 @@ impl TaskBuilder {
* # Failure
* Fails if a future_result was already set for this task.
*/
fn future_result(blk: fn(v: future::Future<TaskResult>)) -> TaskBuilder {
fn future_result(blk: fn(v: Port<TaskResult>)) -> TaskBuilder {
// FIXME (#3725): Once linked failure and notification are
// handled in the library, I can imagine implementing this by just
// registering an arbitrary number of task::on_exit handlers and
Expand All @@ -359,13 +325,9 @@ impl TaskBuilder {
}

// Construct the future and give it to the caller.
let (notify_pipe_ch, notify_pipe_po) = stream::<Notification>();
let (notify_pipe_ch, notify_pipe_po) = stream::<TaskResult>();

blk(do future::from_fn |move notify_pipe_po| {
match notify_pipe_po.recv() {
Exit(_, result) => result
}
});
blk(move notify_pipe_po);

// Reconfigure self to use a notify channel.
TaskBuilder({
Expand All @@ -381,11 +343,7 @@ impl TaskBuilder {
}
/// Configure a custom scheduler mode for the task.
fn sched_mode(mode: SchedMode) -> TaskBuilder {
let notify_chan = if self.opts.notify_chan.is_none() {
None
} else {
Some(option::swap_unwrap(&mut self.opts.notify_chan))
};
let notify_chan = replace(&mut self.opts.notify_chan, None);
TaskBuilder({
opts: {
linked: self.opts.linked,
Expand All @@ -412,11 +370,7 @@ impl TaskBuilder {
*/
fn add_wrapper(wrapper: fn@(v: fn~()) -> fn~()) -> TaskBuilder {
let prev_gen_body = self.gen_body;
let notify_chan = if self.opts.notify_chan.is_none() {
None
} else {
Some(option::swap_unwrap(&mut self.opts.notify_chan))
};
let notify_chan = replace(&mut self.opts.notify_chan, None);
TaskBuilder({
opts: {
linked: self.opts.linked,
Expand Down Expand Up @@ -447,13 +401,7 @@ impl TaskBuilder {
* must be greater than zero.
*/
fn spawn(f: fn~()) {
let notify_chan = if self.opts.notify_chan.is_none() {
None
} else {
let swapped_notify_chan =
option::swap_unwrap(&mut self.opts.notify_chan);
Some(move swapped_notify_chan)
};
let notify_chan = replace(&mut self.opts.notify_chan, None);
let x = self.consume();
let opts = {
linked: x.opts.linked,
Expand Down Expand Up @@ -532,7 +480,7 @@ impl TaskBuilder {
do fr_task_builder.spawn |move f| {
comm::send(ch, f());
}
match future::get(&option::unwrap(move result)) {
match option::unwrap(move result).recv() {
Success => result::Ok(comm::recv(po)),
Failure => result::Err(())
}
Expand Down Expand Up @@ -949,14 +897,14 @@ fn test_add_wrapper() {
fn test_future_result() {
let mut result = None;
do task().future_result(|+r| { result = Some(move r); }).spawn { }
assert future::get(&option::unwrap(move result)) == Success;
assert option::unwrap(move result).recv() == Success;

result = None;
do task().future_result(|+r|
{ result = Some(move r); }).unlinked().spawn {
fail;
}
assert future::get(&option::unwrap(move result)) == Failure;
assert option::unwrap(move result).recv() == Failure;
}

#[test] #[should_fail] #[ignore(cfg(windows))]
Expand Down
22 changes: 8 additions & 14 deletions src/libcore/task/spawn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,15 +320,15 @@ fn TCB(me: *rust_task, tasks: TaskGroupArc, ancestors: AncestorList,
}

struct AutoNotify {
notify_chan: Chan<Notification>,
notify_chan: Chan<TaskResult>,
mut failed: bool,
drop {
let result = if self.failed { Failure } else { Success };
self.notify_chan.send(Exit(get_task(), result));
self.notify_chan.send(result);
}
}

fn AutoNotify(chan: Chan<Notification>) -> AutoNotify {
fn AutoNotify(chan: Chan<TaskResult>) -> AutoNotify {
AutoNotify {
notify_chan: move chan,
failed: true // Un-set above when taskgroup successfully made.
Expand Down Expand Up @@ -532,7 +532,7 @@ pub fn spawn_raw(opts: TaskOpts, f: fn~()) {
// (4) ...and runs the provided body function.
fn make_child_wrapper(child: *rust_task, child_arc: TaskGroupArc,
ancestors: AncestorList, is_main: bool,
notify_chan: Option<Chan<Notification>>,
notify_chan: Option<Chan<TaskResult>>,
f: fn~()) -> fn~() {
let child_data = ~mut Some((move child_arc, move ancestors));
return fn~(move notify_chan, move child_data, move f) {
Expand Down Expand Up @@ -660,36 +660,30 @@ fn test_spawn_raw_unsupervise() {
#[test]
#[ignore(cfg(windows))]
fn test_spawn_raw_notify_success() {
let (task_ch, task_po) = pipes::stream();
let (notify_ch, notify_po) = pipes::stream();

let opts = {
notify_chan: Some(move notify_ch),
.. default_task_opts()
};
do spawn_raw(move opts) |move task_ch| {
task_ch.send(get_task());
do spawn_raw(move opts) {
}
let task_ = task_po.recv();
assert notify_po.recv() == Exit(task_, Success);
assert notify_po.recv() == Success;
}

#[test]
#[ignore(cfg(windows))]
fn test_spawn_raw_notify_failure() {
// New bindings for these
let (task_ch, task_po) = pipes::stream();
let (notify_ch, notify_po) = pipes::stream();

let opts = {
linked: false,
notify_chan: Some(move notify_ch),
.. default_task_opts()
};
do spawn_raw(move opts) |move task_ch| {
task_ch.send(get_task());
do spawn_raw(move opts) {
fail;
}
let task_ = task_po.recv();
assert notify_po.recv() == Exit(task_, Failure);
assert notify_po.recv() == Failure;
}
2 changes: 1 addition & 1 deletion src/libstd/arc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -651,7 +651,7 @@ mod tests {
}

// Wait for children to pass their asserts
for vec::each(children) |r| { future::get(r); }
for vec::each(children) |r| { r.recv(); }

// Wait for writer to finish
p.recv();
Expand Down
28 changes: 14 additions & 14 deletions src/libcore/future.rs → src/libstd/future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/

use either::Either;
use pipes::recv;
use pipes::{recv, oneshot, ChanOne, PortOne, send_one, recv_one};
use cast::copy_lifetime;

#[doc = "The future type"]
Expand Down Expand Up @@ -67,7 +67,7 @@ pub fn from_value<A>(val: A) -> Future<A> {
Future {state: Forced(~(move val))}
}

pub fn from_port<A:Send>(port: future_pipe::client::waiting<A>) ->
pub fn from_port<A:Send>(port: PortOne<A>) ->
Future<A> {
/*!
* Create a future from a port
Expand All @@ -82,7 +82,7 @@ pub fn from_port<A:Send>(port: future_pipe::client::waiting<A>) ->
port_ <-> *port;
let port = option::unwrap(move port_);
match recv(move port) {
future_pipe::completed(move data) => move data
oneshot::send(move data) => move data
}
}
}
Expand All @@ -107,9 +107,15 @@ pub fn spawn<A:Send>(blk: fn~() -> A) -> Future<A> {
* value of the future.
*/

from_port(pipes::spawn_service_recv(future_pipe::init, |move blk, ch| {
future_pipe::server::completed(move ch, blk());
}))
let (chan, port) = oneshot::init();

let chan = ~mut Some(move chan);
do task::spawn |move blk, move chan| {
let chan = option::swap_unwrap(&mut *chan);
send_one(move chan, blk());
}

return from_port(move port);
}

pub fn get_ref<A>(future: &r/Future<A>) -> &r/A {
Expand Down Expand Up @@ -162,12 +168,6 @@ pub fn with<A,B>(future: &Future<A>, blk: fn((&A)) -> B) -> B {
blk(get_ref(future))
}

proto! future_pipe (
waiting:recv<T:Send> {
completed(T) -> !
}
)

#[allow(non_implicitly_copyable_typarams)]
pub mod test {
#[test]
Expand All @@ -178,8 +178,8 @@ pub mod test {

#[test]
pub fn test_from_port() {
let (po, ch) = future_pipe::init();
future_pipe::server::completed(move ch, ~"whale");
let (ch, po) = oneshot::init();
send_one(move ch, ~"whale");
let f = from_port(move po);
assert get(&f) == ~"whale";
}
Expand Down
1 change: 1 addition & 0 deletions src/libstd/std.rc
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ pub mod cell;
pub mod sync;
pub mod arc;
pub mod comm;
pub mod future;

// Collections

Expand Down
Loading