
MPSC queue hangs on repeated polling #7108

Open
jshaw-jump opened this issue Jan 17, 2025 · 8 comments · May be fixed by #7164
Labels
A-tokio (Area: The main tokio crate), C-bug (Category: This is a bug), M-coop (Module: tokio/coop), M-sync (Module: tokio/sync)

Comments


jshaw-jump commented Jan 17, 2025

Version

└── tokio v1.43.0
    └── tokio-macros v2.5.0 (proc-macro)

Platform
RHEL 8
x86_64
Intel CascadeLake

Description
The MPSC channel receiver fails to receive messages when it is repeatedly polled. I notice that it receives exactly 128 elements (4 underlying blocks) before hanging.

Here is a reproduction. The select! macro keeps the code short, but the issue also reproduces if select! is replaced by a custom future that repeatedly invokes Future::poll on the Receiver, so select! itself isn't a factor.

use tokio::{select, sync::mpsc};

#[tokio::main]
async fn main() {
    let (sender, mut receiver) = mpsc::channel(2);

    tokio::spawn(async move {
        for n in 0usize.. {
            sender.send(n).await.unwrap();
        }
    });

    loop {
        select! {
            msg = receiver.recv() => println!("Received {}", msg.unwrap()),
            () = std::future::ready(()) => {}
        }
    }
}

The output is as follows. No logging is observed after 128 elements have been received.

Received 0
Received 1
Received 2
... (lines omitted for brevity) ...
Received 127
jshaw-jump added the A-tokio and C-bug labels Jan 17, 2025
Darksonn added the M-sync and M-coop labels Jan 18, 2025
Darksonn (Contributor) commented

This is due to Tokio's coop feature, where Tokio attempts to force tasks to yield by giving each task a budget of operations (128 by default, matching the count observed above) before it must return to the scheduler. Once that budget is exhausted, the channel returns Pending until the task yields to the runtime.
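
For illustration, a sketch of one workaround for the reproduction above (not necessarily the fix the linked PR takes): explicitly yielding once per iteration hands control back to the scheduler, which replenishes the coop budget so recv() can make progress again.

use tokio::{select, sync::mpsc};

#[tokio::main]
async fn main() {
    let (sender, mut receiver) = mpsc::channel(2);

    tokio::spawn(async move {
        for n in 0usize.. {
            sender.send(n).await.unwrap();
        }
    });

    loop {
        select! {
            msg = receiver.recv() => println!("Received {}", msg.unwrap()),
            () = std::future::ready(()) => {}
        }
        // Yield control back to the runtime so the task's coop budget is
        // replenished; without this the busy loop exhausts the budget and
        // recv() keeps returning Pending.
        tokio::task::yield_now().await;
    }
}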

0xAlcibiades commented Feb 6, 2025

@trbritt, @jeffwarlock, and I saw what is perhaps a manifestation of the same issue today in tokio::broadcast, and it was strange and counterintuitive: when a sender fires off a message, receivers sitting on recv().await don't wake up until another message (i.e. more than one) hits the pipeline.

Tokio appears to "take a nap" when there are no external events or internal events above a given threshold on that local queue. Here's a minimal repro:

use tokio::sync::broadcast;

let (tx, mut rx) = broadcast::channel(16);
tokio::spawn(async move {
    loop {
        if let Ok(_msg) = rx.recv().await {
            // Receiver basically hibernates here until more messages come
        }
    }
});
tx.send(some_data).unwrap(); // This alone won't reliably wake the receiver

After some experimentation, the only reliable workaround we've found is selecting against an interval in the same task:

use tokio::time::Duration;

let mut interval = tokio::time::interval(Duration::from_millis(100));
loop {
    tokio::select! {
        result = rx.recv() => {
            if let Ok(_msg) = result {
                // Now we actually get our messages
            }
        }
        _ = interval.tick() => {} // Keep the runtime from dozing off
    }
}

The runtime probably needs to be more aware of local broadcast/channel operations waking receivers, especially when multiple tasks are waiting on the same event source. Failing that, this needs to be documented behavior, i.e. that a message from a sender on a local channel may not wake the awaiting receivers timely or consistently. Frankly, it felt like a bug in practice and turned into a six-hour head scratcher: the behavior was so unexpected that we checked all of our business logic exhaustively before concluding it was a Tokio-related oddity.

Darksonn (Contributor) commented Feb 6, 2025

@0xAlcibiades You have a bug in your code :) It's not Tokio's fault. You should stop having loops that infinitely check the channel, get told that it's closed, and keep going forever, taking up 100% CPU and blocking a runtime thread.

Using if let Ok is almost always wrong. When you get an error from a broadcast channel, the correct response is not to keep looping. You must inspect the error to understand whether it's a closed or lagged error, and respond in the appropriate way.
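
As a concrete illustration of that advice, here is a minimal, self-contained sketch (my own example, not code from this thread) of a broadcast receive loop that matches on broadcast::error::RecvError instead of discarding it:

use tokio::sync::broadcast::{self, error::RecvError};

#[tokio::main]
async fn main() {
    let (tx, mut rx) = broadcast::channel::<u64>(16);

    tokio::spawn(async move {
        for n in 0..10u64 {
            // A send error only means there are currently no receivers; ignore it here.
            let _ = tx.send(n);
        }
        // `tx` is dropped at the end of this task, which closes the channel.
    });

    loop {
        match rx.recv().await {
            Ok(event) => println!("got {event}"),
            // This receiver fell behind and missed `n` messages; note it and keep receiving.
            Err(RecvError::Lagged(n)) => eprintln!("receiver lagged by {n} messages"),
            // Every sender is gone and the buffer is drained; stop looping.
            Err(RecvError::Closed) => break,
        }
    }
}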

0xAlcibiades commented Feb 6, 2025

Hey @Darksonn, I could give another counterexample if it helps. There isn't some error coming off the pipeline (and we aren't actually using if let Ok in the code; I just didn't feel like pasting the error-handling match here), the channel isn't closed, and there is a long-lived sender on the broadcast emitting real-time events every 100ms-12s in our use case. This is for a realtime-ish loop where we do want to wake for a message as soon as one is on the channel and then react within, say, the 100s-of-microseconds range. The receiver in this situation very much intends to await and process messages in the hot loop, so that part is not a bug. There may indeed be a better approach, though, as we are just migrating from a hot loop to Tokio while looking at putting some of these components over gRPC (and indeed, when there is a network event, that does seem to wake the local thread). Regardless, it seems to be the same thing the OP is seeing here, so I figured I'd mention that select with an interval solves it.

let trivial_state_manager_subscriber =
            if let Some(state_manager) = self.state_manager.clone() {
                let mut rx = state_manager.subscribe();
                tokio::task::spawn(async move {
                    loop {
                        match rx.try_recv() {
                            Ok(event) => match event {
                                StateMessage::Ready(block_id, _) => {
                                    info!("SM: Dummy handle found ready event: {block_id:?}");
                                }
                                StateMessage::Indexing(block_id) => {
                                    info!("SM: Dummy handle found indexing event: {block_id:?}");
                                }
                                StateMessage::CheckpointLoaded(block_id) => {
                                    info!("SM: Dummy handle found checkpoint event: {block_id:?}");
                                }
                            },
                            Err(e) => match e {
                                TryRecvError::Lagged(lag) => {
                                    warn!("Dummy SM subscriber lagged {lag:?}");
                                }
                                TryRecvError::Closed => {
                                    warn!("Dummy SM subscriber failed because channel closed");
                                }
                                _ => {}
                            },
                        }
                    }
                })
            } else {
                tokio::task::spawn(async move {})
            };

That's the more brutal workaround...

On the other side there is a long-lived broadcast. Without the interval select or try_recv, only every other event, or perhaps 40% of the events, on the broadcast is picked up in a timely way: either because of what you say, that Tokio puts the hot loop to sleep/parks it, or because there isn't enough activity on the local queue (which then also puts the task to sleep). Either way, the task goes to sleep with a message sitting on the broadcast until a second event comes in, and then it processes the first and, unreliably, the second; i.e. the broadcast is not waking the awaiting receiver. Any sort of select with a competing future that completes at a random or fixed interval keeps the loop awake and makes the receipt reliable.

The expected behavior is that the loop would wait at the await point and then process the next message, which does not actually occur. The loop does pause as expected at the await point (not consuming 100% CPU, which is the whole idea, though it might indeed block that runtime thread), but it doesn't wake up in a timely way. It's not, for example, sitting there spamming out messages.

I'd be very curious if there is a better way with Tokio to effect a real-time subscription to a broadcast (there likely is).

Darksonn (Contributor) commented Feb 7, 2025

> The receiver in this situation very much intends to await and process messages in the hot loop/not a bug.

It's a bug to do a hot loop on a Tokio runtime thread, end of story. Use a dedicated thread if you must hot loop.
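
To make that concrete, here is a minimal sketch of my own (not code from this thread) that moves the receive loop onto a dedicated OS thread with blocking_recv; the sending side is kept synchronous just to keep the example short.

use tokio::sync::broadcast::{self, error::RecvError};

fn main() {
    let (tx, mut rx) = broadcast::channel::<u64>(16);

    // A dedicated OS thread is free to loop as tightly as it likes
    // without starving the async runtime's worker threads.
    let consumer = std::thread::spawn(move || loop {
        match rx.blocking_recv() {
            Ok(event) => println!("got {event}"),
            Err(RecvError::Lagged(n)) => eprintln!("lagged by {n} messages"),
            Err(RecvError::Closed) => break,
        }
    });

    for n in 0..10u64 {
        // A send error only means there are currently no receivers.
        let _ = tx.send(n);
    }
    drop(tx); // closing the channel lets the consumer thread exit

    consumer.join().unwrap();
}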

0xAlcibiades commented Feb 8, 2025

Ok, I don't want to blow up the OP's issue here, but I have to imagine there's a way to do this, because I've seen examples of very similar things in many repositories that use Tokio, such as tonic, reqwest, hyper, etc., where there is stream processing in a "hot" async loop.

Darksonn (Contributor) commented Feb 8, 2025

I don't know what you are referring to, but I bet those loops do in fact yield to the runtime rather than use try_* methods.

0xAlcibiades commented

> I don't know what you are referring to, but I bet those loops do in fact yield to the runtime rather than use try_* methods.

Yeah, we seem to be trading shots over the bow here. At the end of the day we just needed to yield to the scheduler in the loops, and we're good to go. Thanks for the guidance!

maminrayej linked a pull request Feb 17, 2025 that will close this issue