diff --git a/mk/rt.mk b/mk/rt.mk index e43a68e5211de..26f35ea90dbe4 100644 --- a/mk/rt.mk +++ b/mk/rt.mk @@ -52,6 +52,7 @@ RUNTIME_CS_$(1) := \ rt/rust_uv.cpp \ rt/rust_uvtmp.cpp \ rt/rust_log.cpp \ + rt/rust_port_selector.cpp \ rt/circular_buffer.cpp \ rt/isaac/randport.cpp \ rt/rust_srv.cpp \ @@ -88,6 +89,7 @@ RUNTIME_HDR_$(1) := rt/globals.h \ rt/rust_stack.h \ rt/rust_task_list.h \ rt/rust_log.h \ + rt/rust_port_selector.h \ rt/circular_buffer.h \ rt/util/array_list.h \ rt/util/indexed_list.h \ diff --git a/src/libcore/comm.rs b/src/libcore/comm.rs index c7ef988a3da41..2ff1cc1d4b2f6 100644 --- a/src/libcore/comm.rs +++ b/src/libcore/comm.rs @@ -27,6 +27,7 @@ import task; export send; export recv; export peek; +export select2; export chan::{}; export port::{}; @@ -46,10 +47,14 @@ native mod rustrt { fn port_recv(dptr: *uint, po: *rust_port, yield: *ctypes::uintptr_t, killed: *ctypes::uintptr_t); + fn rust_port_select(dptr: **rust_port, ports: **rust_port, + n_ports: ctypes::size_t, + yield: *ctypes::uintptr_t); } #[abi = "rust-intrinsic"] native mod rusti { + // FIXME: This should probably not take a boxed closure fn call_with_retptr(&&f: fn@(*uint)) -> T; } @@ -154,6 +159,45 @@ fn recv_(p: *rust_port) -> T { ret res; } +#[doc = "Receive on one of two ports"] +fn select2( + p_a: port, p_b: port +) -> either::t unsafe { + + fn select(dptr: **rust_port, ports: **rust_port, + n_ports: ctypes::size_t, yield: *ctypes::uintptr_t) { + rustrt::rust_port_select(dptr, ports, n_ports, yield) + } + + let ports = []; + ports += [***p_a, ***p_b]; + let n_ports = 2 as ctypes::size_t; + let yield = 0u; + let yieldp = ptr::addr_of(yield); + + let resport: *rust_port = vec::as_buf(ports) {|ports| + rusti::call_with_retptr {|retptr| + select(unsafe::reinterpret_cast(retptr), ports, n_ports, yieldp) + } + }; + + if yield != 0u { + // Wait for data + task::yield(); + } + + // Now we know the port we're supposed to receive from + assert resport != ptr::null(); + + if resport == ***p_a { + either::left(recv(p_a)) + } else if resport == ***p_b { + either::right(recv(p_b)) + } else { + fail "unexpected result from rust_port_select"; + } +} + #[doc = "Returns true if there are messages available"] fn peek(p: port) -> bool { rustrt::rust_port_size(***p) != 0u as ctypes::size_t @@ -218,4 +262,80 @@ fn test_peek() { assert peek(po); recv(po); assert !peek(po); -} \ No newline at end of file +} + +#[test] +fn test_select2_available() { + let po_a = port(); + let po_b = port(); + let ch_a = chan(po_a); + let ch_b = chan(po_b); + + send(ch_a, "a"); + + assert select2(po_a, po_b) == either::left("a"); + + send(ch_b, "b"); + + assert select2(po_a, po_b) == either::right("b"); +} + +#[test] +fn test_select2_rendezvous() { + let po_a = port(); + let po_b = port(); + let ch_a = chan(po_a); + let ch_b = chan(po_b); + + iter::repeat(10u) {|| + task::spawn {|| + iter::repeat(10u) {|| task::yield() } + send(ch_a, "a"); + }; + + assert select2(po_a, po_b) == either::left("a"); + + task::spawn {|| + iter::repeat(10u) {|| task::yield() } + send(ch_b, "b"); + }; + + assert select2(po_a, po_b) == either::right("b"); + } +} + +#[test] +fn test_select2_stress() { + let po_a = port(); + let po_b = port(); + let ch_a = chan(po_a); + let ch_b = chan(po_b); + + let msgs = 100u; + let times = 4u; + + iter::repeat(times) {|| + task::spawn {|| + iter::repeat(msgs) {|| + send(ch_a, "a") + } + }; + task::spawn {|| + iter::repeat(msgs) {|| + send(ch_b, "b") + } + }; + } + + let as = 0; + let bs = 0; + iter::repeat(msgs * times * 2u) {|| + alt select2(po_a, po_b) { + either::left("a") { as += 1 } + either::right("b") { bs += 1 } + } + } + + assert as == 400; + assert bs == 400; +} diff --git a/src/rt/rust_builtin.cpp b/src/rt/rust_builtin.cpp index 628389f10ceef..ecc73204f29c8 100644 --- a/src/rt/rust_builtin.cpp +++ b/src/rt/rust_builtin.cpp @@ -593,6 +593,14 @@ port_recv(uintptr_t *dptr, rust_port *port, return; } +extern "C" CDECL void +rust_port_select(rust_port **dptr, rust_port **ports, + size_t n_ports, uintptr_t *yield) { + rust_task *task = rust_task_thread::get_task(); + rust_port_selector *selector = task->get_port_selector(); + selector->select(task, dptr, ports, n_ports, yield); +} + extern "C" CDECL void rust_set_exit_status(intptr_t code) { rust_task *task = rust_task_thread::get_task(); diff --git a/src/rt/rust_port.cpp b/src/rt/rust_port.cpp index a917c12e151a3..5f46b9c4ca0a5 100644 --- a/src/rt/rust_port.cpp +++ b/src/rt/rust_port.cpp @@ -30,18 +30,34 @@ void rust_port::detach() { void rust_port::send(void *sptr) { I(task->thread, !lock.lock_held_by_current_thread()); - scoped_lock with(lock); + bool did_rendezvous = false; + { + scoped_lock with(lock); + + buffer.enqueue(sptr); - buffer.enqueue(sptr); + A(kernel, !buffer.is_empty(), + "rust_chan::transmit with nothing to send."); + + if (task->blocked_on(this)) { + KLOG(kernel, comm, "dequeued in rendezvous_ptr"); + buffer.dequeue(task->rendezvous_ptr); + task->rendezvous_ptr = 0; + task->wakeup(this); + did_rendezvous = true; + } + } - A(kernel, !buffer.is_empty(), - "rust_chan::transmit with nothing to send."); + if (!did_rendezvous) { + // If the task wasn't waiting specifically on this port, + // it may be waiting on a group of ports - if (task->blocked_on(this)) { - KLOG(kernel, comm, "dequeued in rendezvous_ptr"); - buffer.dequeue(task->rendezvous_ptr); - task->rendezvous_ptr = 0; - task->wakeup(this); + rust_port_selector *port_selector = task->get_port_selector(); + // This check is not definitive. The port selector will take a lock + // and check again whether the task is still blocked. + if (task->blocked_on(port_selector)) { + port_selector->msg_sent_on(this); + } } } diff --git a/src/rt/rust_port.h b/src/rt/rust_port.h index 44bd686650dc8..92ece8a7841e3 100644 --- a/src/rt/rust_port.h +++ b/src/rt/rust_port.h @@ -1,6 +1,8 @@ #ifndef RUST_PORT_H #define RUST_PORT_H +#include "rust_internal.h" + class rust_port : public kernel_owned, public rust_cond { public: RUST_REFCOUNTED(rust_port) diff --git a/src/rt/rust_port_selector.cpp b/src/rt/rust_port_selector.cpp new file mode 100644 index 0000000000000..610db6ce9277f --- /dev/null +++ b/src/rt/rust_port_selector.cpp @@ -0,0 +1,87 @@ +#include "rust_port.h" +#include "rust_port_selector.h" + +rust_port_selector::rust_port_selector() + : ports(NULL), n_ports(0) { +} + +void +rust_port_selector::select(rust_task *task, rust_port **dptr, + rust_port **ports, + size_t n_ports, uintptr_t *yield) { + + I(task->thread, this->ports == NULL); + I(task->thread, this->n_ports == 0); + I(task->thread, dptr != NULL); + I(task->thread, ports != NULL); + I(task->thread, n_ports != 0); + I(task->thread, yield != NULL); + + *yield = false; + size_t locks_taken = 0; + bool found_msg = false; + + // Take each port's lock as we iterate through them because + // if none of them contain a usable message then we need to + // block the task before any of them can try to send another + // message. + + // Start looking for ports from a different index each time. + size_t j = isaac_rand(&task->thread->rctx); + for (size_t i = 0; i < n_ports; i++) { + size_t k = (i + j) % n_ports; + rust_port *port = ports[k]; + I(task->thread, port != NULL); + + port->lock.lock(); + locks_taken++; + + if (port->buffer.size() > 0) { + *dptr = port; + found_msg = true; + break; + } + } + + if (!found_msg) { + this->ports = ports; + this->n_ports = n_ports; + I(task->thread, task->rendezvous_ptr == NULL); + task->rendezvous_ptr = (uintptr_t*)dptr; + *yield = true; + task->block(this, "waiting for select rendezvous"); + } + + for (size_t i = 0; i < locks_taken; i++) { + size_t k = (i + j) % n_ports; + rust_port *port = ports[k]; + port->lock.unlock(); + } +} + +void +rust_port_selector::msg_sent_on(rust_port *port) { + rust_task *task = port->task; + + I(task->thread, !task->lock.lock_held_by_current_thread()); + I(task->thread, !port->lock.lock_held_by_current_thread()); + I(task->thread, !rendezvous_lock.lock_held_by_current_thread()); + + // Prevent two ports from trying to wake up the task + // simultaneously + scoped_lock with(rendezvous_lock); + + if (task->blocked_on(this)) { + for (size_t i = 0; i < n_ports; i++) { + if (port == ports[i]) { + // This was one of the ports we were waiting on + ports = NULL; + n_ports = 0; + *task->rendezvous_ptr = (uintptr_t) port; + task->rendezvous_ptr = NULL; + task->wakeup(this); + return; + } + } + } +} diff --git a/src/rt/rust_port_selector.h b/src/rt/rust_port_selector.h new file mode 100644 index 0000000000000..8b4d902a2493b --- /dev/null +++ b/src/rt/rust_port_selector.h @@ -0,0 +1,27 @@ +#ifndef RUST_PORT_SELECTOR_H +#define RUST_PORT_SELECTOR_H + +#include "rust_internal.h" + +struct rust_task; +class rust_port; + +class rust_port_selector : public rust_cond { + private: + rust_port **ports; + size_t n_ports; + lock_and_signal rendezvous_lock; + + public: + rust_port_selector(); + + void select(rust_task *task, + rust_port **dptr, + rust_port **ports, + size_t n_ports, + uintptr_t *yield); + + void msg_sent_on(rust_port *port); +}; + +#endif /* RUST_PORT_SELECTOR_H */ diff --git a/src/rt/rust_task.h b/src/rt/rust_task.h index 7594e677bb0dc..fe1b94d6ea54a 100644 --- a/src/rt/rust_task.h +++ b/src/rt/rust_task.h @@ -16,6 +16,7 @@ #include "rust_obstack.h" #include "boxed_region.h" #include "rust_stack.h" +#include "rust_port_selector.h" // Corresponds to the rust chan (currently _chan) type. struct chan_handle { @@ -116,6 +117,8 @@ rust_task : public kernel_owned, rust_cond uintptr_t next_c_sp; uintptr_t next_rust_sp; + rust_port_selector port_selector; + // Called when the atomic refcount reaches zero void delete_this(); @@ -206,6 +209,8 @@ rust_task : public kernel_owned, rust_cond void call_on_c_stack(void *args, void *fn_ptr); void call_on_rust_stack(void *args, void *fn_ptr); bool have_c_stack() { return c_stack != NULL; } + + rust_port_selector *get_port_selector() { return &port_selector; } }; // This stuff is on the stack-switching fast path diff --git a/src/rt/rustrt.def.in b/src/rt/rustrt.def.in index bcdb2079d9717..2030f3207067a 100644 --- a/src/rt/rustrt.def.in +++ b/src/rt/rustrt.def.in @@ -17,6 +17,7 @@ nano_time new_port new_task port_recv +rust_port_select rand_free rand_new rand_next