From be452a3c9bcc5ed4ac27f163eac6dc74cc67fd84 Mon Sep 17 00:00:00 2001 From: joshua-spacetime Date: Mon, 20 Nov 2023 09:11:10 -0800 Subject: [PATCH] refactor(571): Observe queue length on semaphore blocking and acquisition Fixes #571. --- crates/core/src/host/module_host.rs | 40 +++---- crates/core/src/util/lending_pool.rs | 124 +++++++--------------- crates/core/src/util/prometheus_handle.rs | 2 +- 3 files changed, 53 insertions(+), 113 deletions(-) diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 180fabbe24..8879d22f6f 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -12,10 +12,8 @@ use crate::identity::Identity; use crate::json::client_api::{SubscriptionUpdateJson, TableRowOperationJson, TableUpdateJson}; use crate::protobuf::client_api::{table_row_operation, SubscriptionUpdate, TableRowOperation, TableUpdate}; use crate::subscription::module_subscription_actor::ModuleSubscriptionManager; -use crate::util::lending_pool::{Closed, LendingPool, LentResource, PoolClosed, WaiterGauge}; +use crate::util::lending_pool::{Closed, LendingPool, LentResource, PoolClosed}; use crate::util::notify_once::NotifyOnce; -use crate::util::prometheus_handle::{GaugeInc, IntGaugeExt}; -use crate::worker_metrics::WORKER_METRICS; use base64::{engine::general_purpose::STANDARD as BASE_64_STD, Engine as _}; use futures::{Future, FutureExt}; use indexmap::IndexMap; @@ -336,7 +334,7 @@ impl fmt::Debug for ModuleHost { trait DynModuleHost: Send + Sync + 'static { async fn get_instance( &self, - waiter_gauge_context: ::Context<'_>, + ctx: (Identity, Hash, Address, &str), ) -> Result<(&HostThreadpool, Box), NoSuchModule>; fn inject_logs(&self, log_level: LogLevel, message: &str); fn one_off_query( @@ -353,7 +351,7 @@ trait DynModuleHost: Send + Sync + 'static { struct HostControllerActor { module: Arc, threadpool: Arc, - instance_pool: LendingPool, + instance_pool: LendingPool, start: NotifyOnce, } @@ -381,38 +379,24 @@ async fn select_first>(fut_a: A, fut_b: B) -> } } -#[derive(Clone)] -struct InstancePoolGauge; - -impl WaiterGauge for InstancePoolGauge { - type Context<'a> = &'a (&'a Identity, &'a Hash, &'a Address, &'a str); - type IncGuard = GaugeInc; - fn inc(&self, &(identity, module_hash, database_address, reducer_symbol): Self::Context<'_>) -> Self::IncGuard { - WORKER_METRICS - .instance_queue_length - .with_label_values(identity, module_hash, database_address, reducer_symbol) - .inc_scope() - } -} - #[async_trait::async_trait] impl DynModuleHost for HostControllerActor { async fn get_instance( &self, - waiter_gauge_context: ::Context<'_>, + ctx: (Identity, Hash, Address, &str), ) -> Result<(&HostThreadpool, Box), NoSuchModule> { self.start.notified().await; // in the future we should do something like in the else branch here -- add more instances based on load. // we need to do write-skew retries first - right now there's only ever once instance per module. let inst = if true { self.instance_pool - .request_with_context(waiter_gauge_context) + .request_with_context(ctx) .await .map_err(|_| NoSuchModule)? } else { const GET_INSTANCE_TIMEOUT: Duration = Duration::from_millis(500); select_first( - self.instance_pool.request_with_context(waiter_gauge_context), + self.instance_pool.request_with_context(ctx), tokio::time::sleep(GET_INSTANCE_TIMEOUT).map(|()| self.spinup_new_instance()), ) .await @@ -500,7 +484,7 @@ pub enum InitDatabaseError { impl ModuleHost { pub fn new(threadpool: Arc, mut module: impl Module) -> Self { let info = module.info(); - let instance_pool = LendingPool::with_gauge(InstancePoolGauge); + let instance_pool = LendingPool::new(); instance_pool.add_multiple(module.initial_instances()).unwrap(); let inner = Arc::new(HostControllerActor { module: Arc::new(module), @@ -530,13 +514,13 @@ impl ModuleHost { F: FnOnce(&mut dyn ModuleInstance) -> R + Send + 'static, R: Send + 'static, { - let waiter_gauge_context = ( - &self.info.identity, - &self.info.module_hash, - &self.info.address, + let context = ( + self.info.identity, + self.info.module_hash, + self.info.address, reducer_name, ); - let (threadpool, mut inst) = self.inner.get_instance(&waiter_gauge_context).await?; + let (threadpool, mut inst) = self.inner.get_instance(context).await?; let (tx, rx) = oneshot::channel(); threadpool.spawn(move || { diff --git a/crates/core/src/util/lending_pool.rs b/crates/core/src/util/lending_pool.rs index e1b7d865cb..26cef85934 100644 --- a/crates/core/src/util/lending_pool.rs +++ b/crates/core/src/util/lending_pool.rs @@ -9,31 +9,28 @@ use std::sync::Arc; use std::task::{Context, Poll}; use parking_lot::Mutex; -use prometheus::IntGauge; +use spacetimedb_lib::{Address, Hash, Identity}; use tokio::sync::{OwnedSemaphorePermit, Semaphore}; -use crate::util::prometheus_handle::IntGaugeExt; +use crate::worker_metrics::WORKER_METRICS; use super::notify_once::{NotifiedOnce, NotifyOnce}; -use super::prometheus_handle::GaugeInc; -pub struct LendingPool { +pub struct LendingPool { sem: Arc, - waiter_gauge: G, inner: Arc>, } -impl Default for LendingPool { +impl Default for LendingPool { fn default() -> Self { Self::new() } } -impl Clone for LendingPool { +impl Clone for LendingPool { fn clone(&self) -> Self { Self { sem: self.sem.clone(), - waiter_gauge: self.waiter_gauge.clone(), inner: self.inner.clone(), } } @@ -52,50 +49,38 @@ struct PoolVec { #[derive(Debug)] pub struct PoolClosed; -impl LendingPool { - pub fn new() -> Self - where - G: Default, - { - Self::with_gauge(G::default()) - } - - pub fn with_gauge(waiter_gauge: G) -> Self { - Self::from_iter_with_gauge(std::iter::empty(), waiter_gauge) - } - - pub fn from_iter_with_gauge>(iter: I, waiter_gauge: G) -> Self { - let deque = VecDeque::from_iter(iter); - Self { - sem: Arc::new(Semaphore::new(deque.len())), - waiter_gauge, - inner: Arc::new(LendingPoolInner { - closed_notify: NotifyOnce::new(), - vec: Mutex::new(PoolVec { - total_count: deque.len(), - deque: Some(deque), - }), - }), - } - } - - pub fn request(&self) -> impl Future, PoolClosed>> - where - G: for<'a> WaiterGauge = ()>, - { - self.request_with_context(()) +impl LendingPool { + pub fn new() -> Self { + Self::from_iter(std::iter::empty()) } pub fn request_with_context( &self, - context: G::Context<'_>, + (identity, module_hash, database_address, reducer_symbol): (Identity, Hash, Address, &str), ) -> impl Future, PoolClosed>> { let acq = self.sem.clone().acquire_owned(); let pool_inner = self.inner.clone(); - let waiter_guard = self.waiter_gauge.inc(context); + + let queue_len = WORKER_METRICS.instance_queue_length.with_label_values( + &identity, + &module_hash, + &database_address, + reducer_symbol, + ); + let queue_len_histogram = WORKER_METRICS.instance_queue_length_histogram.with_label_values( + &identity, + &module_hash, + &database_address, + reducer_symbol, + ); + + queue_len.inc(); + queue_len_histogram.observe(queue_len.get() as f64); + async move { let permit = acq.await.map_err(|_| PoolClosed)?; - drop(waiter_guard); + queue_len.dec(); + queue_len_histogram.observe(queue_len.get() as f64); let resource = pool_inner .vec .lock() @@ -154,9 +139,19 @@ impl LendingPool { } } -impl FromIterator for LendingPool { +impl FromIterator for LendingPool { fn from_iter>(iter: I) -> Self { - Self::from_iter_with_gauge(iter, G::default()) + let deque = VecDeque::from_iter(iter); + Self { + sem: Arc::new(Semaphore::new(deque.len())), + inner: Arc::new(LendingPoolInner { + closed_notify: NotifyOnce::new(), + vec: Mutex::new(PoolVec { + total_count: deque.len(), + deque: Some(deque), + }), + }), + } } } @@ -194,17 +189,6 @@ impl DerefMut for LentResource { } } -// impl LentResource { -// fn keep(this: Self) -> T { -// let mut this = ManuallyDrop::new(this); -// let resource = unsafe { ManuallyDrop::take(&mut this.resource) }; -// let permit = unsafe { ManuallyDrop::take(&mut this.permit) }; -// permit.forget(); -// let prev_count = this.pool.total_count.fetch_sub(1, SeqCst); -// resource -// } -// } - impl Drop for LentResource { fn drop(&mut self) { let resource = unsafe { ManuallyDrop::take(&mut self.resource) }; @@ -225,31 +209,3 @@ impl Drop for LentResource { } } } - -pub trait WaiterGauge { - type Context<'a>; - type IncGuard; - fn inc(&self, context: Self::Context<'_>) -> Self::IncGuard; -} - -impl WaiterGauge for () { - type Context<'a> = (); - type IncGuard = (); - fn inc(&self, (): ()) -> Self::IncGuard {} -} - -impl WaiterGauge for Arc { - type Context<'a> = G::Context<'a>; - type IncGuard = G::IncGuard; - fn inc(&self, context: Self::Context<'_>) -> Self::IncGuard { - (**self).inc(context) - } -} - -impl WaiterGauge for IntGauge { - type Context<'a> = (); - type IncGuard = GaugeInc; - fn inc(&self, (): ()) -> Self::IncGuard { - self.inc_scope() - } -} diff --git a/crates/core/src/util/prometheus_handle.rs b/crates/core/src/util/prometheus_handle.rs index 41eae673a3..82bd3f3ec4 100644 --- a/crates/core/src/util/prometheus_handle.rs +++ b/crates/core/src/util/prometheus_handle.rs @@ -7,7 +7,7 @@ pub struct GaugeInc { impl Drop for GaugeInc { #[inline] fn drop(&mut self) { - self.gauge.dec() + self.gauge.dec(); } }