Resubscribing to a closed broadcast receiver will hang on a call to recv #4814

Closed
nathaniel-daniel opened this issue Jul 7, 2022 · 5 comments · Fixed by #4867
Labels
A-tokio (Area: The main tokio crate), C-bug (Category: This is a bug.), M-sync (Module: tokio/sync)

nathaniel-daniel commented Jul 7, 2022

Version
tokio v1.19.2, tokio master (4daeea8)

Platform
Windows 10 64 bit

Description
A receiver created with `resubscribe` after the broadcast channel has closed hangs on calls to `recv`.

I tried this code:
Cargo.toml:

[package]
name = "tokio-broadcast-bug"
version = "0.0.0"
edition = "2021"

[dependencies]
tokio = { version = "1.19.2", features = ["full"] }

main.rs:

#[tokio::main]
async fn main() {
    let (tx, rx) = tokio::sync::broadcast::channel::<u32>(4);
    drop(tx);

    let mut rx_clone = rx.resubscribe();
    drop(rx);

    loop {
        match rx_clone.recv().await {
            Ok(msg) => {
                println!("{}", msg);
            }
            Err(tokio::sync::broadcast::error::RecvError::Closed) => {
                println!("Closed");
                break;
            }
            Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
                println!("Lagged by {n} messages");
            }
        }
    }

    println!("Done");
}

I expected to see this happen:
The loop should exit.

Instead, this happened:
The program hangs indefinitely.

Furthermore, replacing the loop with a single call to `try_recv` returns `TryRecvError::Empty` instead of `TryRecvError::Closed`.

nathaniel-daniel added the A-tokio (Area: The main tokio crate) and C-bug (Category: This is a bug.) labels Jul 7, 2022
Darksonn added the M-sync (Module: tokio/sync) label Jul 7, 2022
Darksonn (Contributor) commented Jul 9, 2022

That certainly does sound like a bug.

nathaniel-daniel (Author)

I don't fully understand everything about this channel, but shouldn't `new_receiver` update the `next` field similarly to how `recv_ref` does, by including an `adjust` value to account for the close message? Making this change seems to fix my simple example, but I don't know whether it works in a larger project or introduces other bugs; I was hoping someone more knowledgeable could take a look.
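To make the off-by-one concrete, here is a minimal sketch of the pre-fix design under heavy simplifying assumptions: a single-threaded toy model with no locking, wakers, or lag handling. The names `OldChannel`, `OldSlot`, `TryRecv`, and `new_receiver_next` are hypothetical and only loosely mirror tokio's internals. In this model, closing writes a marker slot and advances the tail, so a receiver that starts at the tail never reaches the marker; stepping back one position when the channel is closed, as suggested in the comment, makes the marker visible again.

```rust
/// Hypothetical toy model of the pre-fix design (not tokio's actual code).
/// Single-threaded; no locking, wakers, or lag handling.
#[derive(Clone)]
struct OldSlot {
    pos: u64,         // absolute position last written into this slot
    val: Option<u32>, // message value (None for the close marker)
    closed: bool,     // slot-level "closed" marker
}

struct OldChannel {
    buffer: Vec<OldSlot>,
    tail_pos: u64, // absolute position of the next write
    closed: bool,  // channel-level "closed" flag
}

#[derive(Debug, PartialEq)]
enum TryRecv {
    Value(u32),
    Empty,
    Closed,
}

impl OldChannel {
    fn new(capacity: usize) -> Self {
        // Every slot starts with a position no receiver will ever match.
        let empty = OldSlot { pos: u64::MAX, val: None, closed: false };
        OldChannel { buffer: vec![empty; capacity], tail_pos: 0, closed: false }
    }

    fn write(&mut self, val: Option<u32>) {
        let idx = (self.tail_pos as usize) % self.buffer.len();
        self.buffer[idx] = OldSlot { pos: self.tail_pos, val, closed: val.is_none() };
        self.tail_pos += 1; // the close marker also consumes a position
    }

    fn send(&mut self, v: u32) {
        self.write(Some(v));
    }

    /// Dropping the last sender sets both flags and writes a marker slot.
    fn close(&mut self) {
        self.closed = true;
        self.write(None);
    }

    /// `adjust_for_close = false` mirrors the buggy start position (the tail);
    /// `true` steps back one position so the close marker is still reachable.
    fn new_receiver_next(&self, adjust_for_close: bool) -> u64 {
        if adjust_for_close && self.closed {
            self.tail_pos - 1
        } else {
            self.tail_pos
        }
    }

    fn try_recv(&self, next: &mut u64) -> TryRecv {
        let slot = &self.buffer[(*next as usize) % self.buffer.len()];
        if slot.pos != *next {
            return TryRecv::Empty; // nothing written here yet (lag is ignored)
        }
        if slot.closed {
            return TryRecv::Closed; // stay on the marker; do not advance
        }
        *next += 1;
        TryRecv::Value(slot.val.unwrap())
    }
}
```

In this model, a receiver created after `close` with `adjust_for_close = false` starts one position past the marker and gets `Empty` on every call, which corresponds to the reported hang in `recv`; with the adjustment it gets `Closed`.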

Darksonn (Contributor)

The index not being adjusted for the close message does indeed appear to be the cause.

hds added a commit to hds/tokio that referenced this issue Jul 26, 2022
The broadcast channel allows multiple senders to send messages to
multiple receivers, where each receiver receives messages starting from
when it subscribes. After all senders are dropped, the receivers will
continue to receive all waiting messages in the buffer and then receive
a `Closed` error.

To mark that the channel has closed, it currently stores two closed flags:
one at the channel level and another in the buffer slot *after* the last
used slot (this slot may also hold the earliest entry kept for lagged
receivers; see tokio-rs#2425).

However, we don't need both closed flags; keeping the channel-level
closed flag is sufficient.

Without the slot-level closed flag, each receiver receives every message
until it is up to date, at which point the channel is empty for that
receiver. The value returned is then chosen based on the channel-level
closed flag: if the channel is NOT closed, `Empty` is returned; if the
channel is closed, `Closed` is returned instead.

With the modified logic, there is no longer a need to append a closed
token to the internal buffer (by setting the slot-level closed flag on
the next slot). This fixes the off-by-one error described in tokio-rs#4814,
which caused a receiver created after the channel was already closed to
get `Empty` from `try_recv` (or hang forever when calling `recv`)
instead of receiving `Closed`.

As a bonus, we save a single `bool` on each buffer slot.
hds (Contributor) commented Jul 26, 2022

I came up with a solution for this issue that is a bit more drastic than I originally anticipated (I'm removing the closed flag in the slot and relying solely on the channel-level closed flag).

@nathaniel-daniel Since it was your observation that the `next` field on the resubscribed receiver was incorrect, perhaps you'd be interested in looking at the solution? I probably wouldn't have arrived at it without your comments!

Darksonn pushed a commit that referenced this issue Aug 10, 2022
nathaniel-daniel (Author)

Sorry for the late reply, I've been busy. Thanks for the fix @hds! I just tested it, and it seems to work great on a simple example and on a small project I'm working on. I was writing up a PR to do essentially what you did, but it stalled while I tried to figure out the ramifications, performance or otherwise, of removing the double closed flags; I couldn't figure out why the two closed flags were introduced in the first place. Thanks for the PR!
