diff --git a/src/libgreen/lib.rs b/src/libgreen/lib.rs index 6c3ad8a6ef961..ee25d19e324cc 100644 --- a/src/libgreen/lib.rs +++ b/src/libgreen/lib.rs @@ -159,16 +159,19 @@ //! //! # Using a scheduler pool //! +//! This library adds a `GreenTaskBuilder` trait that extends the methods +//! available on `std::task::TaskBuilder` to allow spawning a green task, +//! possibly pinned to a particular scheduler thread: +//! //! ```rust -//! use std::rt::task::TaskOpts; -//! use green::{SchedPool, PoolConfig}; -//! use green::sched::{PinnedTask, TaskFromFriend}; +//! use std::task::TaskBuilder; +//! use green::{SchedPool, PoolConfig, GreenTaskBuilder}; //! //! let config = PoolConfig::new(); //! let mut pool = SchedPool::new(config); //! //! // Spawn tasks into the pool of schedulers -//! pool.spawn(TaskOpts::new(), proc() { +//! TaskBuilder::new().green(&mut pool).spawn(proc() { //! // this code is running inside the pool of schedulers //! //! spawn(proc() { @@ -181,12 +184,9 @@ //! let mut handle = pool.spawn_sched(); //! //! // Pin a task to the spawned scheduler -//! let task = pool.task(TaskOpts::new(), proc() { /* ... */ }); -//! handle.send(PinnedTask(task)); -//! -//! // Schedule a task on this new scheduler -//! let task = pool.task(TaskOpts::new(), proc() { /* ... */ }); -//! handle.send(TaskFromFriend(task)); +//! TaskBuilder::new().green_pinned(&mut pool, &mut handle).spawn(proc() { +//! /* ... */ +//! }); //! //! // Handles keep schedulers alive, so be sure to drop all handles before //! // destroying the sched pool @@ -209,6 +209,8 @@ // NB this does *not* include globs, please keep it that way. #![feature(macro_rules, phase)] #![allow(visible_private_types)] +#![allow(deprecated)] +#![feature(default_type_params)] #[cfg(test)] #[phase(plugin, link)] extern crate log; #[cfg(test)] extern crate rustuv; @@ -224,8 +226,9 @@ use std::rt::task::TaskOpts; use std::rt; use std::sync::atomics::{SeqCst, AtomicUint, INIT_ATOMIC_UINT}; use std::sync::deque; +use std::task::{TaskBuilder, Spawner}; -use sched::{Shutdown, Scheduler, SchedHandle, TaskFromFriend, NewNeighbor}; +use sched::{Shutdown, Scheduler, SchedHandle, TaskFromFriend, PinnedTask, NewNeighbor}; use sleeper_list::SleeperList; use stack::StackPool; use task::GreenTask; @@ -444,6 +447,7 @@ impl SchedPool { /// This is useful to create a task which can then be sent to a specific /// scheduler created by `spawn_sched` (and possibly pin it to that /// scheduler). + #[deprecated = "use the green and green_pinned methods of GreenTaskBuilder instead"] pub fn task(&mut self, opts: TaskOpts, f: proc():Send) -> Box { GreenTask::configure(&mut self.stack_pool, opts, f) } @@ -454,6 +458,7 @@ impl SchedPool { /// New tasks are spawned in a round-robin fashion to the schedulers in this /// pool, but tasks can certainly migrate among schedulers once they're in /// the pool. + #[deprecated = "use the green and green_pinned methods of GreenTaskBuilder instead"] pub fn spawn(&mut self, opts: TaskOpts, f: proc():Send) { let task = self.task(opts, f); @@ -563,3 +568,54 @@ impl Drop for SchedPool { } } } + +/// A spawner for green tasks +pub struct GreenSpawner<'a>{ + pool: &'a mut SchedPool, + handle: Option<&'a mut SchedHandle> +} + +impl<'a> Spawner for GreenSpawner<'a> { + #[inline] + fn spawn(self, opts: TaskOpts, f: proc():Send) { + let GreenSpawner { pool, handle } = self; + match handle { + None => pool.spawn(opts, f), + Some(h) => h.send(PinnedTask(pool.task(opts, f))) + } + } +} + +/// An extension trait adding `green` configuration methods to `TaskBuilder`. +pub trait GreenTaskBuilder { + fn green<'a>(self, &'a mut SchedPool) -> TaskBuilder>; + fn green_pinned<'a>(self, &'a mut SchedPool, &'a mut SchedHandle) + -> TaskBuilder>; +} + +impl GreenTaskBuilder for TaskBuilder { + fn green<'a>(self, pool: &'a mut SchedPool) -> TaskBuilder> { + self.spawner(GreenSpawner {pool: pool, handle: None}) + } + + fn green_pinned<'a>(self, pool: &'a mut SchedPool, handle: &'a mut SchedHandle) + -> TaskBuilder> { + self.spawner(GreenSpawner {pool: pool, handle: Some(handle)}) + } +} + +#[cfg(test)] +mod test { + use std::task::TaskBuilder; + use super::{SchedPool, PoolConfig, GreenTaskBuilder}; + + #[test] + fn test_green_builder() { + let mut pool = SchedPool::new(PoolConfig::new()); + let res = TaskBuilder::new().green(&mut pool).try(proc() { + "Success!".to_string() + }); + assert_eq!(res.ok().unwrap(), "Success!".to_string()); + pool.shutdown(); + } +} diff --git a/src/libnative/lib.rs b/src/libnative/lib.rs index f04dfac80ccfe..40b99c5bbdb1d 100644 --- a/src/libnative/lib.rs +++ b/src/libnative/lib.rs @@ -32,10 +32,13 @@ //! ```rust //! extern crate native; //! +//! use std::task::TaskBuilder; +//! use native::NativeTaskBuilder; +//! //! fn main() { //! // We're not sure whether this main function is run in 1:1 or M:N mode. //! -//! native::task::spawn(proc() { +//! TaskBuilder::new().native().spawn(proc() { //! // this code is guaranteed to be run on a native thread //! }); //! } @@ -50,7 +53,8 @@ html_root_url = "http://doc.rust-lang.org/")] #![deny(unused_result, unused_must_use)] #![allow(non_camel_case_types)] -#![feature(macro_rules)] +#![allow(deprecated)] +#![feature(default_type_params)] // NB this crate explicitly does *not* allow glob imports, please seriously // consider whether they're needed before adding that feature here (the @@ -65,6 +69,8 @@ use std::os; use std::rt; use std::str; +pub use task::NativeTaskBuilder; + pub mod io; pub mod task; diff --git a/src/libnative/task.rs b/src/libnative/task.rs index b073c2c7fbf02..88e581a479136 100644 --- a/src/libnative/task.rs +++ b/src/libnative/task.rs @@ -27,6 +27,7 @@ use std::rt; use io; use task; +use std::task::{TaskBuilder, Spawner}; /// Creates a new Task which is ready to execute as a 1:1 task. pub fn new(stack_bounds: (uint, uint)) -> Box { @@ -48,12 +49,14 @@ fn ops() -> Box { } /// Spawns a function with the default configuration +#[deprecated = "use the native method of NativeTaskBuilder instead"] pub fn spawn(f: proc():Send) { spawn_opts(TaskOpts { name: None, stack_size: None, on_exit: None }, f) } /// Spawns a new task given the configuration options and a procedure to run /// inside the task. +#[deprecated = "use the native method of NativeTaskBuilder instead"] pub fn spawn_opts(opts: TaskOpts, f: proc():Send) { let TaskOpts { name, stack_size, on_exit } = opts; @@ -95,6 +98,26 @@ pub fn spawn_opts(opts: TaskOpts, f: proc():Send) { }) } +/// A spawner for native tasks +pub struct NativeSpawner; + +impl Spawner for NativeSpawner { + fn spawn(self, opts: TaskOpts, f: proc():Send) { + spawn_opts(opts, f) + } +} + +/// An extension trait adding a `native` configuration method to `TaskBuilder`. +pub trait NativeTaskBuilder { + fn native(self) -> TaskBuilder; +} + +impl NativeTaskBuilder for TaskBuilder { + fn native(self) -> TaskBuilder { + self.spawner(NativeSpawner) + } +} + // This structure is the glue between channels and the 1:1 scheduling mode. This // structure is allocated once per task. struct Ops { @@ -259,7 +282,8 @@ mod tests { use std::rt::local::Local; use std::rt::task::{Task, TaskOpts}; use std::task; - use super::{spawn, spawn_opts, Ops}; + use std::task::TaskBuilder; + use super::{spawn, spawn_opts, Ops, NativeTaskBuilder}; #[test] fn smoke() { @@ -347,4 +371,12 @@ mod tests { }); rx.recv(); } + + #[test] + fn test_native_builder() { + let res = TaskBuilder::new().native().try(proc() { + "Success!".to_string() + }); + assert_eq!(res.ok().unwrap(), "Success!".to_string()); + } } diff --git a/src/librustc/driver/mod.rs b/src/librustc/driver/mod.rs index ada65394e1923..f55fd78762c5d 100644 --- a/src/librustc/driver/mod.rs +++ b/src/librustc/driver/mod.rs @@ -366,22 +366,19 @@ fn monitor(f: proc():Send) { #[cfg(not(rtopt))] static STACK_SIZE: uint = 20000000; // 20MB - let mut task_builder = TaskBuilder::new().named("rustc"); + let (tx, rx) = channel(); + let w = io::ChanWriter::new(tx); + let mut r = io::ChanReader::new(rx); + + let mut task = TaskBuilder::new().named("rustc").stderr(box w); // FIXME: Hacks on hacks. If the env is trying to override the stack size // then *don't* set it explicitly. if os::getenv("RUST_MIN_STACK").is_none() { - task_builder.opts.stack_size = Some(STACK_SIZE); + task = task.stack_size(STACK_SIZE); } - let (tx, rx) = channel(); - let w = io::ChanWriter::new(tx); - let mut r = io::ChanReader::new(rx); - - match task_builder.try(proc() { - io::stdio::set_stderr(box w); - f() - }) { + match task.try(f) { Ok(()) => { /* fallthrough */ } Err(value) => { // Task failed without emitting a fatal diagnostic diff --git a/src/libstd/task.rs b/src/libstd/task.rs index f543188af4295..0ead8fa6c0c83 100644 --- a/src/libstd/task.rs +++ b/src/libstd/task.rs @@ -8,71 +8,110 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -/*! - * Utilities for managing and scheduling tasks - * - * An executing Rust program consists of a collection of tasks, each with their - * own stack, and sole ownership of their allocated heap data. Tasks communicate - * with each other using channels (see `std::comm` for more info about how - * communication works). - * - * Failure in one task does not propagate to any others (not to parent, not to - * child). Failure propagation is instead handled by using the channel send() - * and recv() methods which will fail if the other end has hung up already. - * - * Task Scheduling: - * - * By default, every task is created with the same "flavor" as the calling task. - * This flavor refers to the scheduling mode, with two possibilities currently - * being 1:1 and M:N modes. Green (M:N) tasks are cooperatively scheduled and - * native (1:1) tasks are scheduled by the OS kernel. - * - * # Example - * - * ```rust - * spawn(proc() { - * println!("Hello, World!"); - * }) - * ``` - */ +//! Utilities for managing and scheduling tasks +//! +//! An executing Rust program consists of a collection of lightweight tasks, +//! each with their own stack. Tasks communicate with each other using channels +//! (see `std::comm`) or other forms of synchronization (see `std::sync`) that +//! ensure data-race freedom. +//! +//! Failure in one task does immediately propagate to any others (not to parent, +//! not to child). Failure propagation is instead handled as part of task +//! synchronization. For example, the channel `send()` and `recv()` methods will +//! fail if the other end has hung up already. +//! +//! # Basic task scheduling +//! +//! By default, every task is created with the same "flavor" as the calling task. +//! This flavor refers to the scheduling mode, with two possibilities currently +//! being 1:1 and M:N modes. Green (M:N) tasks are cooperatively scheduled and +//! native (1:1) tasks are scheduled by the OS kernel. +//! +//! ## Example +//! +//! ```rust +//! spawn(proc() { +//! println!("Hello, World!"); +//! }) +//! ``` +//! +//! # Advanced task scheduling +//! +//! Task spawning can also be configured to use a particular scheduler, to +//! redirect the new task's output, or to yield a `future` representing the +//! task's final result. The configuration is established using the +//! `TaskBuilder` API: +//! +//! ## Example +//! +//! ```rust +//! extern crate green; +//! extern crate native; +//! +//! use std::task::TaskBuilder; +//! use green::{SchedPool, PoolConfig, GreenTaskBuilder}; +//! use native::NativeTaskBuilder; +//! +//! # fn main() { +//! // Create a green scheduler pool with the default configuration +//! let mut pool = SchedPool::new(PoolConfig::new()); +//! +//! // Spawn a task in the green pool +//! let mut fut_green = TaskBuilder::new().green(&mut pool).try_future(proc() { +//! /* ... */ +//! }); +//! +//! // Spawn a native task +//! let mut fut_native = TaskBuilder::new().native().try_future(proc() { +//! /* ... */ +//! }); +//! +//! // Wait for both tasks to finish, recording their outcome +//! let res_green = fut_green.unwrap(); +//! let res_native = fut_native.unwrap(); +//! +//! // Shut down the green scheduler pool +//! pool.shutdown(); +//! # } +//! ``` use any::Any; -use comm::{Sender, Receiver, channel}; +use comm::channel; use io::{Writer, stdio}; use kinds::{Send, marker}; use option::{None, Some, Option}; use owned::Box; -use result::{Result, Ok, Err}; +use result::Result; use rt::local::Local; use rt::task; use rt::task::Task; use str::{Str, SendStr, IntoMaybeOwned}; +use sync::Future; -#[cfg(test)] use any::AnyRefExt; -#[cfg(test)] use owned::AnyOwnExt; -#[cfg(test)] use result; -#[cfg(test)] use str::StrAllocating; -#[cfg(test)] use string::String; - -/// Task configuration options -pub struct TaskOpts { - /// Enable lifecycle notifications on the given channel - pub notify_chan: Option>, - /// A name for the task-to-be, for identification in failure messages - pub name: Option, - /// The size of the stack for the spawned task - pub stack_size: Option, - /// Task-local stdout - pub stdout: Option>, - /// Task-local stderr - pub stderr: Option>, +/// A means of spawning a task +pub trait Spawner { + /// Spawn a task, given low-level task options. + fn spawn(self, opts: task::TaskOpts, f: proc():Send); } -/** - * The task builder type. - * - * Provides detailed control over the properties and behavior of new tasks. - */ +/// The default task spawner, which spawns siblings to the current task. +pub struct SiblingSpawner; + +impl Spawner for SiblingSpawner { + fn spawn(self, opts: task::TaskOpts, f: proc():Send) { + // bind tb to provide type annotation + let tb: Option> = Local::try_take(); + match tb { + Some(t) => t.spawn_sibling(opts, f), + None => fail!("need a local task to spawn a sibling task"), + }; + } +} + +/// The task builder type. +/// +/// Provides detailed control over the properties and behavior of new tasks. + // NB: Builders are designed to be single-use because they do stateful // things that get weird when reusing - e.g. if you create a result future // it only applies to a single task, so then you have to maintain Some @@ -80,75 +119,102 @@ pub struct TaskOpts { // when you try to reuse the builder to spawn a new task. We'll just // sidestep that whole issue by making builders uncopyable and making // the run function move them in. -pub struct TaskBuilder { - /// Options to spawn the new task with - pub opts: TaskOpts, - gen_body: Option proc(): Send>, +pub struct TaskBuilder { + // A name for the task-to-be, for identification in failure messages + name: Option, + // The size of the stack for the spawned task + stack_size: Option, + // Task-local stdout + stdout: Option>, + // Task-local stderr + stderr: Option>, + // The mechanics of actually spawning the task (i.e.: green or native) + spawner: S, + // Optionally wrap the eventual task body + gen_body: Option proc():Send>, nocopy: marker::NoCopy, } -impl TaskBuilder { - /// Generate the base configuration for spawning a task, off of which more - /// configuration methods can be chained. - pub fn new() -> TaskBuilder { +impl TaskBuilder { + /// Generate the base configuration for spawning a task, off of which more + /// configuration methods can be chained. + pub fn new() -> TaskBuilder { TaskBuilder { - opts: TaskOpts::new(), + name: None, + stack_size: None, + stdout: None, + stderr: None, + spawner: SiblingSpawner, gen_body: None, nocopy: marker::NoCopy, } } +} - /// Get a future representing the exit status of the task. - /// - /// Taking the value of the future will block until the child task - /// terminates. The future result return value will be created *before* the task is - /// spawned; as such, do not invoke .get() on it directly; - /// rather, store it in an outer variable/list for later use. - /// - /// # Failure - /// Fails if a future_result was already set for this task. - pub fn future_result(&mut self) -> Receiver { - // FIXME (#3725): Once linked failure and notification are - // handled in the library, I can imagine implementing this by just - // registering an arbitrary number of task::on_exit handlers and - // sending out messages. - - if self.opts.notify_chan.is_some() { - fail!("Can't set multiple future_results for one task!"); - } - - // Construct the future and give it to the caller. - let (tx, rx) = channel(); +impl TaskBuilder { + /// Name the task-to-be. Currently the name is used for identification + /// only in failure messages. + pub fn named>(mut self, name: T) -> TaskBuilder { + self.name = Some(name.into_maybe_owned()); + self + } - // Reconfigure self to use a notify channel. - self.opts.notify_chan = Some(tx); + /// Set the size of the stack for the new task. + pub fn stack_size(mut self, size: uint) -> TaskBuilder { + self.stack_size = Some(size); + self + } - rx + /// Redirect task-local stdout. + pub fn stdout(mut self, stdout: Box) -> TaskBuilder { + self.stdout = Some(stdout); + self } - /// Name the task-to-be. Currently the name is used for identification - /// only in failure messages. - pub fn named>(mut self, name: S) -> TaskBuilder { - self.opts.name = Some(name.into_maybe_owned()); + /// Redirect task-local stderr. + pub fn stderr(mut self, stderr: Box) -> TaskBuilder { + self.stderr = Some(stderr); self } - /** - * Add a wrapper to the body of the spawned task. - * - * Before the task is spawned it is passed through a 'body generator' - * function that may perform local setup operations as well as wrap - * the task body in remote setup operations. With this the behavior - * of tasks can be extended in simple ways. - * - * This function augments the current body generator with a new body - * generator by applying the task body which results from the - * existing body generator to the new body generator. - */ - pub fn with_wrapper(mut self, - wrapper: proc(v: proc(): Send): Send -> proc(): Send) - -> TaskBuilder - { + /// Set the spawning mechanism for the task. + /// + /// The `TaskBuilder` API configures a task to be spawned, but defers to the + /// "spawner" to actually create and spawn the task. The `spawner` method + /// should not be called directly by `TaskBuiler` clients. It is intended + /// for use by downstream crates (like `native` and `green`) that implement + /// tasks. These downstream crates then add extension methods to the + /// builder, like `.native()` and `.green(pool)`, that actually set the + /// spawner. + pub fn spawner(self, spawner: T) -> TaskBuilder { + // repackage the entire TaskBuilder since its type is changing. + let TaskBuilder { + name, stack_size, stdout, stderr, spawner: _, gen_body, nocopy + } = self; + TaskBuilder { + name: name, + stack_size: stack_size, + stdout: stdout, + stderr: stderr, + spawner: spawner, + gen_body: gen_body, + nocopy: nocopy, + } + } + + /// Add a wrapper to the body of the spawned task. + /// + /// Before the task is spawned it is passed through a 'body generator' + /// function that may perform local setup operations as well as wrap + /// the task body in remote setup operations. With this the behavior + /// of tasks can be extended in simple ways. + /// + /// This function augments the current body generator with a new body + /// generator by applying the task body which results from the + /// existing body generator to the new body generator. + #[deprecated = "this function will be removed soon"] + pub fn with_wrapper(mut self, wrapper: proc(v: proc():Send):Send -> proc():Send) + -> TaskBuilder { self.gen_body = match self.gen_body.take() { Some(prev) => Some(proc(body) { wrapper(prev(body)) }), None => Some(wrapper) @@ -156,90 +222,80 @@ impl TaskBuilder { self } - /** - * Creates and executes a new child task - * - * Sets up a new task with its own call stack and schedules it to run - * the provided unique closure. The task has the properties and behavior - * specified by the task_builder. - */ - pub fn spawn(mut self, f: proc(): Send) { - let gen_body = self.gen_body.take(); - let f = match gen_body { + // Where spawning actually happens (whether yielding a future or not) + fn spawn_internal(self, f: proc():Send, + on_exit: Option>):Send>) { + let TaskBuilder { + name, stack_size, stdout, stderr, spawner, mut gen_body, nocopy: _ + } = self; + let f = match gen_body.take() { Some(gen) => gen(f), None => f }; - let t: Box = match Local::try_take() { - Some(t) => t, - None => fail!("need a local task to spawn a new task"), - }; - let TaskOpts { notify_chan, name, stack_size, stdout, stderr } = self.opts; - let opts = task::TaskOpts { - on_exit: notify_chan.map(|c| proc(r) c.send(r)), + on_exit: on_exit, name: name, stack_size: stack_size, }; if stdout.is_some() || stderr.is_some() { - t.spawn_sibling(opts, proc() { + spawner.spawn(opts, proc() { let _ = stdout.map(stdio::set_stdout); let _ = stderr.map(stdio::set_stderr); f(); - }); + }) } else { - t.spawn_sibling(opts, f); + spawner.spawn(opts, f) } } - /** - * Execute a function in another task and return either the return value - * of the function or result::err. - * - * # Return value - * - * If the function executed successfully then try returns result::ok - * containing the value returned by the function. If the function fails - * then try returns result::err containing nil. - * - * # Failure - * Fails if a future_result was already set for this task. - */ - pub fn try(mut self, f: proc(): Send -> T) - -> Result> { - let (tx, rx) = channel(); - - let result = self.future_result(); - - self.spawn(proc() { - tx.send(f()); - }); - - match result.recv() { - Ok(()) => Ok(rx.recv()), - Err(cause) => Err(cause) - } + /// Creates and executes a new child task. + /// + /// Sets up a new task with its own call stack and schedules it to run + /// the provided proc. The task has the properties and behavior + /// specified by the `TaskBuilder`. + pub fn spawn(self, f: proc():Send) { + self.spawn_internal(f, None) } -} - -/* Task construction */ -impl TaskOpts { - pub fn new() -> TaskOpts { - /*! - * The default task options - */ + /// Execute a proc in a newly-spawned task and return a future representing + /// the task's result. The task has the properties and behavior + /// specified by the `TaskBuilder`. + /// + /// Taking the value of the future will block until the child task + /// terminates. + /// + /// # Return value + /// + /// If the child task executes successfully (without failing) then the + /// future returns `result::Ok` containing the value returned by the + /// function. If the child task fails then the future returns `result::Err` + /// containing the argument to `fail!(...)` as an `Any` trait object. + pub fn try_future(self, f: proc():Send -> T) + -> Future>> { + // currently, the on_exit proc provided by librustrt only works for unit + // results, so we use an additional side-channel to communicate the + // result. + + let (tx_done, rx_done) = channel(); // signal that task has exited + let (tx_retv, rx_retv) = channel(); // return value from task + + let on_exit = proc(res) { tx_done.send(res) }; + self.spawn_internal(proc() { tx_retv.send(f()) }, + Some(on_exit)); + + Future::from_fn(proc() { + rx_done.recv().map(|_| rx_retv.recv()) + }) + } - TaskOpts { - notify_chan: None, - name: None, - stack_size: None, - stdout: None, - stderr: None, - } + /// Execute a function in a newly-spawnedtask and block until the task + /// completes or fails. Equivalent to `.try_future(f).unwrap()`. + pub fn try(self, f: proc():Send -> T) -> Result> { + self.try_future(f).unwrap() } } -/* Spawn convenience functions */ +/* Convenience functions */ /// Creates and executes a new child task /// @@ -251,14 +307,22 @@ pub fn spawn(f: proc(): Send) { TaskBuilder::new().spawn(f) } -/// Execute a function in another task and return either the return value of -/// the function or an error if the task failed +/// Execute a function in a newly-spawned task and return either the return +/// value of the function or an error if the task failed. /// -/// This is equivalent to TaskBuilder::new().try +/// This is equivalent to `TaskBuilder::new().try`. pub fn try(f: proc(): Send -> T) -> Result> { TaskBuilder::new().try(f) } +/// Execute a function in another task and return a future representing the +/// task's result. +/// +/// This is equivalent to `TaskBuilder::new().try_future`. +pub fn try_future(f: proc():Send -> T) -> Future>> { + TaskBuilder::new().try_future(f) +} + /* Lifecycle functions */ @@ -273,9 +337,8 @@ pub fn with_task_name(blk: |Option<&str>| -> U) -> U { } } +/// Yield control to the task scheduler. pub fn deschedule() { - //! Yield control to the task scheduler - use rt::local::Local; // FIXME(#7544): Optimize this, since we know we won't block. @@ -283,266 +346,282 @@ pub fn deschedule() { task.yield_now(); } +/// True if the running task is currently failing (e.g. will return `true` inside a +/// destructor that is run while unwinding the stack after a call to `fail!()`). pub fn failing() -> bool { - //! True if the running task has failed use rt::task::Task; Local::borrow(None::).unwinder.unwinding() } -// The following 8 tests test the following 2^3 combinations: -// {un,}linked {un,}supervised failure propagation {up,down}wards. - -// !!! These tests are dangerous. If Something is buggy, they will hang, !!! -// !!! instead of exiting cleanly. This might wedge the buildbots. !!! - -#[test] -fn test_unnamed_task() { - spawn(proc() { - with_task_name(|name| { - assert!(name.is_none()); +#[cfg(test)] +mod test { + use any::{Any, AnyRefExt}; + use owned::AnyOwnExt; + use result; + use result::{Ok, Err}; + use str::StrAllocating; + use string::String; + use std::io::{ChanReader, ChanWriter}; + use prelude::*; + use super::*; + + // !!! These tests are dangerous. If something is buggy, they will hang, !!! + // !!! instead of exiting cleanly. This might wedge the buildbots. !!! + + #[test] + fn test_unnamed_task() { + spawn(proc() { + with_task_name(|name| { + assert!(name.is_none()); + }) }) - }) -} + } -#[test] -fn test_owned_named_task() { - TaskBuilder::new().named("ada lovelace".to_string()).spawn(proc() { - with_task_name(|name| { - assert!(name.unwrap() == "ada lovelace"); + #[test] + fn test_owned_named_task() { + TaskBuilder::new().named("ada lovelace".to_string()).spawn(proc() { + with_task_name(|name| { + assert!(name.unwrap() == "ada lovelace"); + }) }) - }) -} + } -#[test] -fn test_static_named_task() { - TaskBuilder::new().named("ada lovelace").spawn(proc() { - with_task_name(|name| { - assert!(name.unwrap() == "ada lovelace"); + #[test] + fn test_static_named_task() { + TaskBuilder::new().named("ada lovelace").spawn(proc() { + with_task_name(|name| { + assert!(name.unwrap() == "ada lovelace"); + }) }) - }) -} + } -#[test] -fn test_send_named_task() { - TaskBuilder::new().named("ada lovelace".into_maybe_owned()).spawn(proc() { - with_task_name(|name| { - assert!(name.unwrap() == "ada lovelace"); + #[test] + fn test_send_named_task() { + TaskBuilder::new().named("ada lovelace".into_maybe_owned()).spawn(proc() { + with_task_name(|name| { + assert!(name.unwrap() == "ada lovelace"); + }) }) - }) -} - -#[test] -fn test_run_basic() { - let (tx, rx) = channel(); - TaskBuilder::new().spawn(proc() { - tx.send(()); - }); - rx.recv(); -} + } -#[test] -fn test_with_wrapper() { - let (tx, rx) = channel(); - TaskBuilder::new().with_wrapper(proc(body) { - let result: proc(): Send = proc() { - body(); + #[test] + fn test_run_basic() { + let (tx, rx) = channel(); + TaskBuilder::new().spawn(proc() { tx.send(()); - }; - result - }).spawn(proc() { }); - rx.recv(); -} + }); + rx.recv(); + } -#[test] -fn test_future_result() { - let mut builder = TaskBuilder::new(); - let result = builder.future_result(); - builder.spawn(proc() {}); - assert!(result.recv().is_ok()); - - let mut builder = TaskBuilder::new(); - let result = builder.future_result(); - builder.spawn(proc() { - fail!(); - }); - assert!(result.recv().is_err()); -} + #[test] + fn test_with_wrapper() { + let (tx, rx) = channel(); + TaskBuilder::new().with_wrapper(proc(body) { + let result: proc():Send = proc() { + body(); + tx.send(()); + }; + result + }).spawn(proc() { }); + rx.recv(); + } -#[test] #[should_fail] -fn test_back_to_the_future_result() { - let mut builder = TaskBuilder::new(); - builder.future_result(); - builder.future_result(); -} + #[test] + fn test_try_future() { + let result = TaskBuilder::new().try_future(proc() {}); + assert!(result.unwrap().is_ok()); -#[test] -fn test_try_success() { - match try(proc() { - "Success!".to_string() - }).as_ref().map(|s| s.as_slice()) { - result::Ok("Success!") => (), - _ => fail!() + let result = TaskBuilder::new().try_future(proc() -> () { + fail!(); + }); + assert!(result.unwrap().is_err()); } -} -#[test] -fn test_try_fail() { - match try(proc() { - fail!() - }) { - result::Err(_) => (), - result::Ok(()) => fail!() + #[test] + fn test_try_success() { + match try(proc() { + "Success!".to_string() + }).as_ref().map(|s| s.as_slice()) { + result::Ok("Success!") => (), + _ => fail!() + } } -} -#[test] -fn test_spawn_sched() { - use clone::Clone; + #[test] + fn test_try_fail() { + match try(proc() { + fail!() + }) { + result::Err(_) => (), + result::Ok(()) => fail!() + } + } - let (tx, rx) = channel(); + #[test] + fn test_spawn_sched() { + use clone::Clone; - fn f(i: int, tx: Sender<()>) { - let tx = tx.clone(); - spawn(proc() { - if i == 0 { - tx.send(()); - } else { - f(i - 1, tx); - } - }); + let (tx, rx) = channel(); + fn f(i: int, tx: Sender<()>) { + let tx = tx.clone(); + spawn(proc() { + if i == 0 { + tx.send(()); + } else { + f(i - 1, tx); + } + }); + + } + f(10, tx); + rx.recv(); } - f(10, tx); - rx.recv(); -} -#[test] -fn test_spawn_sched_childs_on_default_sched() { - let (tx, rx) = channel(); + #[test] + fn test_spawn_sched_childs_on_default_sched() { + let (tx, rx) = channel(); - spawn(proc() { spawn(proc() { - tx.send(()); + spawn(proc() { + tx.send(()); + }); }); - }); - rx.recv(); -} + rx.recv(); + } -#[cfg(test)] -fn avoid_copying_the_body(spawnfn: |v: proc(): Send|) { - let (tx, rx) = channel::(); + fn avoid_copying_the_body(spawnfn: |v: proc():Send|) { + let (tx, rx) = channel::(); - let x = box 1; - let x_in_parent = (&*x) as *int as uint; + let x = box 1; + let x_in_parent = (&*x) as *int as uint; - spawnfn(proc() { - let x_in_child = (&*x) as *int as uint; - tx.send(x_in_child); - }); + spawnfn(proc() { + let x_in_child = (&*x) as *int as uint; + tx.send(x_in_child); + }); - let x_in_child = rx.recv(); - assert_eq!(x_in_parent, x_in_child); -} + let x_in_child = rx.recv(); + assert_eq!(x_in_parent, x_in_child); + } -#[test] -fn test_avoid_copying_the_body_spawn() { - avoid_copying_the_body(spawn); -} + #[test] + fn test_avoid_copying_the_body_spawn() { + avoid_copying_the_body(spawn); + } -#[test] -fn test_avoid_copying_the_body_task_spawn() { - avoid_copying_the_body(|f| { - let builder = TaskBuilder::new(); - builder.spawn(proc() { - f(); - }); - }) -} + #[test] + fn test_avoid_copying_the_body_task_spawn() { + avoid_copying_the_body(|f| { + let builder = TaskBuilder::new(); + builder.spawn(proc() { + f(); + }); + }) + } -#[test] -fn test_avoid_copying_the_body_try() { - avoid_copying_the_body(|f| { - let _ = try(proc() { - f() - }); - }) -} + #[test] + fn test_avoid_copying_the_body_try() { + avoid_copying_the_body(|f| { + let _ = try(proc() { + f() + }); + }) + } -#[test] -fn test_child_doesnt_ref_parent() { - // If the child refcounts the parent task, this will stack overflow when - // climbing the task tree to dereference each ancestor. (See #1789) - // (well, it would if the constant were 8000+ - I lowered it to be more - // valgrind-friendly. try this at home, instead..!) - static generations: uint = 16; - fn child_no(x: uint) -> proc(): Send { - return proc() { - if x < generations { - TaskBuilder::new().spawn(child_no(x+1)); + #[test] + fn test_child_doesnt_ref_parent() { + // If the child refcounts the parent task, this will stack overflow when + // climbing the task tree to dereference each ancestor. (See #1789) + // (well, it would if the constant were 8000+ - I lowered it to be more + // valgrind-friendly. try this at home, instead..!) + static generations: uint = 16; + fn child_no(x: uint) -> proc(): Send { + return proc() { + if x < generations { + TaskBuilder::new().spawn(child_no(x+1)); + } } } + TaskBuilder::new().spawn(child_no(0)); } - TaskBuilder::new().spawn(child_no(0)); -} -#[test] -fn test_simple_newsched_spawn() { - spawn(proc()()) -} + #[test] + fn test_simple_newsched_spawn() { + spawn(proc()()) + } -#[test] -fn test_try_fail_message_static_str() { - match try(proc() { - fail!("static string"); - }) { - Err(e) => { - type T = &'static str; - assert!(e.is::()); - assert_eq!(*e.move::().unwrap(), "static string"); + #[test] + fn test_try_fail_message_static_str() { + match try(proc() { + fail!("static string"); + }) { + Err(e) => { + type T = &'static str; + assert!(e.is::()); + assert_eq!(*e.move::().unwrap(), "static string"); + } + Ok(()) => fail!() } - Ok(()) => fail!() } -} -#[test] -fn test_try_fail_message_owned_str() { - match try(proc() { - fail!("owned string".to_string()); - }) { - Err(e) => { - type T = String; - assert!(e.is::()); - assert_eq!(*e.move::().unwrap(), "owned string".to_string()); + #[test] + fn test_try_fail_message_owned_str() { + match try(proc() { + fail!("owned string".to_string()); + }) { + Err(e) => { + type T = String; + assert!(e.is::()); + assert_eq!(*e.move::().unwrap(), "owned string".to_string()); + } + Ok(()) => fail!() } - Ok(()) => fail!() } -} -#[test] -fn test_try_fail_message_any() { - match try(proc() { - fail!(box 413u16 as Box); - }) { - Err(e) => { - type T = Box; - assert!(e.is::()); - let any = e.move::().unwrap(); - assert!(any.is::()); - assert_eq!(*any.move::().unwrap(), 413u16); + #[test] + fn test_try_fail_message_any() { + match try(proc() { + fail!(box 413u16 as Box); + }) { + Err(e) => { + type T = Box; + assert!(e.is::()); + let any = e.move::().unwrap(); + assert!(any.is::()); + assert_eq!(*any.move::().unwrap(), 413u16); + } + Ok(()) => fail!() } - Ok(()) => fail!() } -} -#[test] -fn test_try_fail_message_unit_struct() { - struct Juju; + #[test] + fn test_try_fail_message_unit_struct() { + struct Juju; + + match try(proc() { + fail!(Juju) + }) { + Err(ref e) if e.is::() => {} + Err(_) | Ok(()) => fail!() + } + } + + #[test] + fn test_stdout() { + let (tx, rx) = channel(); + let mut reader = ChanReader::new(rx); + let stdout = ChanWriter::new(tx); + + TaskBuilder::new().stdout(box stdout as Box).try(proc() { + print!("Hello, world!"); + }).unwrap(); - match try(proc() { - fail!(Juju) - }) { - Err(ref e) if e.is::() => {} - Err(_) | Ok(()) => fail!() + let output = reader.read_to_str().unwrap(); + assert_eq!(output, "Hello, world!".to_string()); } + + // NOTE: the corresponding test for stderr is in run-pass/task-stderr, due + // to the test harness apparently interfering with stderr configuration. } diff --git a/src/libsync/lock.rs b/src/libsync/lock.rs index d7990068d5eb7..1fe8e8fc0db34 100644 --- a/src/libsync/lock.rs +++ b/src/libsync/lock.rs @@ -459,7 +459,7 @@ mod tests { use std::prelude::*; use std::comm::Empty; use std::task; - use std::task::TaskBuilder; + use std::task::try_future; use Arc; use super::{Mutex, Barrier, RWLock}; @@ -629,17 +629,15 @@ mod tests { let mut children = Vec::new(); for _ in range(0, 5) { let arc3 = arc.clone(); - let mut builder = TaskBuilder::new(); - children.push(builder.future_result()); - builder.spawn(proc() { + children.push(try_future(proc() { let lock = arc3.read(); assert!(*lock >= 0); - }); + })); } // Wait for children to pass their asserts for r in children.mut_iter() { - assert!(r.recv().is_ok()); + assert!(r.get_ref().is_ok()); } // Wait for writer to finish diff --git a/src/libtest/lib.rs b/src/libtest/lib.rs index e7c35fb59eee5..112ffd7c1a4a4 100644 --- a/src/libtest/lib.rs +++ b/src/libtest/lib.rs @@ -1049,14 +1049,13 @@ pub fn run_test(opts: &TestOpts, if nocapture { drop((stdout, stderr)); } else { - task.opts.stdout = Some(box stdout as Box); - task.opts.stderr = Some(box stderr as Box); + task = task.stdout(box stdout as Box); + task = task.stderr(box stderr as Box); } - let result_future = task.future_result(); - task.spawn(testfn); + let result_future = task.try_future(testfn); let stdout = reader.read_to_end().unwrap().move_iter().collect(); - let task_result = result_future.recv(); + let task_result = result_future.unwrap(); let test_result = calc_result(&desc, task_result.is_ok()); monitor_ch.send((desc.clone(), test_result, stdout)); }) diff --git a/src/test/bench/msgsend-pipes-shared.rs b/src/test/bench/msgsend-pipes-shared.rs index 03d91cf3aaa7a..14155d54d73cc 100644 --- a/src/test/bench/msgsend-pipes-shared.rs +++ b/src/test/bench/msgsend-pipes-shared.rs @@ -24,7 +24,6 @@ extern crate debug; use std::comm; use std::os; use std::task; -use std::task::TaskBuilder; use std::uint; fn move_out(_x: T) {} @@ -64,22 +63,20 @@ fn run(args: &[String]) { let mut worker_results = Vec::new(); for _ in range(0u, workers) { let to_child = to_child.clone(); - let mut builder = TaskBuilder::new(); - worker_results.push(builder.future_result()); - builder.spawn(proc() { + worker_results.push(task::try_future(proc() { for _ in range(0u, size / workers) { //println!("worker {:?}: sending {:?} bytes", i, num_bytes); to_child.send(bytes(num_bytes)); } //println!("worker {:?} exiting", i); - }); + })); } task::spawn(proc() { server(&from_parent, &to_parent); }); - for r in worker_results.iter() { - r.recv(); + for r in worker_results.move_iter() { + r.unwrap(); } //println!("sending stop message"); diff --git a/src/test/bench/msgsend-pipes.rs b/src/test/bench/msgsend-pipes.rs index 4de51c3ab4b8e..7ec2796b230a7 100644 --- a/src/test/bench/msgsend-pipes.rs +++ b/src/test/bench/msgsend-pipes.rs @@ -19,7 +19,6 @@ extern crate debug; use std::os; use std::task; -use std::task::TaskBuilder; use std::uint; fn move_out(_x: T) {} @@ -58,29 +57,25 @@ fn run(args: &[String]) { let mut worker_results = Vec::new(); let from_parent = if workers == 1 { let (to_child, from_parent) = channel(); - let mut builder = TaskBuilder::new(); - worker_results.push(builder.future_result()); - builder.spawn(proc() { + worker_results.push(task::try_future(proc() { for _ in range(0u, size / workers) { //println!("worker {:?}: sending {:?} bytes", i, num_bytes); to_child.send(bytes(num_bytes)); } //println!("worker {:?} exiting", i); - }); + })); from_parent } else { let (to_child, from_parent) = channel(); for _ in range(0u, workers) { let to_child = to_child.clone(); - let mut builder = TaskBuilder::new(); - worker_results.push(builder.future_result()); - builder.spawn(proc() { + worker_results.push(task::try_future(proc() { for _ in range(0u, size / workers) { //println!("worker {:?}: sending {:?} bytes", i, num_bytes); to_child.send(bytes(num_bytes)); } //println!("worker {:?} exiting", i); - }); + })); } from_parent }; @@ -88,8 +83,8 @@ fn run(args: &[String]) { server(&from_parent, &to_parent); }); - for r in worker_results.iter() { - r.recv(); + for r in worker_results.move_iter() { + r.unwrap(); } //println!("sending stop message"); diff --git a/src/test/bench/shootout-pfib.rs b/src/test/bench/shootout-pfib.rs index 33853a91b7608..57a6d0e7c523d 100644 --- a/src/test/bench/shootout-pfib.rs +++ b/src/test/bench/shootout-pfib.rs @@ -24,7 +24,6 @@ extern crate time; use std::os; use std::result::{Ok, Err}; use std::task; -use std::task::TaskBuilder; use std::uint; fn fib(n: int) -> int { @@ -79,14 +78,12 @@ fn stress_task(id: int) { fn stress(num_tasks: int) { let mut results = Vec::new(); for i in range(0, num_tasks) { - let mut builder = TaskBuilder::new(); - results.push(builder.future_result()); - builder.spawn(proc() { + results.push(task::try_future(proc() { stress_task(i); - }); + })); } - for r in results.iter() { - r.recv(); + for r in results.move_iter() { + r.unwrap(); } } diff --git a/src/test/run-pass/issue-2190-1.rs b/src/test/run-pass/issue-2190-1.rs index f1217be3484fe..4ff735708b5ab 100644 --- a/src/test/run-pass/issue-2190-1.rs +++ b/src/test/run-pass/issue-2190-1.rs @@ -13,9 +13,7 @@ use std::task::TaskBuilder; static generations: uint = 1024+256+128+49; fn spawn(f: proc():Send) { - let mut t = TaskBuilder::new(); - t.opts.stack_size = Some(32 * 1024); - t.spawn(f); + TaskBuilder::new().stack_size(32 * 1024).spawn(f) } fn child_no(x: uint) -> proc():Send { diff --git a/src/test/run-pass/task-comm-12.rs b/src/test/run-pass/task-comm-12.rs index a78bbefed44db..851f87adfc20d 100644 --- a/src/test/run-pass/task-comm-12.rs +++ b/src/test/run-pass/task-comm-12.rs @@ -9,7 +9,6 @@ // except according to those terms. use std::task; -use std::task::TaskBuilder; pub fn main() { test00(); } @@ -17,9 +16,7 @@ fn start(_task_number: int) { println!("Started / Finished task."); } fn test00() { let i: int = 0; - let mut builder = TaskBuilder::new(); - let mut result = builder.future_result(); - builder.spawn(proc() { + let mut result = task::try_future(proc() { start(i) }); @@ -31,7 +28,7 @@ fn test00() { } // Try joining tasks that have already finished. - result.recv(); + result.unwrap(); println!("Joined task."); } diff --git a/src/test/run-pass/task-comm-3.rs b/src/test/run-pass/task-comm-3.rs index 217bab5c1dea6..cd31d15db1092 100644 --- a/src/test/run-pass/task-comm-3.rs +++ b/src/test/run-pass/task-comm-3.rs @@ -10,7 +10,7 @@ extern crate debug; -use std::task::TaskBuilder; +use std::task; pub fn main() { println!("===== WITHOUT THREADS ====="); test00(); } @@ -39,14 +39,12 @@ fn test00() { let mut results = Vec::new(); while i < number_of_tasks { let tx = tx.clone(); - let mut builder = TaskBuilder::new(); - results.push(builder.future_result()); - builder.spawn({ + results.push(task::try_future({ let i = i; proc() { test00_start(&tx, i, number_of_messages) } - }); + })); i = i + 1; } @@ -62,7 +60,7 @@ fn test00() { } // Join spawned tasks... - for r in results.iter() { r.recv(); } + for r in results.mut_iter() { r.get_ref(); } println!("Completed: Final number is: "); println!("{:?}", sum); diff --git a/src/test/run-pass/task-comm-9.rs b/src/test/run-pass/task-comm-9.rs index ab6498c00279b..a46b4513c5dee 100644 --- a/src/test/run-pass/task-comm-9.rs +++ b/src/test/run-pass/task-comm-9.rs @@ -10,7 +10,7 @@ extern crate debug; -use std::task::TaskBuilder; +use std::task; pub fn main() { test00(); } @@ -25,9 +25,7 @@ fn test00() { let (tx, rx) = channel(); let number_of_messages: int = 10; - let mut builder = TaskBuilder::new(); - let result = builder.future_result(); - builder.spawn(proc() { + let result = task::try_future(proc() { test00_start(&tx, number_of_messages); }); @@ -38,7 +36,7 @@ fn test00() { i += 1; } - result.recv(); + result.unwrap(); assert_eq!(sum, number_of_messages * (number_of_messages - 1) / 2); } diff --git a/src/test/run-pass/task-stderr.rs b/src/test/run-pass/task-stderr.rs new file mode 100644 index 0000000000000..6a71f9df6e45e --- /dev/null +++ b/src/test/run-pass/task-stderr.rs @@ -0,0 +1,26 @@ +// Copyright 2014 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use std::io::{ChanReader, ChanWriter}; +use std::task::TaskBuilder; + +fn main() { + let (tx, rx) = channel(); + let mut reader = ChanReader::new(rx); + let stderr = ChanWriter::new(tx); + + let res = TaskBuilder::new().stderr(box stderr as Box).try(proc() -> () { + fail!("Hello, world!") + }); + assert!(res.is_err()); + + let output = reader.read_to_str().unwrap(); + assert!(output.as_slice().contains("Hello, world!")); +} diff --git a/src/test/run-pass/tcp-stress.rs b/src/test/run-pass/tcp-stress.rs index 9c4716e5f4d38..99bf247229e05 100644 --- a/src/test/run-pass/tcp-stress.rs +++ b/src/test/run-pass/tcp-stress.rs @@ -60,9 +60,7 @@ fn main() { let (tx, rx) = channel(); for _ in range(0, 1000) { let tx = tx.clone(); - let mut builder = TaskBuilder::new(); - builder.opts.stack_size = Some(64 * 1024); - builder.spawn(proc() { + TaskBuilder::new().stack_size(64 * 1024).spawn(proc() { let host = addr.ip.to_str(); let port = addr.port; match TcpStream::connect(host.as_slice(), port) { diff --git a/src/test/run-pass/yield.rs b/src/test/run-pass/yield.rs index 2662a6c6568b9..89d204dcecbdb 100644 --- a/src/test/run-pass/yield.rs +++ b/src/test/run-pass/yield.rs @@ -9,18 +9,15 @@ // except according to those terms. use std::task; -use std::task::TaskBuilder; pub fn main() { - let mut builder = TaskBuilder::new(); - let mut result = builder.future_result(); - builder.spawn(child); + let mut result = task::try_future(child); println!("1"); task::deschedule(); println!("2"); task::deschedule(); println!("3"); - result.recv(); + result.unwrap(); } fn child() { diff --git a/src/test/run-pass/yield1.rs b/src/test/run-pass/yield1.rs index 8c5504725bb48..d882b1abd295d 100644 --- a/src/test/run-pass/yield1.rs +++ b/src/test/run-pass/yield1.rs @@ -9,15 +9,12 @@ // except according to those terms. use std::task; -use std::task::TaskBuilder; pub fn main() { - let mut builder = TaskBuilder::new(); - let mut result = builder.future_result(); - builder.spawn(child); + let mut result = task::try_future(child); println!("1"); task::deschedule(); - result.recv(); + result.unwrap(); } fn child() { println!("2"); }