Streams can prevent any other futures from being scheduled again #869

Closed
sdroege opened this issue Mar 13, 2018 · 8 comments

Comments

@sdroege
Contributor

sdroege commented Mar 13, 2018

This is copied over from tokio-rs/tokio#207
Please see code below for a contrived example of the problem, and a workaround.

The problem here is that a single stream that continues to produce items will block a whole executor thread forever, instead of allowing other scheduled futures to run after each item is produced, effectively causing starvation. This happens because Stream::for_each is basically an infinite loop for as long as items can be produced; breaking out of the loop with NotReady after each item (though it's unclear how best to wake the future up again afterwards) would solve this problem.
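
For reference, futures 0.1's ForEach::poll boils down to a loop like the following (a simplified sketch; the real combinator also lets the closure return an IntoFuture and drives that future between items):

```rust
#[macro_use]
extern crate futures;

use futures::{Async, Poll, Stream};

// Simplified sketch of futures 0.1's ForEach combinator.
struct ForEach<S, F> {
    stream: S,
    f: F,
}

impl<S, F> ForEach<S, F>
where
    S: Stream,
    F: FnMut(S::Item) -> Result<(), S::Error>,
{
    // As long as the stream keeps returning Ready(Some(..)), this loop
    // never breaks, so the task never yields back to the executor.
    fn poll(&mut self) -> Poll<(), S::Error> {
        loop {
            match try_ready!(self.stream.poll()) {
                Some(item) => (self.f)(item)?,
                None => return Ok(Async::Ready(())),
            }
        }
    }
}
```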

In practice this can cause e.g. a couple of fast TCP connections that are handled with for_each to starve slower TCP connections, or, in my specific use case of a Stream around a UdpSocket, it allows any one of the sockets to completely occupy a thread (as long as its packets arrive fast enough) and prevents any other sockets with a slower packet rate from ever being scheduled again. Note that fast/slow here is relative, and depends on the processing time of each stream item and on how fast new items arrive.

Is this expected behaviour, and is one expected to implement a custom "scheduler" around e.g. Stream::into_future to do round-robin scheduling of all "equal" streams?

extern crate futures;
extern crate tokio;
extern crate tokio_reactor;

use futures::stream;
use futures::{Future, Stream};
use tokio::executor::thread_pool;
use tokio::reactor;

fn main() {
    let reactor = reactor::Reactor::new().unwrap().background().unwrap();

    let handle = reactor.handle().clone();

    let mut pool_builder = thread_pool::Builder::new();
    pool_builder.around_worker(move |w, enter| {
        ::tokio_reactor::with_default(&handle, enter, |_| {
            w.run();
        });
    });

    // Important to have 1 thread here, otherwise
    // both streams would just block two threads
    // forever.
    pool_builder.pool_size(1);
    let pool = pool_builder.build();

    // Spawn two infinite streams. With a single worker thread, the task
    // polling the first stream never yields, so the second never runs.
    pool.spawn(stream::repeat(1).for_each(|n| {
        println!("{}", n);

        Ok(())
    }));

    pool.spawn(stream::repeat(2).for_each(|n| {
        println!("{}", n);

        Ok(())
    }));

    pool.shutdown_on_idle()
        .and_then(|_| reactor.shutdown_on_idle())
        .wait()
        .unwrap();
}

A workaround for this is the following; see the YieldOnce future below. While that works around the problem, the default behaviour here seems like a potential footgun that people will only notice once it's too late.

extern crate futures;
extern crate tokio;
extern crate tokio_reactor;

use futures::stream;
use futures::task;
use futures::{Future, Stream, Poll, Async};
use tokio::executor::thread_pool;
use tokio::reactor;

/// A future that notifies the current task and returns NotReady on its
/// first poll, then completes on the second poll: a one-shot yield point.
struct YieldOnce(Option<()>);

impl YieldOnce {
    fn new() -> Self {
        YieldOnce(None)
    }
}

impl Future for YieldOnce {
    type Item = ();
    type Error = ();

    fn poll(&mut self) -> Poll<(), ()> {
        if let Some(_) = self.0.take() {
            Ok(Async::Ready(()))
        } else {
            self.0 = Some(());
            task::current().notify();
            Ok(Async::NotReady)
        }
    }
}

fn main() {
    let reactor = reactor::Reactor::new().unwrap().background().unwrap();

    let handle = reactor.handle().clone();

    let mut pool_builder = thread_pool::Builder::new();
    pool_builder.around_worker(move |w, enter| {
        ::tokio_reactor::with_default(&handle, enter, |_| {
            w.run();
        });
    });

    // Important to have 1 thread here, otherwise
    // both streams would just block two threads
    // forever.
    pool_builder.pool_size(1);
    let pool = pool_builder.build();

    // With YieldOnce in place, the two streams' output now interleaves.
    pool.spawn(stream::repeat(1).for_each(|n| {
        println!("{}", n);

        task::current().notify();

        YieldOnce::new()
    }));

    pool.spawn(stream::repeat(2).for_each(|n| {
        println!("{}", n);

        task::current().notify();

        YieldOnce::new()
    }));

    pool.shutdown_on_idle()
        .and_then(|_| reactor.shutdown_on_idle())
        .wait()
        .unwrap();
}

@carllerche said in the tokio ticket "IMO, the combinators should handle yielding internally."

@stbuehler
Contributor

Yielding after every iteration is very likely to kill performance... :(

Maybe make it configurable (whatever the default is going to be):

impl<S, U, F> ForEach<S, U, F> {
    pub fn yield_after(mut self, limit: Option<usize>) -> Self {
        self.set_yield_after(limit);
        self
    }

    /// `None` never yields (the current behaviour); `Some(n)` yields
    /// back to the executor after `n` consecutive items.
    pub fn set_yield_after(&mut self, limit: Option<usize>) {
        // ...
    }
}

@carllerche
Member

IMO, this should probably be tracked at the task level. Especially with async / await coming, I think it will be important to have some sort of built-in strategy to make it easier for tasks to yield back to the executor.

This would be similar to what Erlang / Go do by injecting yield points into generated code.

@carllerche
Member

Come to think of it, the answer might just be to deal with this entirely at the async / await level.

/cc @withoutboats

@sdroege
Contributor Author

sdroege commented Mar 18, 2018

Come to think of it, the answer might just be to deal with this entirely at the async / await level.

The problem with that is that it would still be problematic if you use the futures combinators directly. Also, this specific case can't really be handled without changes to futures; the main problem right now is the implementation of the for_each combinator (and possibly others) here.

@stbuehler
Contributor

For better integration, the Async enum could be extended with a Yield variant to signal that the future should be polled again later without being woken explicitly. That might be better for performance too.
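
Something along these lines, perhaps (a hypothetical extension for illustration only; Yield is not part of the actual futures API):

```rust
// Hypothetical extension of futures 0.1's Async enum.
pub enum Async<T> {
    /// The value is ready.
    Ready(T),
    /// Not ready; the task is notified once progress is possible.
    NotReady,
    /// Progress was made, but the future yields: the executor should
    /// poll it again soon without waiting for an explicit notification.
    Yield,
}
```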

@Nemo157
Member

Nemo157 commented Mar 19, 2018

Updated example for futures 0.2:

extern crate futures;

use futures::prelude::*;
use futures::stream;
use futures::executor::LocalPool;

fn main() {
    let mut pool = LocalPool::new();
    let mut executor = pool.executor();

    executor.spawn_local(stream::repeat(1).for_each(|n| {
        println!("{}", n);
        Ok(())
    }).map(|_| ())).unwrap();

    executor.spawn_local(stream::repeat(2).for_each(|n| {
        println!("{}", n);
        Ok(())
    }).map(|_| ())).unwrap();

    pool.run(&mut executor);
}

Rather than implementing this directly on ForEach, it seems it could just be another combinator. I've thrown together a quick implementation to show it working.

EDIT:
As a combinator, futures-await could extend its #[async] for stream { ... } expansion to implicitly call StreamExt::yield_every(&mut stream, 10) to ensure async streams don't block other tasks.
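
The quick implementation mentioned above isn't reproduced here, but such a combinator might look roughly like this in futures 0.1 terms (an illustrative sketch with hypothetical names, not the actual implementation):

```rust
extern crate futures;

use futures::task;
use futures::{Async, Poll, Stream};

// Illustrative sketch of a yield_every-style combinator.
struct YieldEvery<S> {
    stream: S,
    limit: usize,
    ready_polls: usize,
}

impl<S: Stream> Stream for YieldEvery<S> {
    type Item = S::Item;
    type Error = S::Error;

    fn poll(&mut self) -> Poll<Option<S::Item>, S::Error> {
        if self.ready_polls >= self.limit {
            // Budget spent: re-notify the task so it is queued again,
            // then yield this poll back to the executor.
            self.ready_polls = 0;
            task::current().notify();
            return Ok(Async::NotReady);
        }
        let res = self.stream.poll();
        match res {
            Ok(Async::Ready(Some(_))) => self.ready_polls += 1,
            // End of stream, a genuine NotReady, or an error resets
            // the budget.
            _ => self.ready_polls = 0,
        }
        res
    }
}
```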

@cramertj
Member

Some sort of fairness-offering combinator seems handy, and I'd be happy to accept a PR for it!

@mzabaluev
Contributor

mzabaluev commented Nov 6, 2019

There seems to be a systemic problem: many long-running, event-driven processing futures (or streams, considered through their into_future transformation) are most easily programmed with a poll loop that does not break as long as some event source has polled ready and internal work has been done. If such a future is aggregated with other futures in the same task and its poll loop is saturated with work, the other futures in the task will be starved.

I can see these solutions:

  • Teach developers some discipline to avoid polling in loops that can be saturated. Provide utilities to simplify cooperative poll programming, e.g. to force yielding after a maximum number of iterations (a sketch of such a utility follows this list).
  • Somehow force occasional Pending returns from the bottom primitives at the executor level. I doubt this is feasible, though.
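
As an illustration of the first option, here is a minimal yield utility in today's std::future terms, along the lines of the earlier YieldOnce (a sketch, not taken from any particular library):

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

/// Minimal sketch of a cooperative yield point: returns Pending once,
/// waking the task immediately so it is re-queued behind other tasks.
struct YieldNow {
    yielded: bool,
}

impl Future for YieldNow {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.yielded {
            Poll::Ready(())
        } else {
            self.yielded = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}
```

A saturated poll loop could then await YieldNow { yielded: false } every N iterations to give other tasks a chance to run.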

jonhoo added a commit to tokio-rs/tokio that referenced this issue Mar 16, 2020
A single call to `poll` on a top-level task may potentially do a lot of
work before it returns `Poll::Pending`. If a task runs for a long period
of time without yielding back to the executor, it can starve other tasks
waiting on that executor to execute them, or drive underlying resources.
See for example rust-lang/futures-rs#2047, rust-lang/futures-rs#1957,
and rust-lang/futures-rs#869. Since Rust does not have a runtime, it is
difficult to forcibly preempt a long-running task.

Consider a future like this one:

```rust
use tokio::stream::StreamExt;
async fn drop_all<I: Stream>(input: I) {
    while let Some(_) = input.next().await {}
}
```

It may look harmless, but consider what happens under heavy load if the
input stream is _always_ ready. If we spawn `drop_all`, the task will
never yield, and will starve other tasks and resources on the same
executor.

This patch adds a `coop` module that provides an opt-in mechanism for
futures to cooperate with the executor to avoid starvation. This
alleviates the problem above:

```
use tokio::stream::StreamExt;
async fn drop_all<I: Stream>(input: I) {
    while let Some(_) = input.next().await {
        tokio::coop::proceed().await;
    }
}
```

The call to `proceed` will coordinate with the executor to make sure
that every so often control is yielded back to the executor so it can
run other tasks.

The implementation uses a thread-local counter that simply counts how
many "cooperation points" we have passed since the task was first
polled. Once the "budget" has been spent, any subsequent points will
return `Poll::Pending`, eventually making the top-level task yield. When
it finally does yield, the executor resets the budget before
running the next task.

The budget per task poll is currently hard-coded to 128. Eventually, we
may want to make it dynamic as more cooperation points are added. The
number 128 was chosen more or less arbitrarily to balance the cost of
yielding unnecessarily against the time an executor may be "held up".

At the moment, all the tokio leaf futures ("resources") call into coop,
but external futures have no way of doing so. We probably want to
continue limiting coop points to leaf futures in the future, but may
want to also enable third-party leaf futures to cooperate to benefit the
ecosystem as a whole. This is reflected in the methods marked as `pub`
in `mod coop` (even though the module is only `pub(crate)`). We will
likely also eventually want to expose `coop::limit`, which enables
sub-executors and manual `impl Future` blocks to avoid one sub-task
spending all of their poll budget.

Benchmarks (see #2160) suggest that the overhead of `coop`
is marginal.
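
The budget mechanism described above might look roughly like this (an illustrative sketch, not tokio's actual coop internals; reset_budget and the bool-returning proceed are simplifications of the real API):

```rust
use std::cell::Cell;

thread_local! {
    // Cooperation points remaining for the task currently being polled.
    static BUDGET: Cell<usize> = Cell::new(0);
}

// Called by the executor immediately before polling a task.
fn reset_budget() {
    BUDGET.with(|b| b.set(128));
}

// Called at each cooperation point. Returns false once the budget is
// spent; the caller should then arrange to be woken and return
// Poll::Pending so that the top-level task yields.
fn proceed() -> bool {
    BUDGET.with(|b| {
        let remaining = b.get();
        if remaining == 0 {
            false
        } else {
            b.set(remaining - 1);
            true
        }
    })
}
```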