From 914a196fbc864dc600763f75b4977b1022ca7089 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Wed, 9 Oct 2024 17:54:07 +0900 Subject: [PATCH] Adding an actor inbox gauge. (#5481) The goal here is to offer a way to spot a possible leak (diverging number of actors). --- quickwit/quickwit-actors/src/mailbox.rs | 50 +++++++++++++++++++------ 1 file changed, 38 insertions(+), 12 deletions(-) diff --git a/quickwit/quickwit-actors/src/mailbox.rs b/quickwit/quickwit-actors/src/mailbox.rs index 90f95e7b8b3..711136401a5 100644 --- a/quickwit/quickwit-actors/src/mailbox.rs +++ b/quickwit/quickwit-actors/src/mailbox.rs @@ -21,10 +21,10 @@ use std::any::Any; use std::convert::Infallible; use std::fmt; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::{Arc, Weak}; +use std::sync::{Arc, OnceLock, Weak}; use std::time::Instant; -use quickwit_common::metrics::IntCounter; +use quickwit_common::metrics::{GaugeGuard, IntCounter, IntGauge}; use tokio::sync::oneshot; use crate::channel_with_priority::{Receiver, Sender, TrySendError}; @@ -311,39 +311,44 @@ impl Mailbox { } } +struct InboxInner { + rx: Receiver>, + _inboxes_count_gauge_guard: GaugeGuard<'static>, +} + pub struct Inbox { - rx: Arc>>, + inner: Arc>, } impl Clone for Inbox { fn clone(&self) -> Self { Inbox { - rx: self.rx.clone(), + inner: self.inner.clone(), } } } impl Inbox { pub(crate) fn is_empty(&self) -> bool { - self.rx.is_empty() + self.inner.rx.is_empty() } pub(crate) async fn recv(&self) -> Result, RecvError> { - self.rx.recv().await + self.inner.rx.recv().await } pub(crate) async fn recv_cmd_and_scheduled_msg_only(&self) -> Envelope { - self.rx.recv_high_priority().await + self.inner.rx.recv_high_priority().await } pub(crate) fn try_recv(&self) -> Result, RecvError> { - self.rx.try_recv() + self.inner.rx.try_recv() } #[cfg(any(test, feature = "testsuite"))] pub async fn recv_typed_message(&self) -> Result { loop { - match self.rx.recv().await { + match self.inner.rx.recv().await { Ok(mut envelope) => { if let Some(msg) = envelope.message_typed() { return Ok(msg); @@ -362,7 +367,8 @@ impl Inbox { /// Warning this iterator might never be exhausted if there is a living /// mailbox associated to it. pub fn drain_for_test(&self) -> Vec> { - self.rx + self.inner + .rx .drain_low_priority() .into_iter() .map(|mut envelope| envelope.message()) @@ -375,7 +381,8 @@ impl Inbox { /// Warning this iterator might never be exhausted if there is a living /// mailbox associated to it. pub fn drain_for_test_typed(&self) -> Vec { - self.rx + self.inner + .rx .drain_low_priority() .into_iter() .flat_map(|mut envelope| envelope.message_typed()) @@ -383,6 +390,19 @@ impl Inbox { } } +fn get_actor_inboxes_count_gauge_guard() -> GaugeGuard<'static> { + static INBOX_GAUGE: std::sync::OnceLock = OnceLock::new(); + let gauge = INBOX_GAUGE.get_or_init(|| { + quickwit_common::metrics::new_gauge( + "inboxes_count", + "overall count of actors", + "actor", + &[], + ) + }); + GaugeGuard::from_gauge(gauge) +} + pub(crate) fn create_mailbox( actor_name: String, queue_capacity: QueueCapacity, @@ -398,7 +418,13 @@ pub(crate) fn create_mailbox( }), ref_count, }; - let inbox = Inbox { rx: Arc::new(rx) }; + let inner = InboxInner { + rx, + _inboxes_count_gauge_guard: get_actor_inboxes_count_gauge_guard(), + }; + let inbox = Inbox { + inner: Arc::new(inner), + }; (mailbox, inbox) }