-
-
Notifications
You must be signed in to change notification settings - Fork 2.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
WIP: Add resubscribe functionality to broadcast::Receiver #4607
Conversation
Why not just implement |
Though some kind of resubscribe method that produces an independent, fresh receiver as if we have subscribed at this moment at the sender is still a valid use case. Not sure how to name such a method appropriately? |
I would consider a |
|
there go |
Can you add some tests for this? |
@Darksonn wdyt? |
@Darksonn in case you missed the last mention |
You are not currently updating this counter: tokio/tokio/src/sync/broadcast.rs Lines 326 to 334 in 06e413c
Currently that counter can never increase. It's unclear to me whether simply adding to it can result in some kind of race where the value is lost due to a concurrent call to |
@Darksonn thanks for the review, I guess I didn't realize what I was getting myself into 🤷♂️. I agree with @uklotzde that we should probably also have a I've added some tests and below I've laid out my reasoning why incrementing At first I came to the same conclusion as you, that incrementing We are concerned with a data-race on
Consider their interactions: 1 and 3 are pre-existing so I am going to ignore how they may affect each other. We obtain a Now, can |
working on fixing the loom test |
@Darksonn Okay so I think I've fixed my loom test, glad I wrote that one haha. I was wondering, do you think I should write a compile_fail test that verifies my assertion that |
That Unfortunately, I probably wont have time to review this in detail until next week, but I think I have been convinced that elements that the new receiver needs wont be dropped since the existing receiver also needs to see them, and can't consume them while the new receiver is being created. |
@Darksonn bump |
@Darksonn fixed that |
My hesitation w/ this PR and the reason I did not add Because of this, if the clone behavior is required, I suggest providing it as an explicit method that can be documented as having the |
I see. I didn't consider that. Going back to a |
@carllerche @Darksonn It seems like both functionalities would be useful. But I also did not consider that the |
Alright, I've just added both methods here for discussion. When you all decide if you'd like one or both or in separate PRs just let me know and I'll be more than happy to update the PR. Additionally please feel free to just copy what I have and adjust as necessary. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My current thought is to just provide a resubscribe
method. We may be able to fix it so we can have a cheap Clone
in the future, in which case we would want a Clone
impl instead.
tokio/src/sync/broadcast.rs
Outdated
@@ -881,6 +882,44 @@ impl<T> Receiver<T> { | |||
} | |||
|
|||
impl<T: Clone> Receiver<T> { | |||
/// Re-subscribes to the channel starting from the current tail element (the last element passed to `Sender::send`.) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The documentation here would need to be improved.
tokio/src/sync/broadcast.rs
Outdated
/// let mut rx2 = rx.resubscribe(); | ||
/// tx.send(2).unwrap(); | ||
/// | ||
/// assert_neq!(rx.recv().await.unwrap(), rx2.recv().await.unwrap()); // 1 != 2 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't this clearer?
/// assert_neq!(rx.recv().await.unwrap(), rx2.recv().await.unwrap()); // 1 != 2 | |
/// assert_eq!(rx2.recv().await.unwrap(), 2); | |
/// assert_eq!(rx.recv().await.unwrap(), 1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ya agreed
@Darksonn those bsd failures seem unrelated, any idea what's going on there? |
You need to merge master to get the fixes. It's because of the 1.61 release of Rust. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks.
This PR contains the following updates: | Package | Type | Update | Change | |---|---|---|---| | [tokio](https://tokio.rs) ([source](https://github.com/tokio-rs/tokio)) | dependencies | minor | `1.18.2` -> `1.19.1` | | [tokio](https://tokio.rs) ([source](https://github.com/tokio-rs/tokio)) | dev-dependencies | minor | `1.18.2` -> `1.19.1` | --- ### Release Notes <details> <summary>tokio-rs/tokio</summary> ### [`v1.19.1`](https://github.com/tokio-rs/tokio/releases/tag/tokio-1.19.1) [Compare Source](tokio-rs/tokio@tokio-1.19.0...tokio-1.19.1) ##### 1.19.1 (June 5, 2022) This release fixes a bug in `Notified::enable`. ([#​4747]) [#​4747]: tokio-rs/tokio#4747 ### [`v1.19.0`](https://github.com/tokio-rs/tokio/releases/tag/tokio-1.19.0) [Compare Source](tokio-rs/tokio@tokio-1.18.2...tokio-1.19.0) ##### 1.19.0 (June 3, 2022) ##### Added - runtime: add `is_finished` method for `JoinHandle` and `AbortHandle` ([#​4709]) - runtime: make global queue and event polling intervals configurable ([#​4671]) - sync: add `Notified::enable` ([#​4705]) - sync: add `watch::Sender::send_if_modified` ([#​4591]) - sync: add resubscribe method to broadcast::Receiver ([#​4607]) - net: add `take_error` to `TcpSocket` and `TcpStream` ([#​4739]) ##### Changed - io: refactor out usage of Weak in the io handle ([#​4656]) ##### Fixed - macros: avoid starvation in `join!` and `try_join!` ([#​4624]) ##### Documented - runtime: clarify semantics of tasks outliving `block_on` ([#​4729]) - time: fix example for `MissedTickBehavior::Burst` ([#​4713]) ##### Unstable - metrics: correctly update atomics in `IoDriverMetrics` ([#​4725]) - metrics: fix compilation with unstable, process, and rt, but without net ([#​4682]) - task: add `#[track_caller]` to `JoinSet`/`JoinMap` ([#​4697]) - task: add `Builder::{spawn_on, spawn_local_on, spawn_blocking_on}` ([#​4683]) - task: add `consume_budget` for cooperative scheduling ([#​4498]) - task: add `join_set::Builder` for configuring `JoinSet` tasks ([#​4687]) - task: update return value of `JoinSet::join_one` ([#​4726]) [#​4498]: tokio-rs/tokio#4498 [#​4591]: tokio-rs/tokio#4591 [#​4607]: tokio-rs/tokio#4607 [#​4624]: tokio-rs/tokio#4624 [#​4656]: tokio-rs/tokio#4656 [#​4671]: tokio-rs/tokio#4671 [#​4682]: tokio-rs/tokio#4682 [#​4683]: tokio-rs/tokio#4683 [#​4687]: tokio-rs/tokio#4687 [#​4697]: tokio-rs/tokio#4697 [#​4705]: tokio-rs/tokio#4705 [#​4709]: tokio-rs/tokio#4709 [#​4713]: tokio-rs/tokio#4713 [#​4725]: tokio-rs/tokio#4725 [#​4726]: tokio-rs/tokio#4726 [#​4729]: tokio-rs/tokio#4729 [#​4739]: tokio-rs/tokio#4739 </details> --- ### Configuration 📅 **Schedule**: Branch creation - At any time (no schedule defined), Automerge - At any time (no schedule defined). 🚦 **Automerge**: Disabled by config. Please merge this manually once you are satisfied. ♻ **Rebasing**: Whenever PR becomes conflicted, or you tick the rebase/retry checkbox. 🔕 **Ignore**: Close this PR and you won't be reminded about these updates again. --- - [x] <!-- rebase-check -->If you want to rebase/retry this PR, click this checkbox. --- This PR has been generated by [Renovate Bot](https://github.com/renovatebot/renovate). Co-authored-by: cabr2-bot <cabr2.help@gmail.com> Reviewed-on: https://codeberg.org/Calciumdibromid/CaBr2/pulls/1394 Reviewed-by: crapStone <crapstone@noreply.codeberg.org> Co-authored-by: Calciumdibromid Bot <cabr2_bot@noreply.codeberg.org> Co-committed-by: Calciumdibromid Bot <cabr2_bot@noreply.codeberg.org>
@carllerche sorry for commenting on an old thread, but would you be able to explain how would pub fn resubscribe(&self) -> Self {
let shared = self.shared.clone();
new_receiver(shared)
} /// Create a new `Receiver` which reads starting from the tail.
fn new_receiver<T>(shared: Arc<Shared<T>>) -> Receiver<T> {
let mut tail = shared.tail.lock();
if tail.rx_cnt == MAX_RECEIVERS {
panic!("max receivers");
}
tail.rx_cnt = tail.rx_cnt.checked_add(1).expect("overflow");
let next = tail.pos;
drop(tail);
Receiver { shared, next }
} and I think it would be just a matter of setting |
Each item contains a counter for the number of receivers that need to see it. You would have to increment all of those. |
Ah I see, thanks for explaining! |
Add subscribe method to broadcast::Receiver with the same functionality as the clone impl removed in #3020.