
fix(1173): Record wait time for all reducers #1174

Merged
merged 1 commit on Apr 29, 2024
12 changes: 10 additions & 2 deletions crates/core/src/host/module_host.rs
@@ -14,6 +14,7 @@ use crate::protobuf::client_api::{TableRowOperation, TableUpdate};
use crate::subscription::module_subscription_actor::ModuleSubscriptions;
use crate::util::lending_pool::{Closed, LendingPool, LentResource, PoolClosed};
use crate::util::notify_once::NotifyOnce;
+use crate::worker_metrics::WORKER_METRICS;
use bytes::Bytes;
use derive_more::{From, Into};
use futures::{Future, FutureExt};
@@ -636,12 +637,19 @@ impl ModuleHost {
&self.info.subscriptions
}

-async fn call<F, R>(&self, _reducer_name: &str, f: F) -> Result<R, NoSuchModule>
+async fn call<F, R>(&self, reducer: &str, f: F) -> Result<R, NoSuchModule>
where
F: FnOnce(&mut dyn ModuleInstance) -> R + Send + 'static,
R: Send + 'static,
{
-let mut inst = self.inner.get_instance(self.info.address).await?;
+let mut inst = {
+    // Record the time spent waiting in the queue
+    let _guard = WORKER_METRICS
+        .reducer_wait_time
+        .with_label_values(&self.info.address, reducer)
+        .start_timer();
+    self.inner.get_instance(self.info.address).await?
Comment on lines +646 to +651

Collaborator (Author):
@coolreader18 do you agree that this is the correct place to be recording this metric? And that this works for all reducers?

Collaborator:
for all reducers and for other such calls. but yeah, this seems right.

+};

let result = tokio::task::spawn_blocking(move || f(&mut *inst))
.await
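
For context on the new block above: `start_timer()` on a Prometheus histogram returns a guard that observes the elapsed seconds when it is dropped, so wrapping only `get_instance(...).await` in its own block bounds the measurement to the queue wait and excludes the reducer's own execution. The sketch below illustrates that pattern with the plain `prometheus` crate; the metric name, label values, and the simulated wait are assumptions for illustration, and the generated `WORKER_METRICS.reducer_wait_time` accessor in the diff takes typed labels rather than `&[&str]`.

```rust
// A minimal sketch of the timer-guard pattern, using the plain `prometheus`
// crate. Metric name, labels, and the simulated wait are illustrative only;
// the real code goes through the generated `WORKER_METRICS` accessor.
use prometheus::{HistogramOpts, HistogramVec};
use std::{thread, time::Duration};

fn main() {
    let wait_time = HistogramVec::new(
        HistogramOpts::new("reducer_wait_time_sec", "Time spent queued before running"),
        &["db", "reducer"],
    )
    .unwrap();

    let _instance = {
        // The guard starts timing here...
        let _guard = wait_time
            .with_label_values(&["db_0", "send_message"])
            .start_timer();
        // ...simulate waiting for a module instance to become available...
        thread::sleep(Duration::from_millis(5));
        // ...and the guard records the elapsed seconds into the histogram when
        // it is dropped at the end of this block, before the reducer runs.
        "instance handle"
    };

    // Reducer execution would happen out here, outside the timed scope.
    let h = wait_time.with_label_values(&["db_0", "send_message"]);
    println!("observations: {}, total wait: {:.4}s", h.get_sample_count(), h.get_sample_sum());
}
```

If the guard lived for the whole `call`, the reducer's runtime would be folded into the wait metric, which is exactly what the inner braces avoid.
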
27 changes: 0 additions & 27 deletions crates/core/src/host/scheduler.rs
@@ -9,8 +9,6 @@ use tokio::sync::mpsc;
use tokio_util::time::delay_queue::Expired;
use tokio_util::time::{delay_queue, DelayQueue};

-use crate::worker_metrics::{MAX_REDUCER_DELAY, WORKER_METRICS};

use super::module_host::WeakModuleHost;
use super::{ModuleHost, ReducerArgs, ReducerCallError, Timestamp};

@@ -260,7 +258,6 @@ impl SchedulerActor {
}

async fn handle_queued(&mut self, id: Expired<ScheduledReducerId>) {
-let delay = id.deadline().elapsed().as_secs_f64();
let id = id.into_inner();
self.key_map.remove(&id);
let Some(module_host) = self.module_host.upgrade() else {
@@ -271,30 +268,6 @@
};
let scheduled: ScheduledReducer = bsatn::from_slice(&scheduled).unwrap();

-let db = module_host.info().address;
-let reducer = scheduled.reducer.clone();
-let mut guard = MAX_REDUCER_DELAY.lock().unwrap();
-let max_reducer_delay = *guard
-    .entry((db, reducer))
-    .and_modify(|max| {
-        if delay > *max {
-            *max = delay;
-        }
-    })
-    .or_insert_with(|| delay);
-
-// Note, we are only tracking the time a reducer spends delayed in the queue.
-// This does not account for any time the executing thread spends blocked by the os.
-WORKER_METRICS
-    .scheduled_reducer_delay_sec
-    .with_label_values(&db, &scheduled.reducer)
-    .observe(delay);
-WORKER_METRICS
-    .scheduled_reducer_delay_sec_max
-    .with_label_values(&db, &scheduled.reducer)
-    .set(max_reducer_delay);
-drop(guard);

let db = self.db.clone();
tokio::spawn(async move {
let info = module_host.info();
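
The block deleted above maintained a per-(db, reducer) maximum in a process-wide `Mutex<HashMap>` and exported it through a gauge that had to be reset on every scrape. A histogram already exports per-bucket counts plus a running count and sum for each label set, so maximum-style and percentile views can be derived from the buckets at query time instead of being tracked in process. A minimal sketch of what a histogram retains, using the plain `prometheus` crate with made-up bucket boundaries and delays:

```rust
// Sketch: a histogram keeps per-bucket counts plus a total count and sum, so
// no separate in-process "max so far" gauge is needed. Bucket boundaries and
// observed delays are made up for illustration.
use prometheus::core::Collector;
use prometheus::{Histogram, HistogramOpts};

fn main() {
    let delay = Histogram::with_opts(
        HistogramOpts::new("reducer_wait_time_sec", "Queue wait in seconds")
            .buckets(vec![1e-3, 1e-2, 0.1, 1.0, 10.0]),
    )
    .unwrap();

    for d in [0.0004, 0.02, 0.3] {
        delay.observe(d);
    }

    println!("count = {}, sum = {:.4}s", delay.get_sample_count(), delay.get_sample_sum());

    // The exported cumulative bucket counts show which ranges were hit; the
    // smallest bucket containing all samples bounds the maximum observed
    // delay, and percentiles can be estimated from the same data at query time.
    for family in delay.collect() {
        for metric in family.get_metric() {
            for bucket in metric.get_histogram().get_bucket() {
                println!("le {:>4}: {}", bucket.get_upper_bound(), bucket.get_cumulative_count());
            }
        }
    }
}
```
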
24 changes: 4 additions & 20 deletions crates/core/src/worker_metrics/mod.rs
@@ -1,11 +1,9 @@
use crate::execution_context::WorkloadType;
use crate::hash::Hash;
use once_cell::sync::Lazy;
-use prometheus::{GaugeVec, HistogramVec, IntCounterVec, IntGaugeVec};
-use spacetimedb_data_structures::map::HashMap;
+use prometheus::{HistogramVec, IntCounterVec, IntGaugeVec};
use spacetimedb_lib::{Address, Identity};
use spacetimedb_metrics::metrics_group;
-use std::sync::Mutex;

metrics_group!(
pub struct WorkerMetrics {
@@ -45,18 +43,13 @@ metrics_group!(
#[buckets(0, 10, 25, 50, 75, 100, 150, 200, 250, 300, 350, 400, 450, 500, 1000)]
pub instance_queue_length_histogram: HistogramVec,

-#[name = spacetime_scheduled_reducer_delay_sec]
-#[help = "The amount of time (in seconds) a reducer has been delayed past its scheduled execution time"]
+#[name = spacetime_reducer_wait_time_sec]
+#[help = "The amount of time (in seconds) a reducer spends in the queue waiting to run"]
#[labels(db: Address, reducer: str)]
#[buckets(
1e-6, 5e-6, 1e-5, 5e-5, 1e-4, 5e-4, 1e-3, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0
)]
-pub scheduled_reducer_delay_sec: HistogramVec,
-
-#[name = spacetime_scheduled_reducer_delay_sec_max]
-#[help = "The maximum duration (in seconds) a reducer has been delayed"]
-#[labels(db: Address, reducer: str)]
-pub scheduled_reducer_delay_sec_max: GaugeVec,
+pub reducer_wait_time: HistogramVec,

#[name = spacetime_worker_wasm_instance_errors_cumulative]
#[help = "The number of fatal WASM instance errors, such as reducer panics."]
@@ -80,13 +73,4 @@
}
);

-type ReducerLabel = (Address, String);
-
-pub static MAX_REDUCER_DELAY: Lazy<Mutex<HashMap<ReducerLabel, f64>>> = Lazy::new(|| Mutex::new(HashMap::new()));
pub static WORKER_METRICS: Lazy<WorkerMetrics> = Lazy::new(WorkerMetrics::new);

-pub fn reset_counters() {
-    // Reset max reducer wait time
-    WORKER_METRICS.scheduled_reducer_delay_sec_max.0.reset();
-    MAX_REDUCER_DELAY.lock().unwrap().clear();
-}
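
The `#[name]`, `#[help]`, `#[labels]`, and `#[buckets]` attributes are consumed by `spacetimedb_metrics::metrics_group!`, which generates the typed `WORKER_METRICS.reducer_wait_time` accessor used in `module_host.rs`. The macro's exact expansion is not shown in this diff; as a rough hand-written stand-in (the registry wiring and the string-label API below are assumptions, not the generated code), the new histogram corresponds to something like:

```rust
// Rough hand-written stand-in for the generated metric, for illustration only.
// The real definition goes through `metrics_group!`, which also produces the
// typed `with_label_values(&Address, &str)` accessor used in module_host.rs.
use once_cell::sync::Lazy;
use prometheus::{HistogramOpts, HistogramVec};

pub static REDUCER_WAIT_TIME: Lazy<HistogramVec> = Lazy::new(|| {
    HistogramVec::new(
        HistogramOpts::new(
            "spacetime_reducer_wait_time_sec",
            "The amount of time (in seconds) a reducer spends in the queue waiting to run",
        )
        .buckets(vec![
            1e-6, 5e-6, 1e-5, 5e-5, 1e-4, 5e-4, 1e-3, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5,
            1.0, 2.5, 5.0, 10.0,
        ]),
        &["db", "reducer"],
    )
    .expect("valid metric definition")
});

fn main() {
    // Record a 2 ms queue wait for a hypothetical database address and reducer.
    REDUCER_WAIT_TIME
        .with_label_values(&["<db address>", "send_message"])
        .observe(0.002);
}
```

Because the bucket boundaries span microseconds to ten seconds, both fast in-memory hand-offs and long queue stalls land in informative buckets.
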
3 changes: 1 addition & 2 deletions crates/standalone/src/lib.rs
@@ -30,8 +30,8 @@ use spacetimedb::messages::control_db::{Database, DatabaseInstance, HostType, Id
use spacetimedb::module_host_context::ModuleHostContext;
use spacetimedb::object_db::ObjectDb;
use spacetimedb::sendgrid_controller::SendGridController;
+use spacetimedb::stdb_path;
use spacetimedb::worker_metrics::WORKER_METRICS;
-use spacetimedb::{stdb_path, worker_metrics};
use spacetimedb_client_api_messages::name::{DomainName, InsertDomainResult, RegisterTldResult, Tld};
use spacetimedb_client_api_messages::recovery::RecoveryCode;
use std::fs::File;
@@ -160,7 +160,6 @@ impl spacetimedb_client_api::NodeDelegate for StandaloneEnv {
fn gather_metrics(&self) -> Vec<prometheus::proto::MetricFamily> {
defer_on_success! {
db_metrics::reset_counters();
-worker_metrics::reset_counters();
}
// Note, we update certain metrics such as disk usage on demand.
self.db_inst_ctx_controller.update_metrics();