Make unbounded channels size warning exact (part 1) #13117
```diff
@@ -42,7 +42,7 @@ use std::{
 	fmt,
 	pin::Pin,
 	sync::{
-		atomic::{AtomicI64, Ordering},
+		atomic::{AtomicUsize, Ordering},
 		Arc,
 	},
 	task::{Context, Poll},
```

```diff
@@ -52,10 +52,10 @@ use std::{
 ///
 /// The name is used in Prometheus reports, the queue size threshold is used
 /// to warn if there are too many unprocessed events in the channel.
-pub fn channel(name: &'static str, queue_size_warning: i64) -> (Sender, Receiver) {
+pub fn channel(name: &'static str, queue_size_warning: usize) -> (Sender, Receiver) {
 	let (tx, rx) = mpsc::unbounded();
 	let metrics = Arc::new(Mutex::new(None));
-	let queue_size = Arc::new(AtomicI64::new(0));
+	let queue_size = Arc::new(AtomicUsize::new(0));
 	let tx = Sender {
 		inner: tx,
 		name,
```
```diff
@@ -81,12 +81,9 @@ pub struct Sender {
 	/// Name to identify the channel (e.g., in Prometheus and logs).
 	name: &'static str,
 	/// Number of events in the queue. Clone of [`Receiver::in_transit`].
```
Review comment: I don't understand the second half of this comment. Edit: also there's a …

Reply: Yeah, there is an error in the comment. It should be …
```diff
-	// To not bother with ordering and possible underflow errors of the unsigned counter
-	// we just use `i64` and `Ordering::Relaxed`, and perceive `queue_size` as approximate.
-	// It can turn < 0 though.
-	queue_size: Arc<AtomicI64>,
+	queue_size: Arc<AtomicUsize>,
 	/// Threshold queue size to generate an error message in the logs.
-	queue_size_warning: i64,
+	queue_size_warning: usize,
 	/// We generate the error message only once to not spam the logs.
 	warning_fired: bool,
 	/// Backtrace of a place where the channel was created.
```
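The comment removed in this hunk explains why the old code tolerated an approximate counter: with `i64` and `Relaxed` ordering, a decrement that races ahead of its matching increment merely produces a transient negative value. With `AtomicUsize` the same race would wrap around instead, which is one reason the exact scheme must ensure every decrement is observed after its increment. A small standalone illustration of the two behaviours (not code from the PR):

```rust
use std::sync::atomic::{AtomicI64, AtomicUsize, Ordering};

fn main() {
    // Old scheme: a signed counter simply goes negative if a decrement is
    // applied before the matching increment has been observed.
    let signed = AtomicI64::new(0);
    signed.fetch_sub(1, Ordering::Relaxed);
    assert_eq!(signed.load(Ordering::Relaxed), -1);

    // New scheme: an unsigned counter wraps on the same sequence, so the
    // increment-before-send / decrement-after-receive discipline matters.
    let unsigned = AtomicUsize::new(0);
    unsigned.fetch_sub(1, Ordering::Relaxed);
    assert_eq!(unsigned.load(Ordering::Relaxed), usize::MAX);
}
```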
```diff
@@ -115,7 +112,7 @@ impl Drop for Sender {
 pub struct Receiver {
 	inner: mpsc::UnboundedReceiver<Event>,
 	name: &'static str,
-	queue_size: Arc<AtomicI64>,
+	queue_size: Arc<AtomicUsize>,
 	/// Initially contains `None`, and will be set to a value once the corresponding [`Sender`]
 	/// is assigned to an instance of [`OutChannels`].
 	metrics: Arc<Mutex<Option<Arc<Option<Metrics>>>>>,
```
```diff
@@ -126,7 +123,14 @@ impl Stream for Receiver {
 	fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Event>> {
 		if let Some(ev) = ready!(Pin::new(&mut self.inner).poll_next(cx)) {
-			let _ = self.queue_size.fetch_sub(1, Ordering::Relaxed);
+			// `Ordering::Release` guarantees here that no loads (`poll_next()`) can be reordered
+			// after the `sub` operation. Assuming causality between `unbounded_send()` and
+			// `poll_next()` this ensures that counter decrement can't happen before the
+			// corresponding counter load in `send()`.
+			// Note that this is not the usual Release-Acquire ordering based on synchronization of
+			// store and load of an atomic variable, but more general application of Acquire and
+			// Release ordering properties.
+			let _ = self.queue_size.fetch_sub(1, Ordering::Release);
```
Review comment: I'd just use …

Review comment: You usually can't go wrong with … However, I don't understand the larger issue here. Reading the PR and the linked comments just made me confused. @dmitry-markin, can you please summarize what issue we are trying to solve, and what memory ordering has to do with it?

Reply (@dmitry-markin): Originally we added the queue size monitoring to unbounded channels in order to detect message "leaks": when some of the receivers are not polled and messages keep stacking up in the channel, "leaking" memory (we have observed such bugs in the past). In the original implementation of this queue threshold warning I didn't bother at all about precise queue size estimation or memory ordering implications, considering that a +/-1 error in the queue size estimate is not a big deal when the warning threshold is 100_000 or even 100 messages. It turned out, though, that at least some Substrate users, like @nazar-pc, would like to set an exact warning threshold, specifically when dealing with channels with explicit acknowledgement (in this case the queue size threshold must be exactly 1), so I created this follow-up PR that makes the queue size measurement exact. Nevertheless, doing things right with atomics seems not that trivial, especially when we go outside typical C++/Rust memory ordering territory and start to reason on the level of … It's worth noting that …

Review comment (@koute): Honestly, unless someone can point out some downsides, I'd probably prefer that we just switch to a channel that supports checking the length natively, instead of jerry-rigging it ourselves with atomics and potentially depending on implicit implementation details of the channel we're currently using, e.g. the channel from …

Review comment: Let's just do what @koute is saying and continue living our happy lives without any synchronization problems :D
```diff
 			let metrics = self.metrics.lock().clone();
 			match metrics.as_ref().map(|m| m.as_ref()) {
 				Some(Some(metrics)) => metrics.event_out(&ev, self.name),
```
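In the thread above, one suggestion is to drop the hand-rolled atomic counter entirely and switch to a channel that can report its length natively; the comment is cut off before naming a crate. Purely as an illustration of that idea, and not necessarily the crate the reviewer had in mind, the `async-channel` crate exposes such an API:

```rust
// Illustrative only: `async-channel` is one crate whose channels track their
// own length; the truncated review comment does not say which crate was meant.
fn main() {
    let (tx, rx) = async_channel::unbounded::<u32>();

    tx.try_send(1).expect("unbounded channel accepts the message");
    tx.try_send(2).expect("unbounded channel accepts the message");

    // No external AtomicUsize is needed: the channel knows its own length,
    // so an exact warning threshold is a plain comparison.
    let queue_size_warning = 2;
    if tx.len() >= queue_size_warning {
        eprintln!("channel has reached the warning threshold");
    }

    assert_eq!(rx.try_recv().unwrap(), 1);
    assert_eq!(tx.len(), 1);
}
```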
```diff
@@ -191,7 +195,9 @@ impl OutChannels {
 	/// Sends an event.
 	pub fn send(&mut self, event: Event) {
 		self.event_streams.retain_mut(|sender| {
-			let queue_size = sender.queue_size.fetch_add(1, Ordering::Relaxed);
+			// `Ordering::Acquire` guarantees here that no stores (`unbounded_send()`) can be
+			// reordered before the `fetch` load operation.
+			let queue_size = sender.queue_size.fetch_add(1, Ordering::Acquire);
 			if queue_size == sender.queue_size_warning && !sender.warning_fired {
```
Comment on lines +200 to 201

Review comment: According to the docs …

Reply: Yes, it just means …
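Both comments above are cut off; for reference, `fetch_add` as documented returns the counter's value from before the increment, which is what the comparison on these lines relies on. A standalone illustration:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

fn main() {
    let queue_size = AtomicUsize::new(0);
    let queue_size_warning = 1;

    // `fetch_add` returns the value the counter held *before* the increment.
    let first = queue_size.fetch_add(1, Ordering::SeqCst); // counter: 0 -> 1
    let second = queue_size.fetch_add(1, Ordering::SeqCst); // counter: 1 -> 2

    assert_eq!(first, 0); // below the threshold, no warning
    assert_eq!(second, queue_size_warning); // this send triggers the warning
}
```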
```diff
 				sender.warning_fired = true;
 				sender.creation_backtrace.resolve();
```
Review comment: Should this go one step further and use NonZeroUsize, as 0 doesn't really make much sense as a value here?

Reply: This would require writing NonZeroUsize::new(100_000).unwrap() instead of 100_000 every time when passing the queue size threshold, so I'm not sure it makes sense.
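For reference, the call-site cost the reply describes, sketched with hypothetical function signatures (not code from this PR):

```rust
use std::num::NonZeroUsize;

// Hypothetical signatures, only for comparing call-site ergonomics.
fn channel_with_usize(_queue_size_warning: usize) {}
fn channel_with_non_zero(_queue_size_warning: NonZeroUsize) {}

fn main() {
    // Plain `usize`: the literal is passed directly.
    channel_with_usize(100_000);

    // `NonZeroUsize`: every call site needs a fallible conversion,
    // even for a constant that is obviously non-zero.
    channel_with_non_zero(NonZeroUsize::new(100_000).expect("non-zero constant"));
}
```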