perf: Track maximum values for transaction cpu time and reducer queue length #583

Merged · 4 commits · Nov 21, 2023
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

43 changes: 38 additions & 5 deletions crates/core/src/db/datastore/locking_tx_datastore/mod.rs
@@ -14,14 +14,14 @@ use super::{
},
traits::{self, DataRow, MutTx, MutTxDatastore, TxData, TxDatastore},
};
use crate::db::datastore::system_tables;
use crate::db::datastore::system_tables::{
st_constraints_schema, st_module_schema, table_name_is_system, StColumnFields, StConstraintRow, StIndexFields,
StModuleRow, StSequenceFields, StTableFields, SystemTables, CONSTRAINT_ID_SEQUENCE_ID, INDEX_ID_SEQUENCE_ID,
SEQUENCE_ID_SEQUENCE_ID, ST_CONSTRAINTS_ID, ST_CONSTRAINT_ROW_TYPE, ST_MODULE_ID, ST_MODULE_ROW_TYPE,
TABLE_ID_SEQUENCE_ID, WASM_MODULE,
};
use crate::db::db_metrics::DB_METRICS;
use crate::db::db_metrics::{DB_METRICS, MAX_TX_CPU_TIME};
use crate::{db::datastore::system_tables, execution_context::TransactionType};
use crate::{
db::datastore::traits::{TxOp, TxRecord},
db::{
@@ -45,14 +45,18 @@ use spacetimedb_sats::{
db::auth::{StAccess, StTableType},
AlgebraicType, AlgebraicValue, ProductType, ProductValue,
};
use std::time::{Duration, Instant};
use std::{
borrow::Cow,
collections::{BTreeMap, BTreeSet, HashMap},
hash::Hasher,
ops::{Deref, RangeBounds},
sync::Arc,
vec,
};
use std::{
collections::hash_map::DefaultHasher,
time::{Duration, Instant},
};
use thiserror::Error;

#[derive(Error, Debug, PartialEq, Eq)]
@@ -2115,6 +2119,9 @@ impl traits::MutTx for Locking {
let reducer = ctx.reducer_name().unwrap_or_default();
let elapsed_time = tx.timer.elapsed();
let cpu_time = elapsed_time - tx.lock_wait_time;

let elapsed_time = elapsed_time.as_secs_f64();
let cpu_time = cpu_time.as_secs_f64();
// Note, we record empty transactions in our metrics.
// That is, transactions that don't write any rows to the commit log.
DB_METRICS
@@ -2124,11 +2131,37 @@
DB_METRICS
.rdb_txn_cpu_time_sec
.with_label_values(txn_type, db, reducer)
.observe(cpu_time.as_secs_f64());
.observe(cpu_time);
DB_METRICS
.rdb_txn_elapsed_time_sec
.with_label_values(txn_type, db, reducer)
.observe(elapsed_time.as_secs_f64());
.observe(elapsed_time);

fn hash(a: &TransactionType, b: &Address, c: &str) -> u64 {
use std::hash::Hash;
let mut hasher = DefaultHasher::new();
a.hash(&mut hasher);
b.hash(&mut hasher);
c.hash(&mut hasher);
hasher.finish()
}

let mut guard = MAX_TX_CPU_TIME.lock().unwrap();
let max_cpu_time = *guard
.entry(hash(txn_type, db, reducer))
.and_modify(|max| {
if cpu_time > *max {
*max = cpu_time;
}
})
.or_insert_with(|| cpu_time);

drop(guard);
DB_METRICS
.rdb_txn_cpu_time_sec_max
.with_label_values(txn_type, db, reducer)
.set(max_cpu_time);

tx.lock.commit()
}
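
The hunk above introduces the pattern this PR reuses for all three new `*_max` metrics: hash the metric labels into a key, keep a running maximum for that key in a mutex-guarded map, and mirror the maximum into a Prometheus gauge. A minimal, self-contained sketch of the idea follows; the `MAXIMA` static and `record_max` helper are illustrative names, not part of the PR, which writes the same logic inline against `MAX_TX_CPU_TIME`, `MAX_REDUCER_DELAY`, and `MAX_QUEUE_LEN`.

```rust
use once_cell::sync::Lazy;
use std::collections::HashMap;
use std::sync::Mutex;

// Running per-key maxima; the key is a hash of the metric labels.
static MAXIMA: Lazy<Mutex<HashMap<u64, f64>>> = Lazy::new(|| Mutex::new(HashMap::new()));

/// Record `value` under `key` and return the largest value seen so far.
/// The caller then publishes the result through a Prometheus gauge via `set`.
fn record_max(key: u64, value: f64) -> f64 {
    let mut guard = MAXIMA.lock().unwrap();
    let max = guard.entry(key).or_insert(value);
    if value > *max {
        *max = value;
    }
    *max
}
```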

15 changes: 9 additions & 6 deletions crates/core/src/db/db_metrics/mod.rs
@@ -1,6 +1,8 @@
use std::{collections::HashMap, sync::Mutex};

use crate::{execution_context::TransactionType, host::AbiCall, util::typed_prometheus::metrics_group};
use once_cell::sync::Lazy;
use prometheus::{Histogram, HistogramVec, IntCounterVec, IntGaugeVec};
use prometheus::{GaugeVec, Histogram, HistogramVec, IntCounterVec, IntGaugeVec};
use spacetimedb_lib::Address;

metrics_group!(
@@ -51,11 +53,6 @@ metrics_group!(
#[labels(table_id: u32)]
pub rdb_delete_by_rel_time: HistogramVec,

#[name = spacetime_scheduled_reducer_delay_sec]
#[help = "The amount of time (in seconds) a reducer has been delayed past its scheduled execution time"]
#[labels(db: Address, reducer: str)]
pub scheduled_reducer_delay_sec: HistogramVec,

#[name = spacetime_num_table_rows]
#[help = "The number of rows in a table"]
#[labels(db: Address, table_id: u32)]
@@ -101,6 +98,11 @@ metrics_group!(
#[labels(txn_type: TransactionType, db: Address, reducer: str)]
pub rdb_txn_cpu_time_sec: HistogramVec,

#[name = spacetime_txn_cpu_time_sec_max]
#[help = "The cpu time of the longest running transaction (in seconds)"]
#[labels(txn_type: TransactionType, db: Address, reducer: str)]
pub rdb_txn_cpu_time_sec_max: GaugeVec,

#[name = spacetime_wasm_abi_call_duration_sec]
#[help = "The total duration of a spacetime wasm abi call (in seconds); includes row serialization and copying into wasm memory"]
#[labels(txn_type: TransactionType, db: Address, reducer: str, call: AbiCall)]
@@ -123,4 +125,5 @@
}
);

pub static MAX_TX_CPU_TIME: Lazy<Mutex<HashMap<u64, f64>>> = Lazy::new(|| Mutex::new(HashMap::new()));
pub static DB_METRICS: Lazy<DbMetrics> = Lazy::new(DbMetrics::new);
2 changes: 1 addition & 1 deletion crates/core/src/execution_context.rs
@@ -18,7 +18,7 @@ pub struct ExecutionContext<'a> {
/// A transaction can be executing a reducer.
/// It can be used to satisfy a one-off sql query or subscription.
/// It can also be an internal operation that is not associated with a reducer or sql request.
#[derive(Clone, Copy, Display)]
#[derive(Clone, Copy, Display, Hash, PartialEq, Eq)]
pub enum TransactionType {
Reducer,
Sql,
24 changes: 6 additions & 18 deletions crates/core/src/host/module_host.rs
@@ -332,10 +332,7 @@ impl fmt::Debug for ModuleHost {

#[async_trait::async_trait]
trait DynModuleHost: Send + Sync + 'static {
async fn get_instance(
&self,
ctx: (Identity, Hash, Address, &str),
) -> Result<(&HostThreadpool, Box<dyn ModuleInstance>), NoSuchModule>;
async fn get_instance(&self, db: Address) -> Result<(&HostThreadpool, Box<dyn ModuleInstance>), NoSuchModule>;
fn inject_logs(&self, log_level: LogLevel, message: &str);
fn one_off_query(
&self,
@@ -381,22 +378,19 @@ async fn select_first<A: Future, B: Future<Output = ()>>(fut_a: A, fut_b: B) ->

#[async_trait::async_trait]
impl<T: Module> DynModuleHost for HostControllerActor<T> {
async fn get_instance(
&self,
ctx: (Identity, Hash, Address, &str),
) -> Result<(&HostThreadpool, Box<dyn ModuleInstance>), NoSuchModule> {
async fn get_instance(&self, db: Address) -> Result<(&HostThreadpool, Box<dyn ModuleInstance>), NoSuchModule> {
self.start.notified().await;
// in the future we should do something like in the else branch here -- add more instances based on load.
// we need to do write-skew retries first - right now there's only ever once instance per module.
let inst = if true {
self.instance_pool
.request_with_context(ctx)
.request_with_context(db)
.await
.map_err(|_| NoSuchModule)?
} else {
const GET_INSTANCE_TIMEOUT: Duration = Duration::from_millis(500);
select_first(
self.instance_pool.request_with_context(ctx),
self.instance_pool.request_with_context(db),
tokio::time::sleep(GET_INSTANCE_TIMEOUT).map(|()| self.spinup_new_instance()),
)
.await
@@ -509,18 +503,12 @@ impl ModuleHost {
&self.info.subscription
}

async fn call<F, R>(&self, reducer_name: &str, f: F) -> Result<R, NoSuchModule>
async fn call<F, R>(&self, _reducer_name: &str, f: F) -> Result<R, NoSuchModule>
where
F: FnOnce(&mut dyn ModuleInstance) -> R + Send + 'static,
R: Send + 'static,
{
let context = (
self.info.identity,
self.info.module_hash,
self.info.address,
reducer_name,
);
let (threadpool, mut inst) = self.inner.get_instance(context).await?;
let (threadpool, mut inst) = self.inner.get_instance(self.info.address).await?;

let (tx, rx) = oneshot::channel();
threadpool.spawn(move || {
40 changes: 34 additions & 6 deletions crates/core/src/host/scheduler.rs
@@ -1,15 +1,17 @@
use std::collections::hash_map::DefaultHasher;
use std::hash::Hasher;
use std::path::Path;

use futures::StreamExt;
use rustc_hash::FxHashMap;
use sled::transaction::{ConflictableTransactionError::Abort as TxAbort, TransactionError};
use spacetimedb_lib::bsatn;
use spacetimedb_lib::bsatn::ser::BsatnError;
use spacetimedb_lib::{bsatn, Address};
use tokio::sync::mpsc;
use tokio_util::time::delay_queue::Expired;
use tokio_util::time::{delay_queue, DelayQueue};

use crate::db::db_metrics::DB_METRICS;
use crate::worker_metrics::{MAX_REDUCER_DELAY, WORKER_METRICS};

use super::module_host::WeakModuleHost;
use super::{ModuleHost, ReducerArgs, ReducerCallError, Timestamp};
@@ -260,7 +262,7 @@ impl SchedulerActor {
}

async fn handle_queued(&mut self, id: Expired<ScheduledReducerId>) {
let delay = id.deadline().elapsed();
let delay = id.deadline().elapsed().as_secs_f64();
let id = id.into_inner();
self.key_map.remove(&id);
let Some(module_host) = self.module_host.upgrade() else {
@@ -270,12 +272,38 @@
return;
};
let scheduled: ScheduledReducer = bsatn::from_slice(&scheduled).unwrap();

fn hash(a: &Address, b: &str) -> u64 {
use std::hash::Hash;
let mut hasher = DefaultHasher::new();
a.hash(&mut hasher);
b.hash(&mut hasher);
hasher.finish()
}

let db = module_host.info().address;
let mut guard = MAX_REDUCER_DELAY.lock().unwrap();
let max_reducer_delay = *guard
.entry(hash(&db, &scheduled.reducer))
.and_modify(|max| {
if delay > *max {
*max = delay;
}
})
.or_insert_with(|| delay);

// Note, we are only tracking the time a reducer spends delayed in the queue.
// This does not account for any time the executing thread spends blocked by the os.
DB_METRICS
WORKER_METRICS
.scheduled_reducer_delay_sec
.with_label_values(&module_host.info().address, &scheduled.reducer)
.observe(delay.as_secs_f64());
.with_label_values(&db, &scheduled.reducer)
.observe(delay);
WORKER_METRICS
.scheduled_reducer_delay_sec_max
.with_label_values(&db, &scheduled.reducer)
.set(max_reducer_delay);
drop(guard);

let db = self.db.clone();
tokio::spawn(async move {
let info = module_host.info();
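
Both this scheduler hunk and the datastore commit path above derive their map keys by feeding the metric labels through `DefaultHasher`, presumably to avoid storing owned label strings for every entry, at the cost of a negligible collision probability on the 64-bit hash. A compact sketch of that keying scheme, assuming the labels implement `Hash`; the `label_key` helper and the example label values are illustrative, not part of the PR.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Collapse a set of hashable metric labels into a single u64 key for the
/// mutex-guarded max-tracking maps.
fn label_key<L: Hash>(labels: L) -> u64 {
    let mut hasher = DefaultHasher::new();
    labels.hash(&mut hasher);
    hasher.finish()
}

fn main() {
    // Tuples of `Hash` types are themselves `Hash`, so (db, reducer) works as one key.
    let key = label_key(("database_address", "send_message"));
    println!("{key}");
}
```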
40 changes: 21 additions & 19 deletions crates/core/src/util/lending_pool.rs
@@ -9,10 +9,10 @@ use std::sync::Arc;
use std::task::{Context, Poll};

use parking_lot::Mutex;
use spacetimedb_lib::{Address, Hash, Identity};
use spacetimedb_lib::Address;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};

use crate::worker_metrics::WORKER_METRICS;
use crate::worker_metrics::{MAX_QUEUE_LEN, WORKER_METRICS};

use super::notify_once::{NotifiedOnce, NotifyOnce};

@@ -54,28 +54,30 @@ impl<T> LendingPool<T> {
Self::from_iter(std::iter::empty())
}

pub fn request_with_context(
&self,
(identity, module_hash, database_address, reducer_symbol): (Identity, Hash, Address, &str),
) -> impl Future<Output = Result<LentResource<T>, PoolClosed>> {
pub fn request_with_context(&self, db: Address) -> impl Future<Output = Result<LentResource<T>, PoolClosed>> {
let acq = self.sem.clone().acquire_owned();
let pool_inner = self.inner.clone();

let queue_len = WORKER_METRICS.instance_queue_length.with_label_values(
&identity,
&module_hash,
&database_address,
reducer_symbol,
);
let queue_len_histogram = WORKER_METRICS.instance_queue_length_histogram.with_label_values(
&identity,
&module_hash,
&database_address,
reducer_symbol,
);
let queue_len = WORKER_METRICS.instance_queue_length.with_label_values(&db);
let queue_len_max = WORKER_METRICS.instance_queue_length_max.with_label_values(&db);
let queue_len_histogram = WORKER_METRICS.instance_queue_length_histogram.with_label_values(&db);
Comment on lines +61 to +63

Collaborator (author): Note: I'm no longer partitioning these metrics by reducer since it doesn't provide any additional observability.

Contributor: Sounds good.


queue_len.inc();
queue_len_histogram.observe(queue_len.get() as f64);
let new_queue_len = queue_len.get();
queue_len_histogram.observe(new_queue_len as f64);

let mut guard = MAX_QUEUE_LEN.lock().unwrap();
let max_queue_len = *guard
.entry(db)
.and_modify(|max| {
if new_queue_len > *max {
*max = new_queue_len;
}
})
.or_insert_with(|| new_queue_len);

drop(guard);
queue_len_max.set(max_queue_len);

async move {
let permit = acq.await.map_err(|_| PoolClosed)?;
23 changes: 21 additions & 2 deletions crates/core/src/worker_metrics/mod.rs
@@ -1,3 +1,5 @@
use std::{collections::HashMap, sync::Mutex};

use crate::hash::Hash;
use crate::util::typed_prometheus::metrics_group;
use once_cell::sync::Lazy;
@@ -67,15 +69,30 @@ metrics_group!(

#[name = spacetime_worker_instance_operation_queue_length]
#[help = "Length of the wait queue for access to a module instance."]
#[labels(identity: Identity, module_hash: Hash, database_address: Address, reducer_symbol: str)]
#[labels(database_address: Address)]
pub instance_queue_length: IntGaugeVec,

#[name = spacetime_worker_instance_operation_queue_length_max]
#[help = "Max length of the wait queue for access to a module instance."]
#[labels(database_address: Address)]
pub instance_queue_length_max: IntGaugeVec,

#[name = spacetime_worker_instance_operation_queue_length_histogram]
#[help = "Length of the wait queue for access to a module instance."]
#[labels(identity: Identity, module_hash: Hash, database_address: Address, reducer_symbol: str)]
#[labels(database_address: Address)]
#[buckets(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 15, 25, 50)]
pub instance_queue_length_histogram: HistogramVec,

#[name = spacetime_scheduled_reducer_delay_sec]
#[help = "The amount of time (in seconds) a reducer has been delayed past its scheduled execution time"]
#[labels(db: Address, reducer: str)]
pub scheduled_reducer_delay_sec: HistogramVec,

#[name = spacetime_scheduled_reducer_delay_sec_max]
#[help = "The maximum duration (in seconds) a reducer has been delayed"]
#[labels(db: Address, reducer: str)]
pub scheduled_reducer_delay_sec_max: GaugeVec,

#[name = spacetime_worker_wasm_instance_errors_cumulative]
#[help = "The number of fatal WASM instance errors, such as reducer panics."]
#[labels(identity: Identity, module_hash: Hash, database_address: Address, reducer_symbol: str)]
@@ -99,4 +116,6 @@
}
);

pub static MAX_QUEUE_LEN: Lazy<Mutex<HashMap<Address, i64>>> = Lazy::new(|| Mutex::new(HashMap::new()));
pub static MAX_REDUCER_DELAY: Lazy<Mutex<HashMap<u64, f64>>> = Lazy::new(|| Mutex::new(HashMap::new()));
pub static WORKER_METRICS: Lazy<WorkerMetrics> = Lazy::new(WorkerMetrics::new);
1 change: 1 addition & 0 deletions crates/standalone/Cargo.toml
@@ -31,6 +31,7 @@ http.workspace = true
log.workspace = true
openssl.workspace = true
prometheus.workspace = true
scopeguard.workspace = true
sled.workspace = true
tokio.workspace = true
tower-http.workspace = true