tasks hang #3435

Closed
jesse99 opened this issue Sep 9, 2012 · 16 comments
Comments

jesse99 (Contributor) commented Sep 9, 2012

Using multiple tasks can result in some tasks that do not start up. I was unable to come up with a simple repro, but I do have a fairly involved one (see below). The failure pattern is very similar to #2841.

This is with rust from master on Sep 1 on a Mac Pro 2009 with four cores.

jesse99 (Contributor, Author) commented Sep 9, 2012

Here is the test case. Note that the unit test function actually runs the real test ten times (the real test fails about half the time).

// rustc --test sleep.rs && ./sleep
use std;

enum StateMesg
{
    AddListener(~str, comm::Chan<int>),    // str is used to identify the listener
    RemoveListener(~str),
    Shutdown,
}

type StateChan = comm::Chan<StateMesg>;

// Task used to maintain an uptime (in seconds) variable.
fn manage_state() -> StateChan
{
    do task::spawn_listener
    |state_port: comm::Port<StateMesg>|
    {
        let mut time = 0;
        let listeners = std::map::box_str_hash();
        loop
        {
            time += 1;
            info!("time = %?", time);
            libc::funcs::posix88::unistd::sleep(1);
            for listeners.each_value |ch| {comm::send(ch, copy(time))};

            if state_port.peek()
            {
                match state_port.recv()
                {
                    AddListener(key, ch) =>
                    {
                        let added = listeners.insert(@(copy key), ch);
                        assert added;
                    }
                    RemoveListener(key) =>
                    {
                        listeners.remove(@(copy key));
                    }
                    Shutdown =>
                    {
                        break;
                    }
                }
            }
        }
    }
}

enum ControlEvent
{
    CloseEvent,
}

type ControlPort = comm::Port<ControlEvent>;
type ControlChan = comm::Chan<ControlEvent>;

type PushChan = comm::Chan<int>;

// Registers with the manage task and gets notified when uptime changes.
// This is where things go south: we try to start three of these tasks, but
// only two start up (although after running multiple times it seems that
// there are some failures where all of these tasks start).
fn uptime_sse(registrar: StateChan, push: PushChan) -> ControlChan
{
    do task::spawn_listener
    |control_port: ControlPort|
    {
        io::println(fmt!("starting uptime sse stream"));
        let notify_port = comm::Port();
        let notify_chan = comm::Chan(notify_port);

        let key = fmt!("uptime %?", ptr::addr_of(notify_port));
        comm::send(registrar, AddListener(copy key, notify_chan));

        loop
        {
            match comm::select2(notify_port, control_port)
            {
                either::Left(new_time) =>
                {
                    info!("   new_time = %?", new_time);
                    comm::send(push, new_time);
                }
                either::Right(CloseEvent) =>
                {
                    info!("shutting down uptime sse stream");
                    comm::send(registrar, RemoveListener(key));
                    break;
                }
            }
        }
    }
}

// These are tasks that just sit around doing nothing. (I was not able to get a
// failure without these).
fn blocked_task() -> StateChan
{
    do task::spawn_listener
    |state_port: comm::Port<StateMesg>|
    {
        info!("starting blocked");
        loop
        {
            match state_port.recv()
            {
                Shutdown =>
                {
                    break;
                }
                _ =>
                {
                }
            }
        }
    }
}

// Number of listener and blocked tasks to start up. It may be possible to reduce these
// values and still get a failure.
const num_listeners: uint = 3;
const num_blocked: uint = 12;

fn run_test() -> bool
{
    // Startup the task to manage the uptime variable.
    let registrar = manage_state();

    // Startup some tasks that block on recv until the very end of the test.
    // This corresponds to worker tasks in my app.
    info!("starting %? blocked tasks", num_blocked);
    let mut blocked = ~[];
    for num_blocked.times
    {
        let control = blocked_task();
        vec::push(blocked, control);
    }

// Tasks that listen for changes in uptime. In my real (test) app it failed with
// one of these, and it uses a server-sent event to push the new uptime to a web
// page.
    io::println(fmt!("starting %? uptime tasks", num_listeners));
    let mut listeners = ~[];
    for num_listeners.times
    {
        let push_port = comm::Port();
        let push_chan = comm::Chan(push_port);

        let control = uptime_sse(registrar, push_chan);
        vec::push(listeners, (push_port, control));
    }

    // Give the listeners plenty of time to get at least one update.
    // Note that this fails in the same way even if we sleep a lot longer.
    libc::funcs::posix88::unistd::sleep((5*num_listeners + 3) as libc::types::os::arch::c95::c_uint);

    // Print the uptimes received by each listener task.
    let mut times = ~[];
    for listeners.eachi
    |i, comms|
    {
        let (push_port, _control_chan) = comms;

        let mut last_time = 0;
        while (push_port.peek())
        {
            let time = push_port.recv();
            info!("   %? received = %?", i, time);
            if time > last_time
            {
                last_time = time;
            }
        }
        vec::push(times, last_time);
    }

    // Close down the tasks.
    for blocked.each
    |state_chan|
    {
        comm::send(state_chan, Shutdown)
    }

    for listeners.eachi
    |i, comms|
    {
        let (_push_port, control_chan) = comms;
        io::println(fmt!("%? time = %?", i, times[i]));
        comm::send(control_chan, CloseEvent)
    }

    comm::send(registrar, Shutdown);

    // Each listener should have received at least one update.
    do times.all |t| {t > 0}
}

