From 3df2a5376d5859febcf1d072eb8f3d28c2c1ce27 Mon Sep 17 00:00:00 2001 From: joshua-spacetime Date: Fri, 26 Apr 2024 16:10:27 -0700 Subject: [PATCH] fix(1173): Record wait time for all reducers Fixes #1173. Previously we were only recording this metric for scheduled reducers. We were also recording it before we acquired access to the module instance. Now we record it for all reducers after we acquire access to the module instance. This patch also removes max wait time since the histogram should suffice. --- crates/core/src/host/module_host.rs | 12 ++++++++++-- crates/core/src/host/scheduler.rs | 27 --------------------------- crates/core/src/worker_metrics/mod.rs | 24 ++++-------------------- crates/standalone/src/lib.rs | 3 +-- 4 files changed, 15 insertions(+), 51 deletions(-) diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 05e05ef96f8..c3555ae7a52 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -14,6 +14,7 @@ use crate::protobuf::client_api::{TableRowOperation, TableUpdate}; use crate::subscription::module_subscription_actor::ModuleSubscriptions; use crate::util::lending_pool::{Closed, LendingPool, LentResource, PoolClosed}; use crate::util::notify_once::NotifyOnce; +use crate::worker_metrics::WORKER_METRICS; use bytes::Bytes; use derive_more::{From, Into}; use futures::{Future, FutureExt}; @@ -636,12 +637,19 @@ impl ModuleHost { &self.info.subscriptions } - async fn call(&self, _reducer_name: &str, f: F) -> Result + async fn call(&self, reducer: &str, f: F) -> Result where F: FnOnce(&mut dyn ModuleInstance) -> R + Send + 'static, R: Send + 'static, { - let mut inst = self.inner.get_instance(self.info.address).await?; + let mut inst = { + // Record the time spent waiting in the queue + let _guard = WORKER_METRICS + .reducer_wait_time + .with_label_values(&self.info.address, reducer) + .start_timer(); + self.inner.get_instance(self.info.address).await? + }; let result = tokio::task::spawn_blocking(move || f(&mut *inst)) .await diff --git a/crates/core/src/host/scheduler.rs b/crates/core/src/host/scheduler.rs index 385d20e8ebf..758a1be71dd 100644 --- a/crates/core/src/host/scheduler.rs +++ b/crates/core/src/host/scheduler.rs @@ -9,8 +9,6 @@ use tokio::sync::mpsc; use tokio_util::time::delay_queue::Expired; use tokio_util::time::{delay_queue, DelayQueue}; -use crate::worker_metrics::{MAX_REDUCER_DELAY, WORKER_METRICS}; - use super::module_host::WeakModuleHost; use super::{ModuleHost, ReducerArgs, ReducerCallError, Timestamp}; @@ -260,7 +258,6 @@ impl SchedulerActor { } async fn handle_queued(&mut self, id: Expired) { - let delay = id.deadline().elapsed().as_secs_f64(); let id = id.into_inner(); self.key_map.remove(&id); let Some(module_host) = self.module_host.upgrade() else { @@ -271,30 +268,6 @@ impl SchedulerActor { }; let scheduled: ScheduledReducer = bsatn::from_slice(&scheduled).unwrap(); - let db = module_host.info().address; - let reducer = scheduled.reducer.clone(); - let mut guard = MAX_REDUCER_DELAY.lock().unwrap(); - let max_reducer_delay = *guard - .entry((db, reducer)) - .and_modify(|max| { - if delay > *max { - *max = delay; - } - }) - .or_insert_with(|| delay); - - // Note, we are only tracking the time a reducer spends delayed in the queue. - // This does not account for any time the executing thread spends blocked by the os. - WORKER_METRICS - .scheduled_reducer_delay_sec - .with_label_values(&db, &scheduled.reducer) - .observe(delay); - WORKER_METRICS - .scheduled_reducer_delay_sec_max - .with_label_values(&db, &scheduled.reducer) - .set(max_reducer_delay); - drop(guard); - let db = self.db.clone(); tokio::spawn(async move { let info = module_host.info(); diff --git a/crates/core/src/worker_metrics/mod.rs b/crates/core/src/worker_metrics/mod.rs index 6c429b1116a..110d350fef7 100644 --- a/crates/core/src/worker_metrics/mod.rs +++ b/crates/core/src/worker_metrics/mod.rs @@ -1,11 +1,9 @@ use crate::execution_context::WorkloadType; use crate::hash::Hash; use once_cell::sync::Lazy; -use prometheus::{GaugeVec, HistogramVec, IntCounterVec, IntGaugeVec}; -use spacetimedb_data_structures::map::HashMap; +use prometheus::{HistogramVec, IntCounterVec, IntGaugeVec}; use spacetimedb_lib::{Address, Identity}; use spacetimedb_metrics::metrics_group; -use std::sync::Mutex; metrics_group!( pub struct WorkerMetrics { @@ -45,18 +43,13 @@ metrics_group!( #[buckets(0, 10, 25, 50, 75, 100, 150, 200, 250, 300, 350, 400, 450, 500, 1000)] pub instance_queue_length_histogram: HistogramVec, - #[name = spacetime_scheduled_reducer_delay_sec] - #[help = "The amount of time (in seconds) a reducer has been delayed past its scheduled execution time"] + #[name = spacetime_reducer_wait_time_sec] + #[help = "The amount of time (in seconds) a reducer spends in the queue waiting to run"] #[labels(db: Address, reducer: str)] #[buckets( 1e-6, 5e-6, 1e-5, 5e-5, 1e-4, 5e-4, 1e-3, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0 )] - pub scheduled_reducer_delay_sec: HistogramVec, - - #[name = spacetime_scheduled_reducer_delay_sec_max] - #[help = "The maximum duration (in seconds) a reducer has been delayed"] - #[labels(db: Address, reducer: str)] - pub scheduled_reducer_delay_sec_max: GaugeVec, + pub reducer_wait_time: HistogramVec, #[name = spacetime_worker_wasm_instance_errors_cumulative] #[help = "The number of fatal WASM instance errors, such as reducer panics."] @@ -80,13 +73,4 @@ metrics_group!( } ); -type ReducerLabel = (Address, String); - -pub static MAX_REDUCER_DELAY: Lazy>> = Lazy::new(|| Mutex::new(HashMap::new())); pub static WORKER_METRICS: Lazy = Lazy::new(WorkerMetrics::new); - -pub fn reset_counters() { - // Reset max reducer wait time - WORKER_METRICS.scheduled_reducer_delay_sec_max.0.reset(); - MAX_REDUCER_DELAY.lock().unwrap().clear(); -} diff --git a/crates/standalone/src/lib.rs b/crates/standalone/src/lib.rs index 1948092938f..4457b8ec0f9 100644 --- a/crates/standalone/src/lib.rs +++ b/crates/standalone/src/lib.rs @@ -30,8 +30,8 @@ use spacetimedb::messages::control_db::{Database, DatabaseInstance, HostType, Id use spacetimedb::module_host_context::ModuleHostContext; use spacetimedb::object_db::ObjectDb; use spacetimedb::sendgrid_controller::SendGridController; +use spacetimedb::stdb_path; use spacetimedb::worker_metrics::WORKER_METRICS; -use spacetimedb::{stdb_path, worker_metrics}; use spacetimedb_client_api_messages::name::{DomainName, InsertDomainResult, RegisterTldResult, Tld}; use spacetimedb_client_api_messages::recovery::RecoveryCode; use std::fs::File; @@ -160,7 +160,6 @@ impl spacetimedb_client_api::NodeDelegate for StandaloneEnv { fn gather_metrics(&self) -> Vec { defer_on_success! { db_metrics::reset_counters(); - worker_metrics::reset_counters(); } // Note, we update certain metrics such as disk usage on demand. self.db_inst_ctx_controller.update_metrics();