Skip to content

Rendezvous stream for synchronous channel messaging #8908

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 102 additions & 2 deletions src/libextra/comm.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2012 The Rust Project Developers. See the COPYRIGHT
// Copyright 2012-2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
Expand Down Expand Up @@ -90,9 +90,55 @@ pub fn DuplexStream<T:Send,U:Send>()
})
}

/// An extension of `pipes::stream` that provides synchronous message sending.
pub struct SyncChan<T> { priv duplex_stream: DuplexStream<T, ()> }
/// An extension of `pipes::stream` that acknowledges each message received.
pub struct SyncPort<T> { priv duplex_stream: DuplexStream<(), T> }

impl<T: Send> GenericChan<T> for SyncChan<T> {
fn send(&self, val: T) {
assert!(self.try_send(val), "SyncChan.send: receiving port closed");
}
}

impl<T: Send> GenericSmartChan<T> for SyncChan<T> {
/// Sends a message, or report if the receiver has closed the connection before receiving.
fn try_send(&self, val: T) -> bool {
self.duplex_stream.try_send(val) && self.duplex_stream.try_recv().is_some()
}
}

impl<T: Send> GenericPort<T> for SyncPort<T> {
fn recv(&self) -> T {
self.try_recv().expect("SyncPort.recv: sending channel closed")
}

fn try_recv(&self) -> Option<T> {
do self.duplex_stream.try_recv().map_move |val| {
self.duplex_stream.try_send(());
val
}
}
}

impl<T: Send> Peekable<T> for SyncPort<T> {
fn peek(&self) -> bool {
self.duplex_stream.peek()
}
}

/// Creates a stream whose channel, upon sending a message, blocks until the message is received.
pub fn rendezvous<T: Send>() -> (SyncPort<T>, SyncChan<T>) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rendezvous is nice, but it seems peculiar to mismatch the names of the types of the port/chan. Maybe it could be sync_stream?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I recommended the name rendezvous because it comes from a plan 9 synchronization primitive that does exactly the same thing. Plus (imo) it does suggest the functionality, as the sender and receiver are "rendezvousing" on the sent data.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(it's also the name of the ada primitive which also does the same thing)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok; rendezvous is fine by me, I was just curious about the mismatch.

let (chan_stream, port_stream) = DuplexStream();
(SyncPort { duplex_stream: port_stream }, SyncChan { duplex_stream: chan_stream })
}

#[cfg(test)]
mod test {
use comm::DuplexStream;
use comm::{DuplexStream, rendezvous};
use std::rt::test::run_in_newsched_task;
use std::task::spawn_unlinked;


#[test]
pub fn DuplexStream1() {
Expand All @@ -104,4 +150,58 @@ mod test {
assert!(left.recv() == 123);
assert!(right.recv() == ~"abc");
}

#[test]
pub fn basic_rendezvous_test() {
let (port, chan) = rendezvous();

do spawn {
chan.send("abc");
}

assert!(port.recv() == "abc");
}

#[test]
fn recv_a_lot() {
// Rendezvous streams should be able to handle any number of messages being sent
do run_in_newsched_task {
let (port, chan) = rendezvous();
do spawn {
do 1000000.times { chan.send(()) }
}
do 1000000.times { port.recv() }
}
}

#[test]
fn send_and_fail_and_try_recv() {
let (port, chan) = rendezvous();
do spawn_unlinked {
chan.duplex_stream.send(()); // Can't access this field outside this module
fail!()
}
port.recv()
}

#[test]
fn try_send_and_recv_then_fail_before_ack() {
let (port, chan) = rendezvous();
do spawn_unlinked {
port.duplex_stream.recv();
fail!()
}
chan.try_send(());
}

#[test]
#[should_fail]
fn send_and_recv_then_fail_before_ack() {
let (port, chan) = rendezvous();
do spawn_unlinked {
port.duplex_stream.recv();
fail!()
}
chan.send(());
}
}