// Only use a single unit test function to avoid interactions with running multiple unit tests
// in parallel (which could be an issue if this is dropped into the rust test suite).
#[test]
fn tester()
{
    // Ran this three times with the following results:
    // 5 out of 10 failed
    // 4 out of 10 failed
    // 7 out of 10 failed
    let num_tests = 10;
    let mut num_failures = 0;

    for num_tests.timesi
    |i|
    {
        io::println(fmt!("\nrunning %? of %?", i+1, num_tests));
        if !run_test()
        {
            num_failures += 1;
        }
    }

    if num_failures > 0
    {
        io::println(fmt!("%? tests out of %? failed", num_failures, num_tests));
        assert false;
    }
}

jesse99 (Contributor, Author) commented Sep 9, 2012

Fwiw I was able to run the test case from #2841 five times with no failures.

jesse99 (Contributor, Author) commented Sep 9, 2012

Got the above to work by changing uptime_sse to use do spawn_threaded_listener(2), where spawn_threaded_listener is defined as:

// Like spawn_listener except the new task (and whatever tasks it spawns) are distributed
// among a fixed number of OS threads.
fn spawn_threaded_listener<A:send>(num_threads: uint, +block: fn~ (comm::Port<A>)) -> comm::Chan<A>
{
    let channel_port: comm::Port<comm::Chan<A>> = comm::Port();
    let channel_channel = comm::Chan(channel_port);

    do task::spawn_sched(task::ManualThreads(num_threads))
    {
        let task_port: comm::Port<A> = comm::Port();
        let task_channel = comm::Chan(task_port);
        comm::send(channel_channel, task_channel);

        block(task_port);
    };

    comm::recv(channel_port)
}

This is the same sort of workaround I had to use with #2841.

brson (Contributor) commented Sep 9, 2012

Thanks for the good report. I haven't been able to reproduce the deadlock yet but note that

    // Give the listeners plenty of time to get at least one update.
    // Note that this fails in the same way even if we sleep a lot longer.
    libc::funcs::posix88::unistd::sleep((5*num_listeners + 3) as libc::types::os::arch::c95::c_uint);

Probably doesn't behave like you want. This won't necessarily give the listeners an 'opportunity to get at least one update': sleep will block a scheduler thread, preventing any other tasks scheduled on that thread from running, so if the tasks needed to create and receive that update are on the blocked thread, they won't make the expected progress. (This is partly a side-effect of not having a work-stealing implementation yet, but even on e.g. a 4-core machine, having 4 tasks calling libc sleep in parallel will stop all progress.)

Is the sleeping critical to the functioning of the test?

You might want to experiment with this non-blocking sleep:

fn sleep(s: uint) {
    let iotask = std::uv::global_loop::get();
    std::timer::sleep(iotask, s * 1000);
}

brson (Contributor) commented Sep 9, 2012

I suggest always using the above sleep. Calling libc sleep, even if it works, is going to be bad for throughput.

jesse99 (Contributor, Author) commented Sep 9, 2012

Only two tasks call sleep: the manage_state task and the unit test task that calls the tester function. The test app uses the first sleep; the second sleep is an artifact of trying to repro the failure in a unit test.

jesse99 (Contributor, Author) commented Sep 9, 2012

On my machine the test did pass when I used brson's sleep function. The number of updates each task received was also way more consistent with the uv sleep.

jesse99 (Contributor, Author) commented Sep 9, 2012

But my test app still hangs when I try to use uv sleep (nearly certain that the app and all the user libraries it uses just have the one sleep call).

jesse99 (Contributor, Author) commented Sep 10, 2012

Thought about this some more and I better understand your point about the badness of libc::sleep. For example, one failure mode would be if both the unit test task and the manage_state task wound up on the same thread. Then the unit test task could do the big sleep, potentially blocking the manage_state task, and when the unit test finally stops sleeping the manage_state task would not have nearly enough time to finish its work.

This is likely the same problem with my test app: it's a web server with two tasks listening for incoming connections (on different interfaces) and one or more tasks handling requests for a connection. It uses the old rust-socket library, which doesn't do anything fancy with scheduling, and two of those tasks are pretty much always blocked, which could easily hose other tasks.

brson (Contributor) commented Sep 10, 2012

That's right. Any task using rust-socket should probably have its own single-threaded scheduler.

bblum (Contributor) commented Sep 11, 2012

Is the hang nondeterministic? Can you add print statements to see where they are hanging? Are you convinced that the test code shouldn't hang?

jesse99 (Contributor, Author) commented Sep 11, 2012

I think the test should hang, on some systems anyway. I'd like to see some improvements so that other people can avoid the same problems I ran into, but those are ticketed separately, so I think this issue should be closed.

For what it's worth, I just finished changing over my socket code, and the one place I used libc::sleep, to use SingleThreaded, and then ManualThreads(2) or (4) to spawn subtasks. Still not sure what the default is (ThreadsPerCore is the logical choice, but it and ThreadPerTask are not implemented), but my apps are working great now.

bblum (Contributor) commented Sep 11, 2012

Huh, the documentation doesn't say what the default is. It's ThreadsPerCore.

jesse99 (Contributor, Author) commented Sep 12, 2012

Hmm, when I try to use ThreadPerCore with spawn_sched I get task failed at 'thread_per_core scheduling mode unimplemented'.

brson (Contributor) commented Sep 12, 2012

ThreadPerCore is not implemented in the task API, but it is the default mode for the default scheduler. Implementing it, though, is just a matter of wiring in some existing runtime code.

brson (Contributor) commented Sep 12, 2012

I opened #3464

@jesse99 jesse99 closed this as completed Sep 25, 2012
RalfJung pushed a commit to RalfJung/rust that referenced this issue Mar 31, 2024