Skip to content

Rewrite channels to be internally upgrade-able #11578

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Feb 12, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/doc/guide-tasks.md
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ Instead we can use a `SharedChan`, a type that allows a single
~~~
# use std::task::spawn;

let (port, chan) = SharedChan::new();
let (port, chan) = Chan::new();

for init_val in range(0u, 3) {
// Create a new channel handle to distribute to the child task
Expand Down
14 changes: 7 additions & 7 deletions src/libextra/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -767,7 +767,7 @@ fn run_tests(opts: &TestOpts,
remaining.reverse();
let mut pending = 0;

let (p, ch) = SharedChan::new();
let (p, ch) = Chan::new();

while pending > 0 || !remaining.is_empty() {
while pending < concurrency && !remaining.is_empty() {
Expand Down Expand Up @@ -878,7 +878,7 @@ pub fn filter_tests(

pub fn run_test(force_ignore: bool,
test: TestDescAndFn,
monitor_ch: SharedChan<MonitorMsg>) {
monitor_ch: Chan<MonitorMsg>) {

let TestDescAndFn {desc, testfn} = test;

Expand All @@ -888,7 +888,7 @@ pub fn run_test(force_ignore: bool,
}

fn run_test_inner(desc: TestDesc,
monitor_ch: SharedChan<MonitorMsg>,
monitor_ch: Chan<MonitorMsg>,
testfn: proc()) {
spawn(proc() {
let mut task = task::task();
Expand Down Expand Up @@ -1260,7 +1260,7 @@ mod tests {
},
testfn: DynTestFn(proc() f()),
};
let (p, ch) = SharedChan::new();
let (p, ch) = Chan::new();
run_test(false, desc, ch);
let (_, res) = p.recv();
assert!(res != TrOk);
Expand All @@ -1277,7 +1277,7 @@ mod tests {
},
testfn: DynTestFn(proc() f()),
};
let (p, ch) = SharedChan::new();
let (p, ch) = Chan::new();
run_test(false, desc, ch);
let (_, res) = p.recv();
assert_eq!(res, TrIgnored);
Expand All @@ -1294,7 +1294,7 @@ mod tests {
},
testfn: DynTestFn(proc() f()),
};
let (p, ch) = SharedChan::new();
let (p, ch) = Chan::new();
run_test(false, desc, ch);
let (_, res) = p.recv();
assert_eq!(res, TrOk);
Expand All @@ -1311,7 +1311,7 @@ mod tests {
},
testfn: DynTestFn(proc() f()),
};
let (p, ch) = SharedChan::new();
let (p, ch) = Chan::new();
run_test(false, desc, ch);
let (_, res) = p.recv();
assert_eq!(res, TrFailed);
Expand Down
5 changes: 3 additions & 2 deletions src/libgreen/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ use task::GreenTask;

mod macros;
mod simple;
mod message_queue;

pub mod basic;
pub mod context;
Expand Down Expand Up @@ -314,7 +315,7 @@ pub struct SchedPool {
#[deriving(Clone)]
struct TaskState {
cnt: UnsafeArc<AtomicUint>,
done: SharedChan<()>,
done: Chan<()>,
}

impl SchedPool {
Expand Down Expand Up @@ -468,7 +469,7 @@ impl SchedPool {

impl TaskState {
fn new() -> (Port<()>, TaskState) {
let (p, c) = SharedChan::new();
let (p, c) = Chan::new();
(p, TaskState {
cnt: UnsafeArc::new(AtomicUint::new(0)),
done: c,
Expand Down
61 changes: 61 additions & 0 deletions src/libgreen/message_queue.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.

use mpsc = std::sync::mpsc_queue;
use std::sync::arc::UnsafeArc;

pub enum PopResult<T> {
Inconsistent,
Empty,
Data(T),
}

pub fn queue<T: Send>() -> (Consumer<T>, Producer<T>) {
let (a, b) = UnsafeArc::new2(mpsc::Queue::new());
(Consumer { inner: a }, Producer { inner: b })
}

pub struct Producer<T> {
priv inner: UnsafeArc<mpsc::Queue<T>>,
}

pub struct Consumer<T> {
priv inner: UnsafeArc<mpsc::Queue<T>>,
}

impl<T: Send> Consumer<T> {
pub fn pop(&mut self) -> PopResult<T> {
match unsafe { (*self.inner.get()).pop() } {
mpsc::Inconsistent => Inconsistent,
mpsc::Empty => Empty,
mpsc::Data(t) => Data(t),
}
}

pub fn casual_pop(&mut self) -> Option<T> {
match unsafe { (*self.inner.get()).pop() } {
mpsc::Inconsistent => None,
mpsc::Empty => None,
mpsc::Data(t) => Some(t),
}
}
}

impl<T: Send> Producer<T> {
pub fn push(&mut self, t: T) {
unsafe { (*self.inner.get()).push(t); }
}
}

impl<T: Send> Clone for Producer<T> {
fn clone(&self) -> Producer<T> {
Producer { inner: self.inner.clone() }
}
}
16 changes: 8 additions & 8 deletions src/libgreen/sched.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@ use std::rt::task::Task;
use std::sync::deque;
use std::unstable::mutex::Mutex;
use std::unstable::raw;
use mpsc = std::sync::mpsc_queue;

use TaskState;
use context::Context;
use coroutine::Coroutine;
use sleeper_list::SleeperList;
use stack::StackPool;
use task::{TypeSched, GreenTask, HomeSched, AnySched};
use msgq = message_queue;

/// A scheduler is responsible for coordinating the execution of Tasks
/// on a single thread. The scheduler runs inside a slightly modified
Expand All @@ -47,9 +47,9 @@ pub struct Scheduler {
/// The queue of incoming messages from other schedulers.
/// These are enqueued by SchedHandles after which a remote callback
/// is triggered to handle the message.
message_queue: mpsc::Consumer<SchedMessage, ()>,
message_queue: msgq::Consumer<SchedMessage>,
/// Producer used to clone sched handles from
message_producer: mpsc::Producer<SchedMessage, ()>,
message_producer: msgq::Producer<SchedMessage>,
/// A shared list of sleeping schedulers. We'll use this to wake
/// up schedulers when pushing work onto the work queue.
sleeper_list: SleeperList,
Expand Down Expand Up @@ -143,7 +143,7 @@ impl Scheduler {
state: TaskState)
-> Scheduler {

let (consumer, producer) = mpsc::queue(());
let (consumer, producer) = msgq::queue();
let mut sched = Scheduler {
pool_id: pool_id,
sleeper_list: sleeper_list,
Expand Down Expand Up @@ -215,7 +215,7 @@ impl Scheduler {

// Should not have any messages
let message = stask.sched.get_mut_ref().message_queue.pop();
rtassert!(match message { mpsc::Empty => true, _ => false });
rtassert!(match message { msgq::Empty => true, _ => false });

stask.task.get_mut_ref().destroyed = true;
}
Expand Down Expand Up @@ -340,8 +340,8 @@ impl Scheduler {
//
// I have chosen to take route #2.
match self.message_queue.pop() {
mpsc::Data(t) => Some(t),
mpsc::Empty | mpsc::Inconsistent => None
msgq::Data(t) => Some(t),
msgq::Empty | msgq::Inconsistent => None
}
};

Expand Down Expand Up @@ -849,7 +849,7 @@ pub enum SchedMessage {

pub struct SchedHandle {
priv remote: ~RemoteCallback,
priv queue: mpsc::Producer<SchedMessage, ()>,
priv queue: msgq::Producer<SchedMessage>,
sched_id: uint
}

Expand Down
3 changes: 1 addition & 2 deletions src/libnative/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
//! that you would find on the respective platform.

use std::c_str::CString;
use std::comm::SharedChan;
use std::io;
use std::io::IoError;
use std::io::net::ip::SocketAddr;
Expand Down Expand Up @@ -289,7 +288,7 @@ impl rtio::IoFactory for IoFactory {
})
}
}
fn signal(&mut self, _signal: Signum, _channel: SharedChan<Signum>)
fn signal(&mut self, _signal: Signum, _channel: Chan<Signum>)
-> IoResult<~RtioSignal> {
Err(unimpl())
}
Expand Down
10 changes: 6 additions & 4 deletions src/libnative/io/timer_helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use task;
// only torn down after everything else has exited. This means that these
// variables are read-only during use (after initialization) and both of which
// are safe to use concurrently.
static mut HELPER_CHAN: *mut SharedChan<Req> = 0 as *mut SharedChan<Req>;
static mut HELPER_CHAN: *mut Chan<Req> = 0 as *mut Chan<Req>;
static mut HELPER_SIGNAL: imp::signal = 0 as imp::signal;

pub fn boot(helper: fn(imp::signal, Port<Req>)) {
Expand All @@ -43,7 +43,9 @@ pub fn boot(helper: fn(imp::signal, Port<Req>)) {
unsafe {
LOCK.lock();
if !INITIALIZED {
let (msgp, msgc) = SharedChan::new();
let (msgp, msgc) = Chan::new();
// promote this to a shared channel
drop(msgc.clone());
HELPER_CHAN = cast::transmute(~msgc);
let (receive, send) = imp::new();
HELPER_SIGNAL = send;
Expand Down Expand Up @@ -84,8 +86,8 @@ fn shutdown() {
// Clean up after ther helper thread
unsafe {
imp::close(HELPER_SIGNAL);
let _chan: ~SharedChan<Req> = cast::transmute(HELPER_CHAN);
HELPER_CHAN = 0 as *mut SharedChan<Req>;
let _chan: ~Chan<Req> = cast::transmute(HELPER_CHAN);
HELPER_CHAN = 0 as *mut Chan<Req>;
HELPER_SIGNAL = 0 as imp::signal;
}
}
Expand Down
Loading