Skip to content

Commit

Permalink
std: Convert uv_global_loop to use pipes
Browse files Browse the repository at this point in the history
  • Loading branch information
brson committed Jan 22, 2013
1 parent 62a8859 commit df5338a
Show file tree
Hide file tree
Showing 14 changed files with 172 additions and 147 deletions.
15 changes: 15 additions & 0 deletions src/libcore/private/global.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,21 @@ unsafe fn global_data_modify_<T: Owned>(
}
}

pub unsafe fn global_data_clone<T: Owned Clone>(
key: GlobalDataKey<T>) -> Option<T> {
let mut maybe_clone: Option<T> = None;
do global_data_modify(key) |current| {
match &current {
&Some(~ref value) => {
maybe_clone = Some(value.clone());
}
&None => ()
}
current
}
return maybe_clone;
}

// GlobalState is a map from keys to unique pointers and a
// destructor. Keys are pointers derived from the type of the
// global value. There is a single GlobalState instance per runtime.
Expand Down
3 changes: 2 additions & 1 deletion src/libstd/flatpipes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -782,14 +782,14 @@ mod test {
let (finish_port, finish_chan) = pipes::stream();

let addr = ip::v4::parse_addr("127.0.0.1");
let iotask = uv::global_loop::get();

let begin_connect_chan = Cell(move begin_connect_chan);
let accept_chan = Cell(move accept_chan);

// The server task
do task::spawn |copy addr, move begin_connect_chan,
move accept_chan| {
let iotask = &uv::global_loop::get();
let begin_connect_chan = begin_connect_chan.take();
let accept_chan = accept_chan.take();
let listen_res = do tcp::listen(
Expand Down Expand Up @@ -821,6 +821,7 @@ mod test {
begin_connect_port.recv();

debug!("connecting");
let iotask = &uv::global_loop::get();
let connect_result = tcp::connect(copy addr, port, iotask);
assert connect_result.is_ok();
let sock = result::unwrap(move connect_result);
Expand Down
6 changes: 3 additions & 3 deletions src/libstd/net_ip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ enum IpGetAddrErr {
* a vector of `ip_addr` results, in the case of success, or an error
* object in the case of failure
*/
pub fn get_addr(node: &str, iotask: iotask)
pub fn get_addr(node: &str, iotask: &iotask)
-> result::Result<~[IpAddr], IpGetAddrErr> {
do oldcomm::listen |output_ch| {
do str::as_buf(node) |node_ptr, len| unsafe {
Expand Down Expand Up @@ -413,7 +413,7 @@ mod test {
#[ignore(reason = "valgrind says it's leaky")]
fn test_ip_get_addr() {
let localhost_name = ~"localhost";
let iotask = uv::global_loop::get();
let iotask = &uv::global_loop::get();
let ga_result = get_addr(localhost_name, iotask);
if result::is_err(&ga_result) {
fail ~"got err result from net::ip::get_addr();"
Expand All @@ -439,7 +439,7 @@ mod test {
#[ignore(reason = "valgrind says it's leaky")]
fn test_ip_get_addr_bad_input() {
let localhost_name = ~"sjkl234m,./sdf";
let iotask = uv::global_loop::get();
let iotask = &uv::global_loop::get();
let ga_result = get_addr(localhost_name, iotask);
assert result::is_err(&ga_result);
}
Expand Down
74 changes: 41 additions & 33 deletions src/libstd/net_tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ pub enum TcpConnectErrData {
* `net::tcp::tcp_connect_err_data` instance will be returned
*/
pub fn connect(input_ip: ip::IpAddr, port: uint,
iotask: IoTask)
iotask: &IoTask)
-> result::Result<TcpSocket, TcpConnectErrData> unsafe {
let result_po = oldcomm::Port::<ConnAttempt>();
let closed_signal_po = oldcomm::Port::<()>();
Expand All @@ -164,7 +164,7 @@ pub fn connect(input_ip: ip::IpAddr, port: uint,
ip::Ipv4(_) => { false }
ip::Ipv6(_) => { true }
},
iotask: iotask
iotask: iotask.clone()
};
let socket_data_ptr = ptr::addr_of(&(*socket_data));
log(debug, fmt!("tcp_connect result_ch %?", conn_data.result_ch));
Expand Down Expand Up @@ -496,17 +496,17 @@ pub fn accept(new_conn: TcpNewConnection)
let server_data_ptr = uv::ll::get_data_for_uv_handle(
server_handle_ptr) as *TcpListenFcData;
let reader_po = oldcomm::Port();
let iotask = (*server_data_ptr).iotask;
let iotask = &(*server_data_ptr).iotask;
let stream_handle_ptr = malloc_uv_tcp_t();
*(stream_handle_ptr as *mut uv::ll::uv_tcp_t) = uv::ll::tcp_t();
let client_socket_data = @{
let client_socket_data: @TcpSocketData = @{
reader_po: reader_po,
reader_ch: oldcomm::Chan(&reader_po),
stream_handle_ptr : stream_handle_ptr,
connect_req : uv::ll::connect_t(),
write_req : uv::ll::write_t(),
ipv6: (*server_data_ptr).ipv6,
iotask : iotask
iotask : iotask.clone()
};
let client_socket_data_ptr = ptr::addr_of(&(*client_socket_data));
let client_stream_handle_ptr =
Expand Down Expand Up @@ -588,10 +588,10 @@ pub fn accept(new_conn: TcpNewConnection)
* of listen exiting because of an error
*/
pub fn listen(host_ip: ip::IpAddr, port: uint, backlog: uint,
iotask: IoTask,
on_establish_cb: fn~(oldcomm::Chan<Option<TcpErrData>>),
new_connect_cb: fn~(TcpNewConnection,
oldcomm::Chan<Option<TcpErrData>>))
iotask: &IoTask,
on_establish_cb: fn~(oldcomm::Chan<Option<TcpErrData>>),
new_connect_cb: fn~(TcpNewConnection,
oldcomm::Chan<Option<TcpErrData>>))
-> result::Result<(), TcpListenErrData> unsafe {
do listen_common(move host_ip, port, backlog, iotask,
move on_establish_cb)
Expand All @@ -606,7 +606,7 @@ pub fn listen(host_ip: ip::IpAddr, port: uint, backlog: uint,
}

fn listen_common(host_ip: ip::IpAddr, port: uint, backlog: uint,
iotask: IoTask,
iotask: &IoTask,
on_establish_cb: fn~(oldcomm::Chan<Option<TcpErrData>>),
on_connect_cb: fn~(*uv::ll::uv_tcp_t))
-> result::Result<(), TcpListenErrData> unsafe {
Expand All @@ -615,12 +615,12 @@ fn listen_common(host_ip: ip::IpAddr, port: uint, backlog: uint,
let kill_ch = oldcomm::Chan(&kill_po);
let server_stream = uv::ll::tcp_t();
let server_stream_ptr = ptr::addr_of(&server_stream);
let server_data = {
let server_data: TcpListenFcData = {
server_stream_ptr: server_stream_ptr,
stream_closed_ch: oldcomm::Chan(&stream_closed_po),
kill_ch: kill_ch,
on_connect_cb: move on_connect_cb,
iotask: iotask,
iotask: iotask.clone(),
ipv6: match &host_ip {
&ip::Ipv4(_) => { false }
&ip::Ipv6(_) => { true }
Expand Down Expand Up @@ -895,7 +895,7 @@ fn tear_down_socket_data(socket_data: @TcpSocketData) unsafe {
};
let close_data_ptr = ptr::addr_of(&close_data);
let stream_handle_ptr = (*socket_data).stream_handle_ptr;
do iotask::interact((*socket_data).iotask) |loop_ptr| unsafe {
do iotask::interact(&(*socket_data).iotask) |loop_ptr| unsafe {
log(debug, fmt!("interact dtor for tcp_socket stream %? loop %?",
stream_handle_ptr, loop_ptr));
uv::ll::set_data_for_uv_handle(stream_handle_ptr,
Expand All @@ -916,7 +916,7 @@ fn read_common_impl(socket_data: *TcpSocketData, timeout_msecs: uint)
use timer;

log(debug, ~"starting tcp::read");
let iotask = (*socket_data).iotask;
let iotask = &(*socket_data).iotask;
let rs_result = read_start_common_impl(socket_data);
if result::is_err(&rs_result) {
let err_data = result::get_err(&rs_result);
Expand Down Expand Up @@ -956,7 +956,7 @@ fn read_stop_common_impl(socket_data: *TcpSocketData) ->
let stream_handle_ptr = (*socket_data).stream_handle_ptr;
let stop_po = oldcomm::Port::<Option<TcpErrData>>();
let stop_ch = oldcomm::Chan(&stop_po);
do iotask::interact((*socket_data).iotask) |loop_ptr| unsafe {
do iotask::interact(&(*socket_data).iotask) |loop_ptr| unsafe {
log(debug, ~"in interact cb for tcp::read_stop");
match uv::ll::read_stop(stream_handle_ptr as *uv::ll::uv_stream_t) {
0i32 => {
Expand Down Expand Up @@ -984,7 +984,7 @@ fn read_start_common_impl(socket_data: *TcpSocketData)
let start_po = oldcomm::Port::<Option<uv::ll::uv_err_data>>();
let start_ch = oldcomm::Chan(&start_po);
log(debug, ~"in tcp::read_start before interact loop");
do iotask::interact((*socket_data).iotask) |loop_ptr| unsafe {
do iotask::interact(&(*socket_data).iotask) |loop_ptr| unsafe {
log(debug, fmt!("in tcp::read_start interact cb %?", loop_ptr));
match uv::ll::read_start(stream_handle_ptr as *uv::ll::uv_stream_t,
on_alloc_cb,
Expand Down Expand Up @@ -1024,7 +1024,7 @@ fn write_common_impl(socket_data_ptr: *TcpSocketData,
result_ch: oldcomm::Chan(&result_po)
};
let write_data_ptr = ptr::addr_of(&write_data);
do iotask::interact((*socket_data_ptr).iotask) |loop_ptr| unsafe {
do iotask::interact(&(*socket_data_ptr).iotask) |loop_ptr| unsafe {
log(debug, fmt!("in interact cb for tcp::write %?", loop_ptr));
match uv::ll::write(write_req_ptr,
stream_handle_ptr,
Expand Down Expand Up @@ -1369,7 +1369,7 @@ pub mod test {
}
}
pub fn impl_gl_tcp_ipv4_server_and_client() {
let hl_loop = uv::global_loop::get();
let hl_loop = &uv::global_loop::get();
let server_ip = ~"127.0.0.1";
let server_port = 8888u;
let expected_req = ~"ping";
Expand All @@ -1381,6 +1381,7 @@ pub mod test {
let cont_po = oldcomm::Port::<()>();
let cont_ch = oldcomm::Chan(&cont_po);
// server
let hl_loop_clone = hl_loop.clone();
do task::spawn_sched(task::ManualThreads(1u)) {
let actual_req = do oldcomm::listen |server_ch| {
run_tcp_test_server(
Expand All @@ -1389,7 +1390,7 @@ pub mod test {
expected_resp,
server_ch,
cont_ch,
hl_loop)
&hl_loop_clone)
};
server_result_ch.send(actual_req);
};
Expand All @@ -1415,7 +1416,7 @@ pub mod test {
assert str::contains(actual_resp, expected_resp);
}
pub fn impl_gl_tcp_ipv4_get_peer_addr() {
let hl_loop = uv::global_loop::get();
let hl_loop = &uv::global_loop::get();
let server_ip = ~"127.0.0.1";
let server_port = 8887u;
let expected_resp = ~"pong";
Expand All @@ -1426,6 +1427,7 @@ pub mod test {
let cont_po = oldcomm::Port::<()>();
let cont_ch = oldcomm::Chan(&cont_po);
// server
let hl_loop_clone = hl_loop.clone();
do task::spawn_sched(task::ManualThreads(1u)) {
let actual_req = do oldcomm::listen |server_ch| {
run_tcp_test_server(
Expand All @@ -1434,7 +1436,7 @@ pub mod test {
expected_resp,
server_ch,
cont_ch,
hl_loop)
&hl_loop_clone)
};
server_result_ch.send(actual_req);
};
Expand All @@ -1445,10 +1447,11 @@ pub mod test {
let server_ip_addr = ip::v4::parse_addr(server_ip);
let iotask = uv::global_loop::get();
let connect_result = connect(move server_ip_addr, server_port,
iotask);
&iotask);
let sock = result::unwrap(move connect_result);
debug!("testing peer address");
// This is what we are actually testing!
assert net::ip::format_addr(&sock.get_peer_addr()) ==
~"127.0.0.1";
Expand All @@ -1457,12 +1460,14 @@ pub mod test {
// Fulfill the protocol the test server expects
let resp_bytes = str::to_bytes(~"ping");
tcp_write_single(&sock, resp_bytes);
debug!("message sent");
let read_result = sock.read(0u);
client_ch.send(str::from_bytes(read_result.get()));
debug!("result read");
};
}
pub fn impl_gl_tcp_ipv4_client_error_connection_refused() {
let hl_loop = uv::global_loop::get();
let hl_loop = &uv::global_loop::get();
let server_ip = ~"127.0.0.1";
let server_port = 8889u;
let expected_req = ~"ping";
Expand All @@ -1482,7 +1487,7 @@ pub mod test {
}
}
pub fn impl_gl_tcp_ipv4_server_address_in_use() {
let hl_loop = uv::global_loop::get();
let hl_loop = &uv::global_loop::get();
let server_ip = ~"127.0.0.1";
let server_port = 8890u;
let expected_req = ~"ping";
Expand All @@ -1494,6 +1499,7 @@ pub mod test {
let cont_po = oldcomm::Port::<()>();
let cont_ch = oldcomm::Chan(&cont_po);
// server
let hl_loop_clone = hl_loop.clone();
do task::spawn_sched(task::ManualThreads(1u)) {
let actual_req = do oldcomm::listen |server_ch| {
run_tcp_test_server(
Expand All @@ -1502,7 +1508,7 @@ pub mod test {
expected_resp,
server_ch,
cont_ch,
hl_loop)
&hl_loop_clone)
};
server_result_ch.send(actual_req);
};
Expand Down Expand Up @@ -1533,7 +1539,7 @@ pub mod test {
}
}
pub fn impl_gl_tcp_ipv4_server_access_denied() {
let hl_loop = uv::global_loop::get();
let hl_loop = &uv::global_loop::get();
let server_ip = ~"127.0.0.1";
let server_port = 80u;
// this one should fail..
Expand All @@ -1553,7 +1559,7 @@ pub mod test {
}
pub fn impl_gl_tcp_ipv4_server_client_reader_writer() {

let iotask = uv::global_loop::get();
let iotask = &uv::global_loop::get();
let server_ip = ~"127.0.0.1";
let server_port = 8891u;
let expected_req = ~"ping";
Expand All @@ -1565,6 +1571,7 @@ pub mod test {
let cont_po = oldcomm::Port::<()>();
let cont_ch = oldcomm::Chan(&cont_po);
// server
let iotask_clone = iotask.clone();
do task::spawn_sched(task::ManualThreads(1u)) {
let actual_req = do oldcomm::listen |server_ch| {
run_tcp_test_server(
Expand All @@ -1573,7 +1580,7 @@ pub mod test {
expected_resp,
server_ch,
cont_ch,
iotask)
&iotask_clone)
};
server_result_ch.send(actual_req);
};
Expand Down Expand Up @@ -1604,7 +1611,7 @@ pub mod test {
pub fn impl_tcp_socket_impl_reader_handles_eof() {
use core::io::{Reader,ReaderUtil};

let hl_loop = uv::global_loop::get();
let hl_loop = &uv::global_loop::get();
let server_ip = ~"127.0.0.1";
let server_port = 10041u;
let expected_req = ~"GET /";
Expand All @@ -1616,6 +1623,7 @@ pub mod test {
let cont_po = oldcomm::Port::<()>();
let cont_ch = oldcomm::Chan(&cont_po);
// server
let hl_loop_clone = hl_loop.clone();
do task::spawn_sched(task::ManualThreads(1u)) {
let actual_req = do oldcomm::listen |server_ch| {
run_tcp_test_server(
Expand All @@ -1624,7 +1632,7 @@ pub mod test {
expected_resp,
server_ch,
cont_ch,
hl_loop)
&hl_loop_clone)
};
server_result_ch.send(actual_req);
};
Expand Down Expand Up @@ -1664,7 +1672,7 @@ pub mod test {
fn run_tcp_test_server(server_ip: &str, server_port: uint, resp: ~str,
server_ch: oldcomm::Chan<~str>,
cont_ch: oldcomm::Chan<()>,
iotask: IoTask) -> ~str {
iotask: &IoTask) -> ~str {
let server_ip_addr = ip::v4::parse_addr(server_ip);
let listen_result = listen(move server_ip_addr, server_port, 128,
iotask,
Expand Down Expand Up @@ -1751,7 +1759,7 @@ pub mod test {
}

fn run_tcp_test_server_fail(server_ip: &str, server_port: uint,
iotask: IoTask) -> TcpListenErrData {
iotask: &IoTask) -> TcpListenErrData {
let server_ip_addr = ip::v4::parse_addr(server_ip);
let listen_result = listen(move server_ip_addr, server_port, 128,
iotask,
Expand All @@ -1775,7 +1783,7 @@ pub mod test {

fn run_tcp_test_client(server_ip: &str, server_port: uint, resp: &str,
client_ch: oldcomm::Chan<~str>,
iotask: IoTask) -> result::Result<~str,
iotask: &IoTask) -> result::Result<~str,
TcpConnectErrData> {
let server_ip_addr = ip::v4::parse_addr(server_ip);

Expand Down
Loading

0 comments on commit df5338a

Please sign in to comment.