Footgun lurking in FuturesUnordered and other concurrency-enabling streams #2387

Open
farnz opened this issue Apr 7, 2021 · 16 comments

@farnz

farnz commented Apr 7, 2021

We've found a nasty footgun when we use FuturesUnordered (or buffered etc) to get concurrency from a set of futures.

Because FuturesUnordered only polls its contents when it is itself polled, futures waiting in its queue can go a long time between polls, even though no individual future spends a long time in poll(). This causes issues in two cases:

  1. When interfacing with an external system via the network: if you take a result from the stream with while let Some(res) = stream.next().await and then spend significant wall-clock time inside the loop (even if very little CPU time is involved because you're awaiting another network service), you can hit the external system's timeouts and fail unexpectedly.
  2. When using an async-friendly semaphore (like the one Tokio provides), you can deadlock yourself: the futures waiting in the FuturesUnordered hold all the semaphore permits, while the item being processed in a .for_each() after buffer_unordered() needs a permit to make progress.

https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=f58e77ba077b40eba40636a4e32b5710 shows the effect. Naïvely, you'd expect all the 10 to 20ms sleep futures to complete in under 40ms, and the 100ms sleep futures to take 100ms to 200ms. However, you can see that the sleep threads complete on the expected timescale and send the wakeup to the future that spawned them, but some of the short async sleep_for futures take over 100ms to complete: although the thread signals them to wake up, the loop is .awaiting a long sleep future and does not get round to polling the stream again for some time.

We've found this in practice with things where the loop body is "nice" in the sense that it doesn't run for very long inside its poll function, but the total time spent in the loop body is large. The futures being polled by FuturesUnordered do:

```rust
async fn do_select<T>(database: &Database, query: Query) -> Result<Vec<T>> {
    let conn = database.get_conn().await?;
    conn.select_query(query).await
}
```

and the main work looks like:

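```rust
async fn do_work(database: &Database) -> Result<()> {
    let work = do_select(database, FIND_WORK_QUERY).await?;
    stream::iter(work)
        // Each do_select future is created here but only polled by `buffered`.
        .map(|item| do_select(database, work_from_item(item)))
        .buffered(5)
        // `for_each` runs each do_giant_work to completion before polling
        // the buffered stream again.
        .for_each(|work_item| do_giant_work(work_item))
        .await;
    Ok(())
}
```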

do_giant_work can take 20 seconds wall clock time for big work items. It's possible for get_conn to open the connection (which has a 10 second idle timeout) for each Future in the buffered set, send the first handshake packet, and then return Poll::Pending as it waits for the reply. When the first of the 5 in the buffered set returns Poll::Ready(item), the code then runs do_giant_work which takes 20 seconds. While do_giant_work is in control, nothing re-polls the buffered set of Futures, and so the idle timeout kicks in server-side, and all of the 4 open connections get dropped because we've opened a connection and then not completed the handshake.

We can mitigate the problem by using spawn_with_handle to ensure that the do_select work happens whenever the do_giant_work Future awaits something, but this behaviour has surprised my team more than once (despite enough experience to diagnose this after the fact).

I'm not sure that a perfect technical solution is possible; the issue is that FuturesUnordered is a sub-executor driven by the main executor, and if not polled, it can't poll its set of pending futures. Meanwhile, the external code is under no obligation to poll the FuturesUnordered in a timely fashion. Spawning the futures before putting them in the sub-executor works because the main executor then drives them, and the sub-executor is merely picking up final results, but futures have to be 'static lifetime to be spawned.

I see two routes from here to a better place:

  1. Document the issue clearly, so that less experienced users don't get surprised by this behaviour. I suspect that there are uses of the select! and select_biased! macros that hit a similar problem.
  2. Somehow change FuturesUnordered et al so that the futures are polled by the main executor directly, even when the sub-executor is not being polled. I have no idea how to do this, though.
@pcwalton

pcwalton commented Apr 7, 2021

It seems to me that the right solution is to spawn the futures, and the problem involves the 'static lifetime requirement. This is a requirement because there is no guarantee that the parent future won't be dropped before the child futures are. That is, it's the dreaded completion futures issue yet again.

@pcwalton

pcwalton commented Apr 7, 2021

I'm told that https://tokio.rs/blog/2020-04-preemption is relevant.

@farnz
Author

farnz commented Apr 7, 2021

The Tokio preemption thing is related, but does not fix this issue. Preemption in Tokio is designed to ensure that a Task returns to the executor every so often (and thus can be rescheduled so that another Task can run); it does not help with starvation inside a Task, since there's nothing that can return to polling the other futures in a Task if the task is only interested in polling a different future.

In this case, even if every future in the program takes part in Tokio pre-emption, you can still starve the futures in a sub-scheduler like FuturesUnordered; the issue is that the user code based on this library and on Tokio can accidentally introduce starvation inside a Task, and in a very non-obvious fashion.

I am strongly inclined towards the docs fix for this (making sure that the non-obvious problem is called out), because I can't see any way to keep the parent future moving that doesn't fall foul of the completion futures issue. But note that if there was a way to have a FuturesUnordered keep polling all its child futures even after returning from poll that didn't involve 'static (hence ruling out spawning), that would work, too.

@pcwalton

pcwalton commented Apr 7, 2021

Is there a reason why you can't make the futures that you need to spawn 'static? Carl and I have discussed introducing a JoinSet to Tokio which would replace FuturesUnordered for this type of use case. But it would require 'static too (though I have ideas for relaxing this requirement somewhat in the future).

@nikomatsakis
Contributor

I've not dug into the technical details but I'll note that this would make a great status quo issue, so I opened rust-lang/wg-async#131

@farnz
Author

farnz commented Apr 7, 2021

For now, most of our futures are 'static because we still haven't fully moved off futures-0.1 (yes, I know, we need to finish migration), so spawning is not a problem.

It's more that we keep finding different variations on this surprise in the Mononoke codebase (one with Tokio semaphores, one with a C++ async FFI, one with just Rust code), which implies that it's not completely obvious that this starvation issue can happen.

@jonhoo
Contributor

jonhoo commented Apr 7, 2021

This is also somewhat related to #2053.

@withoutboats

withoutboats commented Mar 23, 2023

(NOT A CONTRIBUTION)

I know this is an old issue but I think there's another perspective on this than what I've seen on the thread so far: the problem is not with FuturesUnordered, the problem is with for_each:

for_each's documentation reads:

The closure provided will be called for each item this stream produces, yielding a future. That future will then be executed to completion before moving on to the next item.

The use of the buffered abstraction perfectly makes these do_select operations concurrent with one another, there's really no problem there from my perspective. The problem is that they are then not concurrent with do_giant_work, because you went and used a non-concurrent combinator to chain do_giant_work.

This is why for_each_concurrent exists, which would have made the entire thing concurrent and solved the problem without spawning.

There's real issues here about how to make this less foot-gunny, but it seems very odd to me to jump to the idea that these should have been spawned, when FuturesUnordered could work fine if you had just used it through the entire process instead of re-serializing your future at the part where it is actually doing expensive work.

@farnz
Author

farnz commented Mar 24, 2023

@withoutboats How would I use for_each_concurrent to make the do_selects happen concurrently until I had a filled buffer, but not run multiple copies of do_giant_work concurrently? do_giant_work is quick to complete relative to the latency of do_select, so I need the buffering to hide the latency of do_select, but it also consumes so much memory that if I run two copies concurrently, the OOM killer will take down my application.

This wasn't an unusual pattern in the system I reduced this from (I no longer work there, so can't get you real examples); we had something equivalent to do_select which was high latency, low memory consumption, but high throughput, and something equivalent to do_giant_work which brought the system to within 1% of running out of memory completely and being OOM-killed, but was also reasonably quick to complete most of the time (except when it isn't).

@withoutboats

withoutboats commented Mar 24, 2023

(NOT A CONTRIBUTION)

I see. That is a more complicated pattern than I initially thought. But the solution is to buffer the results (rather than the futures) and then concurrently run a single do_giant_work while filling that buffer up to the limit. I'm not sure off the top of my head how the combinators on StreamExt give affordances for that, but I still don't think the solution is necessarily spawning new tasks. And I think the open question is how to adjust the APIs for streams to give better affordances for expressing exactly the concurrency you want, which I would agree they don't do super well right now.

Basically, you only need to spawn new tasks if you're trying to achieve parallelism, not concurrency. If do_giant_work took a lot of CPU time and not just clock time you should do something like spawn_blocking, in which case the data it takes would need to be 'static. But if all you need is concurrency, that can be done within a single task.

EDIT: A channel is probably the right primitive for buffering the results, then you need to select between a FuturesUnordered of do_select and pushing the output into the channel, and a future that pulls from the channel and runs do_giant_work. Probably this is easier to implement with a structured concurrency API than with select.

@farnz
Author

farnz commented Mar 24, 2023

The pattern of concurrency I wanted was to have up to 5 do_select calls plus one do_giant_work call running concurrently.

do_giant_work would spawn off more threads if needed, including spawn_blocking for the compression/decompression work that took significant CPU time, to ensure that we spent no more than 100ms in a single call to poll.

The very root of the problem is that Buffered acts like the channel you suggest would - it limits the number of futures in flight, and it stores results as futures complete, just as you'd get from pushing values into a channel. It has advantages over FuturesUnordered in that it limits the number of futures it will poll concurrently (to the number needed to fill the buffer), and it retains ordering. However, the output side of Buffered is a Stream in its own right, and there's no mechanism in the Stream API to let things chained after Buffered tell it to continue filling its buffer but not to return an item.

If we could design a way to say something like poll_no_result, which defaults to not doing anything, but which allows channel-like Streams such as Buffered and BufferUnordered to finish filling their internal buffers without returning anything, then it would be possible to redesign the Stream combinators so that if they would return Poll::Pending from poll, they will first call poll_no_result on their internal buffer.

This then functions like the select mechanism you described, and would keep things moving in this case.

@withoutboats

withoutboats commented Mar 24, 2023

(NOT A CONTRIBUTION)

The very root of the problem is that Buffered acts like the channel you suggest would - it limits the number of futures in flight, and it stores results as futures complete, just as you'd get from pushing values into a channel.

No, this is not right. Buffered doesn't store the results of the futures as they complete. That's exactly the problem this issue is describing, because that's why you can't keep driving those futures concurrently with do_giant_work - you have nowhere to store the results if they complete.

If you want that, you need an additional queue of results somewhere, that's what the channel would enable. But Buffered has no internal buffer of results - it's just a buffer of the futures themselves. Probably it is poorly named for this reason.

@farnz
Author

farnz commented Mar 25, 2023

No, this is not right. Buffered doesn't store the results of the futures as they complete. That's exactly the problem this issue is describing, because that's why you can't keep driving those futures concurrently with do_giant_work - you have nowhere to store the results if they complete.

Buffered does store the results of futures that have completed, in

```rust
queued_outputs: BinaryHeap<OrderWrapper<T::Output>>,
```

It puts them in a BinaryHeap to allow them to be reordered if they complete out of order. If we were able to poll it telling it to hang onto its results, it could fill that BinaryHeap with results until it had no futures in flight.

And the thing that makes buffered (and buffer_unordered, which has no channel-like behaviour) footguns is that they look like they "should" implement channel-like behaviour - limits and all. But buffer_unordered doesn't have any channel-like behaviour at all, and buffered doesn't decouple polling (although it does store results).

Perhaps if buffer_unordered gained a results store (making it more channel-like), and then both combinators returned two things you need to poll - an impl Future<Output=()> that drives the contents in the background, plus the Stream that you get today - it'd be clear that you need to add a select_biased to keep the buffer moving while the stream is blocked from making progress.

@withoutboats

withoutboats commented Mar 25, 2023

(NOT A CONTRIBUTION)

Buffered does store the results of futures that have completed

Sorry, you're right, I was thinking of BufferUnordered. But it only fills that buffer as futures complete out of order; once the first future finishes it will return it and not keep it in queue. There's no way to poll it without taking the first result to finish out of the queue. But I think overall, we're saying the same things back and forth to one another:

Perhaps if buffer_unordered gained a results store (making it more channel-like), and then both combinators returned two things you need to poll - an impl Future<Output=()> that drives the contents in the background, plus the Stream that you get today - it'd be clear that you need to add a select_biased to keep the buffer moving while the stream is blocked from making progress.

Because this is exactly what I mean. That's the affordance the API is missing.

I really responded to this issue because I agree that the current stream API is "footgunny" (and so is select for that matter) but I strongly don't think the problem is the lack of parallel scoped tasks - I'm really responding to @pcwalton's comment about this being a completion futures problem more than anything. I think a structured concurrency scoped task API (without parallelism, like moro) might be a good tool for a better API though. Then you could spawn the stream of do_selects and the do_giant_work loop. This would need to be paired with a rethought set of stream combinators, probably.

@ethe
Contributor

ethe commented Jan 10, 2024

Here is an alternative reproduction, simpler and closer to common usage, that makes it easy to see what is happening:

```rust
use std::time::{Duration, Instant};

use async_io::{block_on, Timer};
use async_stream::stream;
use futures::StreamExt;
use futures_lite::future::yield_now;

fn main() {
    let result = block_on(async {
        stream! {
            loop {
                yield async {
                    let start = Instant::now();
                    yield_now().await;  // the future should be woken immediately
                    Instant::now().duration_since(start)  // so this should be very short
                };
            }
        }
        .buffered(5)
        .take(5)
        .then(|d| async move {
            Timer::after(Duration::from_millis(500)).await;
            d
        })
        .collect::<Vec<_>>()
        .await
    });
    dbg!(result);
    // [examples/buffered_stream.rs:35:5] result = [
    //     612.875µs,
    //     501.832917ms,
    //     1.002531209s,
    //     1.503673417s,
    //     2.004864417s,  <---- ???
    // ]
}
```

zawedcvg added a commit to zawedcvg/Silica that referenced this issue Jul 8, 2024
there could be possible improvements by not using join_all, due to the
issue mentioned in rust-lang/futures-rs#2387.
However, I doubt there is much speedup to be extracted, since most of
the bottleneck are the requests.
zawedcvg added a commit to zawedcvg/Silica that referenced this issue Jul 9, 2024
@dylanahsmith

I've run into the same problem: I mistakenly expected a Rust Stream to follow the mental model of a physical stream, where if the stream were blocked upstream, it would leave a gap that would be filled in even after the stream gets blocked downstream.

Fundamentally, it seems the Stream trait is missing a method to poll upstream buffers (e.g. poll_upstream or poll_buffered), so that outer (downstream) structs can call it on their inner (upstream) struct when they aren't ready for another item through poll_next. That call could then propagate all the way upstream if it is implemented by every Stream in the chain.

Introducing a required method on the trait like this would be a breaking change, but it could be first introduced as a method on the trait with a default implementation, although making it required would be needed to actually avoid this being a footgun and make it behave more like its namesake.

After that, it would be useful to document that, without an explicit buffer between steps, a chain of stream combinators doesn't behave like a pipeline. E.g. two steps chained together that each await for 100ms would benefit from a buffer of size 1 between them so that they can operate independently on different items. The ability to poll upstream buffers could allow a convenience library to do this automatically, though a buffer wouldn't be needed between every step: when several quick steps are followed by a slow step, a single buffer just before the slow step would be enough.
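A rough model of that benefit, assuming N items, a first step taking $t_1$ and a second taking $t_2$ per item, no other delays, and (for the second formula) a buffer of at least one item between the steps:

```math
T_{\text{chained}} = N\,(t_1 + t_2)
\qquad
T_{\text{buffered}} = N \max(t_1, t_2) + \min(t_1, t_2)
```

For the two 100ms steps with N = 10, that is 2000ms versus 1100ms: with the buffer, the first step works on item k+1 while the second step is still busy with item k.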


7 participants