net_tcp read timeout #3599

Closed
jesse99 opened this issue Sep 26, 2012 · 11 comments

jesse99 (Contributor) commented Sep 26, 2012

I'm not completely sure whether this is a bug, because I'm finding it difficult to figure out how to use net_tcp from the unit tests. But the code below times out when both the client and the server try to read.

// rustc --test tcp.rs && export RUST_LOG=tcp=3,std::net_tcp=3 && ./tcp
extern mod std;

use ip = std::net_ip;
use std::net_ip::{IpAddr};
use tcp = std::net_tcp;
use uv = std::uv;
use comm::{Chan, Port};
use Option = option::Option;

#[forbid(implicit_copies)]
#[allow(non_implicitly_copyable_typarams)]    // uv uses non-implicitly copyable Result types

// Note that net_tcp currently does not use SO_REUSEADDR so you may need
// to change this port if you get an AddressInUse error.
const server_port: uint = 8089;

// Spins up a task used to wait for new incoming connections from clients. 
priv fn run_server(addr: IpAddr, port: uint, exit_chan: Chan<()>)
{
    let hl_loop = uv::global_loop::get();

    // When we connect to a client the tasks that execute to service requests
    // from it will be distributed across two threads (unless some subtask
    // creates a new scheduler).
    do task::spawn_sched(task::ManualThreads(1))        // TODO: try using 2
    |move addr|
    {
        let backlog = 2;    // TODO: add a type alias or something for the horrible below
        let on_establish: fn~ (&&kill_ch: comm::Chan<Option<tcp::TcpErrData>>) =
            |_kill_ch, copy addr|
            {
                error!("listening for connections on %s", ip::format_addr(&addr))
            };

        // listen will block until it gets an error (from the system or from kill_ch). 
        error!("server is listening");
        let result = tcp::listen(copy addr, port, backlog, hl_loop, on_establish, on_connect);
        if result.is_err()
        {
            fail fmt!("failed listen on %s: %?", ip::format_addr(&addr), result.get_err());
        }

        // Let our caller know when we're done listening.
        error!("server is exiting");
        exit_chan.send(());
    }
}

priv fn on_connect(&&connection: tcp::TcpNewConnection, &&kill_chan: Chan<Option<tcp::TcpErrData>>)
{
    do task::spawn
    {
        error!("server is accepting a connection");
        let result = tcp::accept(connection);
        if result.is_ok()
        {
            error!("server accepted the connection");
            let sock = result::get_ref(&result);
            handle_connection(sock, kill_chan);
        }
        else
        {
            fail fmt!("accept to failed: %?", result.get_err());
        }
    }
}

priv fn handle_connection(sock: &tcp::TcpSocket, kill_chan: Chan<Option<tcp::TcpErrData>>)
{
    loop
    {
        let request = read_str("server request", sock);
        error!("server got request: %?", request);
        if request.starts_with("dupe: ")
        {
            let reply = request.slice("dupe: ".len(), request.len());    // the part after the "dupe: " prefix
            write_str(sock, reply + reply);
        }
        else if request.starts_with("shutdown:")
        {
            // We need to stop the server tasks here. Sockets are closed when the last reference
            // to them goes away. This will happen when we break out of this loop, but we
            // still need to terminate the listen loop, which is why we send to kill_chan.
            let err = {err_name: ~"shutdown", err_msg: ~"client requested server shutdown"};
            kill_chan.send(option::Some(err)); 
            break;
        }
        else
        {
            fail fmt!("bad request: '%s'", request);
        }
    }
}

priv fn run_client(addr: IpAddr, port: uint)
{
    let hl_loop = uv::global_loop::get();

    error!("client is connecting");
    let result = tcp::connect(addr, port, hl_loop);
    if result.is_ok()
    {
        error!("client connected");
        let sock = result::unwrap(move result);

        write_str(&sock, ~"dupe: hey");
        let reply = read_str("server reply", &sock);
        error!("client received: %?", reply);
        if reply != ~"heyhey"
        {
            fail fmt!("client expected 'heyhey', but found '%s'", reply);
        }

        write_str(&sock, ~"shutdown:");
    }
    else
    {
        fail fmt!("connect failed: %?", result.get_err());
    }
}

priv fn write_str(sock: &tcp::TcpSocket, message: ~str)
{
    do str::as_bytes(message)
    |buffer|
    {
error!("   writing %?", buffer);
        let result = sock.write(buffer);
        if result.is_err()
        {
            fail fmt!("write('%s') failed: %?", message, result.get_err());
        }
    }
}

// TCP is a stream oriented protocol so there are no messages as such:
// there are just streams of bytes. However framing does occur when
// the network stack packages up the bytes into a packet. Here we assume
// that our packets correspond to individual messages. In general this is
// a terrible idea: messages may at some point become too large for a
// single packet or the network stack may decide to gang up multiple
// messages in one packet.
priv fn read_str(expected: &str, sock: &tcp::TcpSocket) -> ~str
{
    // The right way to do this is to read each chunk, buffer the results,
    // and return the message part (e.g. using a delimiter like a null
    // character). Bonus points for not allowing rogue clients to grow
    // the buffer arbitrarily large.
    let timeout = 2000;                    // msecs
    match sock.read(timeout)
    {
        result::Ok(buffer) =>
        {
error!("   read %?", buffer);
            str::from_bytes(buffer)
        }
        result::Err(err) =>
        {
            fail fmt!("read %s failed: %?", expected, err);
        }
    }
}

fn resolve_addr(addr: &str) -> IpAddr
{
    let hl_loop = uv::global_loop::get();
    match ip::get_addr(addr, hl_loop)
    {
        result::Ok(addrs) if addrs.is_not_empty() =>
        {
            addrs[0]
        }
        result::Ok(*) =>
        {
            fail fmt!("ip::get_addr('%s') failed: empty result", addr);
        }
        result::Err(err) =>
        {
            fail fmt!("ip::get_addr('%s') failed: %?", addr, err);
        }
    }
}

#[test]
fn simple_client_server()
{
    let exit_port = Port();
    let exit_chan = Chan(exit_port);

    let addr = resolve_addr("127.0.0.1");
    run_server(addr, server_port, exit_chan);

    do task::spawn_sched(task::SingleThreaded)    // TODO: don't think we need to spawn a task here
    {
        run_client(addr, server_port);
        exit_chan.send(());
    }

    // Wait for the server to finish.
    exit_port.recv();
    exit_port.recv();
}

Logging shows this:

rustc --test tcp.rs && export RUST_LOG=tcp=3,std::net_tcp=3 && ./tcp
warning: no debug symbols in executable (-arch x86_64)

running 1 test
rust: ~"server is listening"
rust: ~"addr: { mut sin_family: 528, mut sin_port: 0, mut sin_addr: 16777343, mut sin_zero: (0, 0, 0, 0, 0, 0, 0, 0) }"
rust: ~"listening for connections on 127.0.0.1"
rust: ~"client is connecting"
rust: ~"tcp_connect result_ch Chan_(18)"
rust: ~"stream_handle_ptr outside interact 140379594689168"
rust: ~"in interact cb for tcp client connect.."
rust: ~"stream_handle_ptr in interact 140379594689168"
rust: ~"tcp_init successful"
rust: ~"dealing w/ ipv4 connection.."
rust: ~"addr: { mut sin_family: 528, mut sin_port: 0, mut sin_addr: 16777343, mut sin_zero: (0, 0, 0, 0, 0, 0, 0, 0) }"
rust: ~"tcp_connect successful"
rust: ~"leaving tcp_connect interact cb..."
rust: ~"tcp_connect result_ch Chan_(18)"
rust: ~"successful tcp connection!"
rust: ~"leaving tcp_connect_on_connect_cb"
rust: ~"tcp::connect - received success on result_po"
rust: ~"client connected"
rust: ~"   writing ~[ 100, 117, 112, 101, 58, 32, 104, 101, 121, 0 ]"
rust: ~"server is accepting a connection"
rust: ~"in interact cb for tcp::accept"
rust: ~"uv_tcp_init successful for client stream"
rust: ~"successfully accepted client connection"
rust: ~"in interact cb for tcp::write 140379593647056"
rust: ~"uv_write() invoked successfully"
rust: ~"successful write complete"
rust: ~"server accepted the connection"
rust: ~"starting tcp::read"
rust: ~"in tcp::read_start before interact loop"
rust: ~"starting tcp::read"
rust: ~"in tcp::read_start before interact loop"
rust: ~"in tcp::read_start interact cb 140379593647056"
rust: ~"success doing uv_read_start"
rust: ~"tcp::read before recv_timeout"
rust: ~"in tcp::read_start interact cb 140379593647056"
rust: ~"success doing uv_read_start"
rust: ~"tcp read on_alloc_cb!"
rust: ~"tcp::read before recv_timeout"
rust: ~"tcp read on_alloc_cb h: 140379593655232 char_ptr: 140379605528064 sugsize: 65536"
rust: ~"entering on_tcp_read_cb stream: 140379593655232 nread: 10"
rust: ~"tcp on_read_cb nread: 10"
rust: ~"exiting on_tcp_read_cb"
rust: ~"tcp::read after recv_timeout"
rust: ~"tcp::read: timed out.."
rust: ~"tcp::read after recv_timeout"
rust: ~"tcp::read: timed out.."
rust: ~"in interact cb for tcp::read_stop"
rust: ~"successfully called uv_read_stop"
rust: ~"in interact cb for tcp::read_stop"
rust: ~"successfully called uv_read_stop"
rust: task failed at 'read server request failed: { err_name: ~"TIMEOUT", err_msg: ~"req timed out" }', tcp.rs:156
rust: task failed at 'read server reply failed: { err_name: ~"TIMEOUT", err_msg: ~"req timed out" }', tcp.rs:156
rust: ~"interact dtor for tcp_socket stream 140379593655232 loop 140379593647056"
rust: ~"interact dtor for tcp_socket stream 140379594689168 loop 140379593647056"
rust: ~"tcp_socket_dtor_close_cb exiting.."
rust: ~"tcp_socket_dtor_close_cb exiting.."
/bin/sh: line 1: 56867 Segmentation fault: 11  ./tcp
make: *** [tcp] Error 139
exited with code 2

So it appears that the message is being read, but the read function isn't returning for some reason (the first message is "dupe: hey\0", which is 10 bytes). This is on Mac with Rust from Sep 22, 2012.

The seg faults are a bit disturbing too...
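
For reference, the buffer-until-a-delimiter framing that the comment in read_str alludes to looks roughly like the sketch below in present-day Rust (std::io/std::net, not the old std::net_tcp API this issue is about); the function name, the null delimiter, and the size cap are illustrative only.

```rust
use std::io::{BufRead, BufReader, Read};
use std::net::TcpStream;

// Read one null-delimited message, refusing to buffer more than `max_len` bytes
// so a rogue peer cannot grow the buffer arbitrarily.
fn read_message(reader: &mut BufReader<TcpStream>, max_len: u64) -> std::io::Result<String> {
    let mut buf = Vec::new();
    // `read_until` buffers partial reads and stops at the delimiter (or at EOF / the limit).
    reader.take(max_len).read_until(0u8, &mut buf)?;
    match buf.pop() {
        // Found the delimiter: everything before it is the message.
        Some(0) => String::from_utf8(buf)
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e)),
        // Hit EOF or the size limit without seeing a delimiter.
        _ => Err(std::io::Error::new(
            std::io::ErrorKind::InvalidData,
            "no delimiter before EOF or the size limit",
        )),
    }
}
```

A caller would wrap the TcpStream in a BufReader once and call read_message in a loop, one call per framed message.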

kyledj commented Oct 22, 2012

I'm getting similar results (read blocking indefinitely, even with a short timeout) on 082d3d. This is also on OS X (10.7.4).

fmartini commented Dec 3, 2012

The same issue also occurs on Windows 7 with Rust 0.4.

brson (Contributor) commented Dec 3, 2012

I should have responded to this before. Preemptive apology: this interface is not so good.

The problem here may be in accepting and handling the connection in the on_connect callback. That callback runs directly on the I/O loop, so doing long running operations in that callback will block all I/O entirely. The way it's intended to be used is to delegate the accepting and handling of the connection to another task, but there are some wrinkles.

You can't just call spawn from the on_connect callback because of #3760. The on_connect callback runs on the iotask's scheduler, and spawning another task there will cause one or both of them to block just the same as if you didn't spawn at all.

Additionally, a connection has to be 'accepted' before the on_connect callback returns, but calling accept returns a TcpSomething that is non-sendable, so you can't call 'accept' in the callback.

What you have to do is set up another task to handle the acceptance of connections.

Here's an example of a working TCP server and client, more readable than the TCP test cases: brson@fcfdd49#L0R789
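
As a rough modern analogue of that pattern, handing each new connection off to a separate task instead of servicing it on the I/O loop looks like the sketch below; std::net, std::thread, and an mpsc channel stand in for net_tcp, tasks, and pipes, and the port and buffer size are arbitrary.

```rust
use std::io::{Read, Write};
use std::net::{TcpListener, TcpStream};
use std::sync::mpsc;
use std::thread;

fn main() -> std::io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:8089")?;
    let (conn_tx, conn_rx) = mpsc::channel::<TcpStream>();

    // Dedicated task that owns connection handling, so the accept loop
    // (the rough analogue of the on_connect callback) never does blocking I/O itself.
    thread::spawn(move || {
        for mut stream in conn_rx {
            // One task per connection, echoing bytes back until EOF.
            thread::spawn(move || {
                let mut buf = [0u8; 1024];
                while let Ok(n) = stream.read(&mut buf) {
                    if n == 0 {
                        break;
                    }
                    if stream.write_all(&buf[..n]).is_err() {
                        break;
                    }
                }
            });
        }
    });

    for stream in listener.incoming() {
        // Hand the connection off immediately instead of servicing it in the accept loop.
        conn_tx.send(stream?).expect("handler task has exited");
    }
    Ok(())
}
```

The point is only the structure: the accept path never reads or writes, it just forwards the connection to another task.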

brson (Contributor) commented Jan 29, 2013

pnkfelix (Member) commented

I cannot tell if brson has left this issue open with the intent of using it to motivate development of a nicer interface. So I will not close this issue, but I will de-milestone it.

pnkfelix (Member) commented

Not critical for 0.6; de-milestoning

AndresOsinski commented

I'm still having problems with this, using a sample similar to issue #4296.

My intention is to implement a simple TCP server to start implementing some higher-level protocols like FastCGI for Rust, so any efforts to move this forward would be highly appreciated :-)

pnkfelix (Member) commented

@brson, the link you provided, brson/rust@fcfdd49#L0R789, is not working for me. (I was going to ask AndresOsinski if he had already read that material and was still having problems, but since the link isn't working, the question seems silly.) Can you provide a fresh and perhaps more stable link?

teslaNova commented

I'm having the same issue: read times out, but traffic captured with Wireshark shows that the data was received before the timeout.

Anyway, it seems to work with read_start()/read_stop().

machee commented Jul 7, 2013

I assume the updated example @brson provided refers to this test:
https://github.com/mozilla/rust/blob/f19fb2459f4fc695225f996692a6a6b30b801ee9/src/libextra/flatpipes.rs#L764

I removed the flatpipes-specific code and some other code that seemed unneeded, to make as basic an example as I could. The Receiver task is needed because if that code is in the on_connect callback, it blocks.
https://gist.github.com/machee/5943794

I share @mneumann's concern that the idea behind libuv is to use an event loop and not threads. Two tasks/threads might be acceptable, but the only way I could get an echo server to work with multiple connections was spawning a task for each connection.

Seems to me that read/read_start should not block, but should take a callback. It's not clear if this is the intended design or if it's just that more work is needed.
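
A purely hypothetical sketch of what such a callback-taking read could look like; neither ReadEvent nor this read_start exists in std::net_tcp or any Rust runtime, they are made up here only to illustrate the suggestion.

```rust
// Hypothetical event type delivered by the I/O loop to the read callback.
enum ReadEvent {
    Data(Vec<u8>), // one chunk of bytes delivered by the loop
    Eof,           // peer closed the connection
    Err(String),   // read error reported by the loop
}

// Instead of parking the calling task until data or a timeout arrives,
// the loop would invoke `on_read` once per event.
fn read_start<F>(mut on_read: F)
where
    F: FnMut(ReadEvent),
{
    // Stand-in for the event loop driving the callback.
    on_read(ReadEvent::Data(b"dupe: hey".to_vec()));
    on_read(ReadEvent::Eof);
}

fn main() {
    read_start(|event| match event {
        ReadEvent::Data(bytes) => println!("read {} bytes", bytes.len()),
        ReadEvent::Eof => println!("connection closed"),
        ReadEvent::Err(e) => println!("read failed: {}", e),
    });
}
```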

emberian (Member) commented

I'm going to close this since net_tcp is gone and newrt is reworking this area heavily.
