
sync::broadcast returns Lagged error when capacity is not exceeded #2425

Closed
iffyio opened this issue Apr 21, 2020 · 2 comments · Fixed by #2448
Labels
A-tokio Area: The main tokio crate C-bug Category: This is a bug. M-sync Module: tokio/sync

Comments

@iffyio

iffyio commented Apr 21, 2020

Version

rustc 1.41.0 (5e1a79984 2020-01-27)
tokio: 0.2.18

Platform

Linux 5.3.0-45-generic #37~18.04.1-Ubuntu

Description

Hi! I ran into an issue with `sync::broadcast::channel` where buffering events up to the channel's capacity triggers a `RecvError::Lagged` error when the channel is finally read from. I expected this error to be returned only when the channel's capacity is exceeded.

For example, the following code fails with `Err(Lagged(1))`:

```rust
#[tokio::main]
async fn main() {
    let (tx, mut rx) = tokio::sync::broadcast::channel(1);
    let _1 = tokio::spawn(async move {
        tx.send(0).unwrap();
        // `tx` is dropped when this task finishes, which closes the channel
    });
    let _2 = tokio::spawn(async move {
        rx.recv().await.unwrap(); // panic: Err(Lagged(1)) on tokio 0.2.18
    });
    _1.await.unwrap();
    _2.await.unwrap();
}
```

It seems to be a threading issue? Compare with the following, which works as expected:

```rust
#[tokio::main]
async fn main() {
    let (tx, mut rx) = tokio::sync::broadcast::channel(1);
    tx.send(0).unwrap();
    let _2 = tokio::spawn(async move {
        rx.recv().await.unwrap(); // OK
    });
    _2.await.unwrap();
    // `tx` is only dropped here, so the channel stayed open while `rx` received
}
```

@Darksonn Darksonn added A-tokio Area: The main tokio crate C-question User questions that are neither feature requests nor bug reports M-sync Module: tokio/sync C-bug Category: This is a bug. and removed C-question User questions that are neither feature requests nor bug reports labels Apr 21, 2020
@Darksonn Darksonn self-assigned this Apr 21, 2020
@Darksonn
Contributor

This appears to be a bug. I will take a look at it when I have time.

@Darksonn
Contributor

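This is because the channel sends a `None` message when it is closed, i.e. when the last `Sender` is dropped. In the first example, `tx` is dropped as soon as the spawned task finishes, so with a capacity of 1 the `None` overwrites the value `0` if the receiver has not read it yet; in the second example `tx` is still alive in `main`, so the channel is never closed while `rx` is receiving. It would be nice not to count that `None` message toward the capacity, though.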

carllerche pushed a commit that referenced this issue Apr 28, 2020
Broadcast uses a ring buffer to store values sent to the channel. In order to
deal with slow receivers, the oldest values are overwritten with new values
once the buffer wraps. A receiver should be able to calculate how many values
it has missed.
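A minimal, single-threaded sketch of the ring buffer described above, assuming each slot records the absolute position of the value it holds. `Ring`, `Slot` and `RecvResult` are hypothetical names for illustration, not tokio's internal types, and the model has no locking or task wakeups.

```rust
/// Result of one receive attempt in this hypothetical model.
#[derive(Debug, PartialEq)]
pub enum RecvResult<T> {
    Value(T),
    Empty,
    Lagged(u64),
}

/// One buffer slot: the absolute position of the value it holds, and the value.
struct Slot<T> {
    pos: u64,
    val: Option<T>, // `None` only until the slot is written for the first time
}

/// Hypothetical ring buffer; `tail` is the position the next send writes to.
pub struct Ring<T> {
    slots: Vec<Slot<T>>,
    tail: u64,
}

impl<T: Clone> Ring<T> {
    pub fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "capacity must be non-zero");
        let slots = (0..capacity).map(|_| Slot { pos: 0, val: None }).collect();
        Ring { slots, tail: 0 }
    }

    fn cap(&self) -> u64 {
        self.slots.len() as u64
    }

    /// Writes at `tail`; once the buffer wraps this overwrites the oldest value.
    pub fn send(&mut self, value: T) {
        let idx = (self.tail % self.cap()) as usize;
        self.slots[idx] = Slot { pos: self.tail, val: Some(value) };
        self.tail += 1;
    }

    /// `next` is the receiver's cursor: the position it expects to read next.
    pub fn recv(&self, next: &mut u64) -> RecvResult<T> {
        if *next >= self.tail {
            return RecvResult::Empty; // caught up with the sender
        }
        let slot = &self.slots[(*next % self.cap()) as usize];
        if slot.pos == *next {
            *next += 1;
            return RecvResult::Value(slot.val.clone().expect("slot was written"));
        }
        // The slot now holds a newer value, so position `next` was overwritten.
        // The oldest value still stored sits at `tail - capacity`.
        let oldest = self.tail - self.cap();
        let missed = oldest - *next;
        *next = oldest;
        RecvResult::Lagged(missed)
    }
}
```

With capacity 2, sending `'a'`, `'b'` and `'c'` before reading makes the first `recv` return `Lagged(1)` and move the cursor to `'b'`, so the receiver skips exactly the one value that was overwritten.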

Additionally, when the broadcast closes, a final value of `None` is sent to
the channel. If the buffer has wrapped, this value overwrites the oldest
value.

This is mainly an issue for a single-capacity broadcast when a value is sent
and then the sender is dropped. The original value is immediately overwritten
with `None`, meaning that receivers assume they have lagged behind.
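The failure can be reproduced with a small single-threaded model of the old behaviour, in which closing the channel pushes a `None` through the same ring buffer as ordinary values. `OldRing` and `Recv` are hypothetical names, and storing an absolute position in every slot is an assumption of this sketch rather than a copy of tokio's code.

```rust
/// Outcome of a receive attempt in this hypothetical model.
#[derive(Debug, PartialEq)]
pub enum Recv<T> {
    Value(T),
    Closed,
    Empty,
    Lagged(u64),
}

/// Pre-fix model: each slot is (absolute position, value); `None` marks closure.
pub struct OldRing<T> {
    slots: Vec<(u64, Option<T>)>,
    tail: u64,
}

impl<T: Clone> OldRing<T> {
    pub fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "capacity must be non-zero");
        OldRing { slots: vec![(0, None); capacity], tail: 0 }
    }

    fn push(&mut self, val: Option<T>) {
        let cap = self.slots.len() as u64;
        let idx = (self.tail % cap) as usize;
        self.slots[idx] = (self.tail, val);
        self.tail += 1;
    }

    pub fn send(&mut self, value: T) {
        self.push(Some(value));
    }

    /// Dropping the last sender: the `None` occupies a slot like any value,
    /// so with capacity 1 it overwrites the only value still buffered.
    pub fn close(&mut self) {
        self.push(None);
    }

    pub fn recv(&self, next: &mut u64) -> Recv<T> {
        let cap = self.slots.len() as u64;
        if *next >= self.tail {
            return Recv::Empty;
        }
        let (pos, val) = &self.slots[(*next % cap) as usize];
        if *pos == *next {
            return match val {
                Some(v) => {
                    *next += 1;
                    Recv::Value(v.clone())
                }
                None => Recv::Closed, // stay on the close marker
            };
        }
        // Overwritten: skip to the oldest position still in the buffer.
        let oldest = self.tail - cap;
        let missed = oldest - *next;
        *next = oldest;
        Recv::Lagged(missed)
    }
}
```

With capacity 1, `send(0)` followed by `close()` makes the first `recv` return `Lagged(1)` and the second `Closed`: the value `0` is never delivered, which matches the `Err(Lagged(1))` in the report.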

**Solution**

A value of `None` is no longer sent to the channel when the final sender has
been dropped. This solves the single capacity broadcast case by completely
removing the behavior of overwriting values when the channel is closed.

Now, when the final sender is dropped a closed bit is set on the next slot
that the channel is supposed to send to.

In the case of a fast receiver, if it finds a slot where the closed bit is
set, it knows the channel is closed without locking the tail.

In the case of a slow receiver, it must first find out whether it has missed
any values. This works as before, but it must now also account for channel
closure.

If the channel is not closed, the oldest value may be located at index `n`. If
the channel is closed, the oldest value is located at index `n - 1`.

Knowing the index where the oldest value is located, a receiver can calculate
how many values it may have missed and start to catch up.
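A single-threaded sketch of this scheme, under two assumptions that belong to the model rather than being a transcription of tokio's code: every slot starts with position `index - capacity` (wrapping), so a caught-up receiver can recognise the slot it is waiting on; and closing still advances the tail by one without storing anything, which is what puts the oldest value one position earlier (`n - 1`) once the channel is closed. `ClosedBitRing` and `Recv` are hypothetical names.

```rust
#[derive(Debug, PartialEq)]
pub enum Recv<T> {
    Value(T),
    Closed,
    Empty,
    Lagged(u64),
}

struct Slot<T> {
    pos: u64,
    val: Option<T>,
    closed: bool, // set on the slot the next send would have used
}

pub struct ClosedBitRing<T> {
    slots: Vec<Slot<T>>,
    tail: u64,
    closed: bool,
}

impl<T: Clone> ClosedBitRing<T> {
    pub fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "capacity must be non-zero");
        let cap = capacity as u64;
        let slots = (0..cap)
            .map(|i| Slot { pos: i.wrapping_sub(cap), val: None, closed: false })
            .collect();
        ClosedBitRing { slots, tail: 0, closed: false }
    }

    fn cap(&self) -> u64 {
        self.slots.len() as u64
    }

    pub fn send(&mut self, value: T) {
        assert!(!self.closed, "send after close");
        let idx = (self.tail % self.cap()) as usize;
        self.slots[idx] = Slot { pos: self.tail, val: Some(value), closed: false };
        self.tail += 1;
    }

    /// No `None` is written: only the closed bit on the next slot is set,
    /// so the value already in that slot stays readable.
    pub fn close(&mut self) {
        let idx = (self.tail % self.cap()) as usize;
        self.slots[idx].closed = true;
        self.tail += 1; // assumption of this model: the tail still advances
        self.closed = true;
    }

    pub fn recv(&self, next: &mut u64) -> Recv<T> {
        let cap = self.cap();
        let slot = &self.slots[(*next % cap) as usize];
        if slot.pos == *next {
            *next += 1;
            return Recv::Value(slot.val.clone().expect("slot was written"));
        }
        if slot.pos.wrapping_add(cap) == *next {
            // Fast receiver: caught up, and the slot alone says whether it is done.
            return if slot.closed { Recv::Closed } else { Recv::Empty };
        }
        // Slow receiver: the oldest stored value is one position earlier when
        // closed, because closing advanced the tail without storing a value.
        let adjust = if self.closed { 1 } else { 0 };
        let oldest = self.tail.wrapping_sub(cap + adjust);
        let missed = oldest.wrapping_sub(*next);
        *next = oldest;
        Recv::Lagged(missed)
    }
}
```

In this model the capacity-1 case from the issue now yields `Value(0)` and then `Closed`. With capacity 2, three sends and a close, a receiver still at position 0 gets `Lagged(1)` and resumes at `'b'`; without the `adjust` term it would compute `Lagged(2)` and silently skip `'b'`.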

Closes #2425
hds added a commit to hds/tokio that referenced this issue Jul 26, 2022
The broadcast channel allows multiple senders to send messages to
multiple receivers, where each receiver receives messages starting from
when it subscribes. After all senders are dropped, the receivers will
continue to receive all waiting messages in the buffer and then receive
a `Closed` error.

To mark that a channel has closed, it stores two closed flags, one on
the channel level and another in the buffer slot *after* the last used
slot (this may also be the earliest entry being kept for lagged
receivers, see tokio-rs#2425).

However, we don't need both closed flags; keeping the channel-level
closed flag is sufficient.

Without the slot-level closed flag, each receiver receives every message
until it is up to date, at which point the channel is empty for that
receiver. The value returned is then chosen based on the channel-level
closed flag: if the channel is NOT closed, `Empty` is returned; if the
channel is closed, `Closed` is returned instead.
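A single-threaded sketch of this logic, assuming the same slot layout as a simple position-stamped ring buffer (each slot starts at position `index - capacity`, wrapping) and that a new receiver starts at the current tail. `Channel`, `TryRecv` and `subscribe` here are hypothetical names for the model, not tokio's internals. Closing only sets the channel-level flag, and a caught-up receiver consults that flag to choose between `Empty` and `Closed`, including a receiver created after the channel was closed.

```rust
#[derive(Debug, PartialEq)]
pub enum TryRecv<T> {
    Value(T),
    Empty,
    Closed,
    Lagged(u64),
}

struct Slot<T> {
    pos: u64,
    val: Option<T>,
}

/// Hypothetical model: a single channel-level `closed` flag, no per-slot flag.
pub struct Channel<T> {
    slots: Vec<Slot<T>>,
    tail: u64,
    closed: bool,
}

impl<T: Clone> Channel<T> {
    pub fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "capacity must be non-zero");
        let cap = capacity as u64;
        let slots = (0..cap)
            .map(|i| Slot { pos: i.wrapping_sub(cap), val: None })
            .collect();
        Channel { slots, tail: 0, closed: false }
    }

    fn cap(&self) -> u64 {
        self.slots.len() as u64
    }

    pub fn send(&mut self, value: T) {
        assert!(!self.closed, "send after close");
        let idx = (self.tail % self.cap()) as usize;
        self.slots[idx] = Slot { pos: self.tail, val: Some(value) };
        self.tail += 1;
    }

    /// Closing only flips the flag: nothing is appended to the buffer.
    pub fn close(&mut self) {
        self.closed = true;
    }

    /// A new receiver starts at the current tail, i.e. with nothing to read.
    pub fn subscribe(&self) -> u64 {
        self.tail
    }

    pub fn try_recv(&self, next: &mut u64) -> TryRecv<T> {
        let cap = self.cap();
        let slot = &self.slots[(*next % cap) as usize];
        if slot.pos == *next {
            *next += 1;
            return TryRecv::Value(slot.val.clone().expect("slot was written"));
        }
        if slot.pos.wrapping_add(cap) == *next {
            // Up to date: the channel-level flag picks the result.
            return if self.closed { TryRecv::Closed } else { TryRecv::Empty };
        }
        // Overwritten: jump to the oldest stored value.
        let oldest = self.tail - cap;
        let missed = oldest - *next;
        *next = oldest;
        TryRecv::Lagged(missed)
    }
}
```

Because the close no longer consumes a position, a receiver that subscribes after closing sits exactly where an up-to-date receiver would, and the flag check returns `Closed` for it instead of `Empty`.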

With the modified logic, there is no longer a need to append a closed
token to the internal buffer (by setting the slot-level closed flag on
the next slot). This fixes the off-by-one error described in tokio-rs#4814,
which caused a receiver created after the channel was already closed to
get `Empty` from `try_recv` (or hang forever when calling `recv`) instead
of receiving `Closed`.

As a bonus, we save a single bool on each buffer slot.
Darksonn pushed a commit that referenced this issue Aug 10, 2022