From 902f38d6d858d52b9fabf6cb1bf21118a40cec2d Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Tue, 10 Jan 2023 17:37:35 +0300 Subject: [PATCH] Make unbounded channel size warning exact in `out_events` --- client/network/src/service/out_events.rs | 28 ++++++++++++++---------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/client/network/src/service/out_events.rs b/client/network/src/service/out_events.rs index 5e0c8ac6a1a40..1b6febd376bb4 100644 --- a/client/network/src/service/out_events.rs +++ b/client/network/src/service/out_events.rs @@ -42,7 +42,7 @@ use std::{ fmt, pin::Pin, sync::{ - atomic::{AtomicI64, Ordering}, + atomic::{AtomicUsize, Ordering}, Arc, }, task::{Context, Poll}, @@ -52,10 +52,10 @@ use std::{ /// /// The name is used in Prometheus reports, the queue size threshold is used /// to warn if there are too many unprocessed events in the channel. -pub fn channel(name: &'static str, queue_size_warning: i64) -> (Sender, Receiver) { +pub fn channel(name: &'static str, queue_size_warning: usize) -> (Sender, Receiver) { let (tx, rx) = mpsc::unbounded(); let metrics = Arc::new(Mutex::new(None)); - let queue_size = Arc::new(AtomicI64::new(0)); + let queue_size = Arc::new(AtomicUsize::new(0)); let tx = Sender { inner: tx, name, @@ -81,12 +81,9 @@ pub struct Sender { /// Name to identify the channel (e.g., in Prometheus and logs). name: &'static str, /// Number of events in the queue. Clone of [`Receiver::in_transit`]. - // To not bother with ordering and possible underflow errors of the unsigned counter - // we just use `i64` and `Ordering::Relaxed`, and perceive `queue_size` as approximate. - // It can turn < 0 though. - queue_size: Arc, + queue_size: Arc, /// Threshold queue size to generate an error message in the logs. - queue_size_warning: i64, + queue_size_warning: usize, /// We generate the error message only once to not spam the logs. warning_fired: bool, /// Backtrace of a place where the channel was created. @@ -115,7 +112,7 @@ impl Drop for Sender { pub struct Receiver { inner: mpsc::UnboundedReceiver, name: &'static str, - queue_size: Arc, + queue_size: Arc, /// Initially contains `None`, and will be set to a value once the corresponding [`Sender`] /// is assigned to an instance of [`OutChannels`]. metrics: Arc>>>>, @@ -126,7 +123,14 @@ impl Stream for Receiver { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { if let Some(ev) = ready!(Pin::new(&mut self.inner).poll_next(cx)) { - let _ = self.queue_size.fetch_sub(1, Ordering::Relaxed); + // `Ordering::Release` guarantees here that no loads (`poll_next()`) can be reordered + // after the `sub` operation. Assuming causality between `unbounded_send()` and + // `poll_next()` this ensures that counter decrement can't happen before the + // corresponding counter load in `send()`. + // Note that this is not the usual Release-Acquire ordering based on synchronization of + // store and load of an atomic variable, but more general application of Acquire and + // Release ordering properties. + let _ = self.queue_size.fetch_sub(1, Ordering::Release); let metrics = self.metrics.lock().clone(); match metrics.as_ref().map(|m| m.as_ref()) { Some(Some(metrics)) => metrics.event_out(&ev, self.name), @@ -191,7 +195,9 @@ impl OutChannels { /// Sends an event. pub fn send(&mut self, event: Event) { self.event_streams.retain_mut(|sender| { - let queue_size = sender.queue_size.fetch_add(1, Ordering::Relaxed); + // `Ordering::Acquire` guarantees here that no stores (`unbounded_send()`) can be + // reordered before the `fetch` load operation. + let queue_size = sender.queue_size.fetch_add(1, Ordering::Acquire); if queue_size == sender.queue_size_warning && !sender.warning_fired { sender.warning_fired = true; sender.creation_backtrace.resolve();