diff --git a/Cargo.lock b/Cargo.lock index e34a0dff789..a7b19502890 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4283,6 +4283,7 @@ dependencies = [ "once_cell", "openssl", "parking_lot 0.12.1", + "paste", "pin-project-lite", "prometheus", "prost", diff --git a/Cargo.toml b/Cargo.toml index 358e9868642..f059c8f1e5e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -118,6 +118,7 @@ log = "0.4.17" nonempty = "0.8.1" once_cell = "1.16" parking_lot = { version = "0.12.1", features = ["send_guard", "arc_lock"] } +paste = "1.0" pin-project-lite = "0.2.9" postgres-types = "0.2.5" proc-macro2 = "1.0" diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 92cb5ac0526..878dc69da19 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -51,6 +51,7 @@ nonempty.workspace = true once_cell.workspace = true openssl.workspace = true parking_lot.workspace = true +paste.workspace = true pin-project-lite.workspace = true prometheus.workspace = true prost.workspace = true diff --git a/crates/core/src/auth/identity.rs b/crates/core/src/auth/identity.rs index f7e56ca4935..935b18c0724 100644 --- a/crates/core/src/auth/identity.rs +++ b/crates/core/src/auth/identity.rs @@ -39,7 +39,7 @@ pub fn encode_token_with_expiry( }); let claims = SpacetimeIdentityClaims { - hex_identity: identity.to_hex(), + hex_identity: identity.to_hex().to_string(), iat: SystemTime::now(), exp: expiry, }; diff --git a/crates/core/src/client/client_connection.rs b/crates/core/src/client/client_connection.rs index e58484e0299..dd8b50633b4 100644 --- a/crates/core/src/client/client_connection.rs +++ b/crates/core/src/client/client_connection.rs @@ -2,7 +2,8 @@ use std::ops::Deref; use crate::host::{ModuleHost, NoSuchModule, ReducerArgs, ReducerCallError, ReducerCallResult}; use crate::protobuf::client_api::Subscribe; -use crate::worker_metrics::{CONNECTED_CLIENTS, WEBSOCKET_SENT, WEBSOCKET_SENT_MSG_SIZE}; +use crate::util::prometheus_handle::IntGaugeExt; +use crate::worker_metrics::WORKER_METRICS; use derive_more::From; use futures::prelude::*; use tokio::sync::mpsc; @@ -42,12 +43,11 @@ impl ClientConnectionSender { self.sendtx.send(message).await.map_err(|_| ClientClosed)?; - WEBSOCKET_SENT - .with_label_values(&[self.id.identity.to_hex().as_str()]) - .inc(); + WORKER_METRICS.websocket_sent.with_label_values(&self.id.identity).inc(); - WEBSOCKET_SENT_MSG_SIZE - .with_label_values(&[self.id.identity.to_hex().as_str()]) + WORKER_METRICS + .websocket_sent_msg_size + .with_label_values(&self.id.identity) .observe(bytes_len as f64); Ok(()) @@ -121,11 +121,8 @@ impl ClientConnection { }; let actor_fut = actor(this.clone(), sendrx); - tokio::spawn(async move { - CONNECTED_CLIENTS.inc(); - actor_fut.await; - CONNECTED_CLIENTS.dec(); - }); + let gauge_guard = WORKER_METRICS.connected_clients.inc_scope(); + tokio::spawn(actor_fut.map(|()| drop(gauge_guard))); Ok(this) } diff --git a/crates/core/src/client/message_handlers.rs b/crates/core/src/client/message_handlers.rs index edc394f7999..9c24328ff04 100644 --- a/crates/core/src/client/message_handlers.rs +++ b/crates/core/src/client/message_handlers.rs @@ -4,7 +4,7 @@ use crate::host::module_host::{EventStatus, ModuleEvent, ModuleFunctionCall}; use crate::host::{EnergyDiff, ReducerArgs, Timestamp}; use crate::identity::Identity; use crate::protobuf::client_api::{message, FunctionCall, Message, Subscribe}; -use crate::worker_metrics::{WEBSOCKET_REQUESTS, WEBSOCKET_REQUEST_MSG_SIZE}; +use crate::worker_metrics::WORKER_METRICS; use base64::Engine; use bytes::Bytes; use bytestring::ByteString; @@ -35,12 +35,14 @@ pub async fn handle(client: &ClientConnection, message: DataMessage) -> Result<( DataMessage::Binary(_) => "binary", }; - WEBSOCKET_REQUEST_MSG_SIZE - .with_label_values(&[format!("{}", client.database_instance_id).as_str(), message_kind]) + WORKER_METRICS + .websocket_request_msg_size + .with_label_values(&client.database_instance_id, message_kind) .observe(message.len() as f64); - WEBSOCKET_REQUESTS - .with_label_values(&[format!("{}", client.database_instance_id).as_str(), message_kind]) + WORKER_METRICS + .websocket_requests + .with_label_values(&client.database_instance_id, message_kind) .inc(); match message { diff --git a/crates/core/src/database_instance_context.rs b/crates/core/src/database_instance_context.rs index 4e296016c56..dc1f324df30 100644 --- a/crates/core/src/database_instance_context.rs +++ b/crates/core/src/database_instance_context.rs @@ -25,7 +25,7 @@ pub struct DatabaseInstanceContext { impl DatabaseInstanceContext { pub fn from_database(config: Config, database: &Database, instance_id: u64, root_db_path: PathBuf) -> Arc { let mut db_path = root_db_path; - db_path.extend([database.address.to_hex(), instance_id.to_string()]); + db_path.extend([&*database.address.to_hex(), &*instance_id.to_string()]); db_path.push("database"); let log_path = DatabaseLogger::filepath(&database.address, instance_id); @@ -44,7 +44,7 @@ impl DatabaseInstanceContext { pub fn scheduler_db_path(&self, root_db_path: PathBuf) -> PathBuf { let mut scheduler_db_path = root_db_path; - scheduler_db_path.extend([self.address.to_hex(), self.database_instance_id.to_string()]); + scheduler_db_path.extend([&*self.address.to_hex(), &*self.database_instance_id.to_string()]); scheduler_db_path.push("scheduler"); scheduler_db_path } diff --git a/crates/core/src/database_logger.rs b/crates/core/src/database_logger.rs index d42baff709c..53fe6ae4ae1 100644 --- a/crates/core/src/database_logger.rs +++ b/crates/core/src/database_logger.rs @@ -125,7 +125,7 @@ impl DatabaseLogger { pub fn filepath(address: &Address, instance_id: u64) -> PathBuf { let root = crate::stdb_path("worker_node/database_instances"); - root.join(address.to_hex()) + root.join(&*address.to_hex()) .join(instance_id.to_string()) .join("module_logs") } diff --git a/crates/core/src/db/db_metrics/mod.rs b/crates/core/src/db/db_metrics/mod.rs index 3b2f025e276..8cbc420f071 100644 --- a/crates/core/src/db/db_metrics/mod.rs +++ b/crates/core/src/db/db_metrics/mod.rs @@ -1,119 +1,55 @@ +use crate::util::typed_prometheus::metrics_group; use once_cell::sync::Lazy; -use prometheus::{Histogram, HistogramOpts, HistogramVec, Registry}; +use prometheus::{Histogram, HistogramVec}; -#[non_exhaustive] -pub struct DbMetrics { - pub registry: Registry, - pub tdb_insert_time: Histogram, - pub tdb_delete_time: Histogram, - pub tdb_seek_time: Histogram, - pub tdb_scan_time: Histogram, - pub tdb_commit_time: Histogram, - pub rdb_create_table_time: HistogramVec, - pub rdb_drop_table_time: HistogramVec, - pub rdb_iter_time: HistogramVec, - pub rdb_insert_row_time: HistogramVec, - pub rdb_delete_by_rel_time: HistogramVec, -} +metrics_group!( + #[non_exhaustive] + pub struct DbMetrics { + #[name = spacetime_tdb_insert_time] + #[help = "Time time it takes for the transactional store to perform an insert"] + pub tdb_insert_time: Histogram, -pub static DB_METRICS: Lazy = Lazy::new(DbMetrics::new); + #[name = spacetime_tdb_delete_time] + #[help = "Time time it takes for the transactional store to perform a delete"] + pub tdb_delete_time: Histogram, -impl DbMetrics { - fn new() -> Self { - DbMetrics { - registry: Registry::new(), - tdb_insert_time: Histogram::with_opts(HistogramOpts::new( - "spacetime_tdb_insert_time", - "Time time it takes for the transactional store to perform an insert", - )) - .unwrap(), - tdb_delete_time: Histogram::with_opts(HistogramOpts::new( - "spacetime_tdb_delete_time", - "Time time it takes for the transactional store to perform a delete", - )) - .unwrap(), - tdb_seek_time: Histogram::with_opts(HistogramOpts::new( - "spacetime_tdb_seek_time", - "Time time it takes for the transactional store to perform a seek", - )) - .unwrap(), - tdb_scan_time: Histogram::with_opts(HistogramOpts::new( - "spacetime_tdb_scan_time", - "Time time it takes for the transactional store to perform a scan", - )) - .unwrap(), - tdb_commit_time: Histogram::with_opts(HistogramOpts::new( - "spacetime_tdb_commit_time", - "Time time it takes for the transactional store to perform a Tx commit", - )) - .unwrap(), - rdb_create_table_time: HistogramVec::new( - HistogramOpts::new("spacetime_rdb_create_table_time", "The time it takes to create a table"), - &["table_name"], - ) - .unwrap(), - rdb_drop_table_time: HistogramVec::new( - HistogramOpts::new("spacetime_rdb_drop_table_time", "The time spent dropping a table"), - &["table_id"], - ) - .unwrap(), - rdb_iter_time: HistogramVec::new( - HistogramOpts::new("spacetime_rdb_iter_time", "The time spent iterating a table"), - &["table_id"], - ) - .unwrap(), - rdb_insert_row_time: HistogramVec::new( - HistogramOpts::new("spacetime_rdb_insert_row_time", "The time spent inserting into a table"), - &["table_id"], - ) - .unwrap(), - rdb_delete_by_rel_time: HistogramVec::new( - HistogramOpts::new( - "spacetime_rdb_delete_in_time", - "The time spent deleting values in a set from a table", - ), - &["table_id"], - ) - .unwrap(), - } - } + #[name = spacetime_tdb_seek_time] + #[help = "Time time it takes for the transactional store to perform a seek"] + pub tdb_seek_time: Histogram, - pub fn register_custom_metrics(&self) { - self.registry.register(Box::new(self.tdb_insert_time.clone())).unwrap(); - self.registry.register(Box::new(self.tdb_delete_time.clone())).unwrap(); - self.registry.register(Box::new(self.tdb_seek_time.clone())).unwrap(); - self.registry.register(Box::new(self.tdb_scan_time.clone())).unwrap(); - self.registry.register(Box::new(self.tdb_commit_time.clone())).unwrap(); + #[name = spacetime_tdb_scan_time] + #[help = "Time time it takes for the transactional store to perform a scan"] + pub tdb_scan_time: Histogram, - self.registry - .register(Box::new(self.rdb_create_table_time.clone())) - .unwrap(); - self.registry - .register(Box::new(self.rdb_drop_table_time.clone())) - .unwrap(); - self.registry.register(Box::new(self.rdb_iter_time.clone())).unwrap(); - self.registry - .register(Box::new(self.rdb_insert_row_time.clone())) - .unwrap(); - self.registry - .register(Box::new(self.rdb_delete_by_rel_time.clone())) - .unwrap(); - } -} + #[name = spacetime_tdb_commit_time] + #[help = "Time time it takes for the transactional store to perform a Tx commit"] + pub tdb_commit_time: Histogram, + + #[name = spacetime_rdb_create_table_time] + #[help = "The time it takes to create a table"] + #[labels(table_name: str)] + pub rdb_create_table_time: HistogramVec, + + #[name = spacetime_rdb_drop_table_time] + #[help = "The time spent dropping a table"] + #[labels(table_id: u32)] + pub rdb_drop_table_time: HistogramVec, -use DB_METRICS as METRICS; -metrics_delegator!(REGISTRY, registry: Registry); -metrics_delegator!(TDB_INSERT_TIME, tdb_insert_time: Histogram); -metrics_delegator!(TDB_DELETE_TIME, tdb_delete_time: Histogram); -metrics_delegator!(TDB_SEEK_TIME, tdb_seek_time: Histogram); -metrics_delegator!(TDB_SCAN_TIME, tdb_scan_time: Histogram); -metrics_delegator!(TDB_COMMIT_TIME, tdb_commit_time: Histogram); -metrics_delegator!(RDB_CREATE_TABLE_TIME, rdb_create_table_time: HistogramVec); -metrics_delegator!(RDB_DROP_TABLE_TIME, rdb_drop_table_time: HistogramVec); -metrics_delegator!(RDB_ITER_TIME, rdb_iter_time: HistogramVec); -metrics_delegator!(RDB_INSERT_TIME, rdb_insert_row_time: HistogramVec); -metrics_delegator!(RDB_DELETE_BY_REL_TIME, rdb_delete_by_rel_time: HistogramVec); + #[name = spacetime_rdb_iter_time] + #[help = "The time spent iterating a table"] + #[labels(table_id: u32)] + pub rdb_iter_time: HistogramVec, -pub fn register_custom_metrics() { - DB_METRICS.register_custom_metrics() -} + #[name = spacetime_rdb_insert_row_time] + #[help = "The time spent inserting into a table"] + #[labels(table_id: u32)] + pub rdb_insert_row_time: HistogramVec, + + #[name = spacetime_rdb_delete_in_time] + #[help = "The time spent deleting values in a set from a table"] + #[labels(table_id: u32)] + pub rdb_delete_by_rel_time: HistogramVec, + } +); + +pub static DB_METRICS: Lazy = Lazy::new(DbMetrics::new); diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index 60e9ec1ee81..f7d10a9c72e 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -9,16 +9,14 @@ use super::ostorage::memory_object_db::MemoryObjectDB; use super::relational_operators::Relation; use crate::address::Address; use crate::db::commit_log; -use crate::db::db_metrics::{RDB_DELETE_BY_REL_TIME, RDB_DROP_TABLE_TIME, RDB_INSERT_TIME, RDB_ITER_TIME}; +use crate::db::db_metrics::DB_METRICS; use crate::db::messages::commit::Commit; use crate::db::ostorage::hashmap_object_db::HashMapObjectDB; use crate::db::ostorage::ObjectDB; use crate::error::{DBError, DatabaseError, IndexError, TableError}; use crate::hash::Hash; -use crate::util::prometheus_handle::HistogramVecHandle; use fs2::FileExt; use nonempty::NonEmpty; -use prometheus::HistogramVec; use spacetimedb_lib::ColumnIndexAttribute; use spacetimedb_lib::{data_key::ToDataKey, PrimaryKey}; use spacetimedb_sats::{AlgebraicType, AlgebraicValue, ProductType, ProductValue}; @@ -30,11 +28,6 @@ use std::sync::{Arc, Mutex}; use super::datastore::locking_tx_datastore::Locking; -/// Starts histogram prometheus measurements for `table_id`. -fn measure(hist: &'static HistogramVec, table_id: u32) { - HistogramVecHandle::new(hist, vec![format!("{}", table_id)]).start(); -} - pub const ST_TABLES_NAME: &str = "st_table"; pub const ST_COLUMNS_NAME: &str = "st_columns"; pub const ST_SEQUENCES_NAME: &str = "st_sequence"; @@ -394,7 +387,10 @@ impl RelationalDB { } pub fn drop_table(&self, tx: &mut MutTxId, table_id: u32) -> Result<(), DBError> { - measure(&RDB_DROP_TABLE_TIME, table_id); + let _guard = DB_METRICS + .rdb_drop_table_time + .with_label_values(&table_id) + .start_timer(); self.inner.drop_table_mut_tx(tx, TableId(table_id)) } @@ -484,7 +480,7 @@ impl RelationalDB { /// yielding every row in the table identified by `table_id`. #[tracing::instrument(skip(self, tx))] pub fn iter<'a>(&'a self, tx: &'a MutTxId, table_id: u32) -> Result, DBError> { - measure(&RDB_ITER_TIME, table_id); + let _guard = DB_METRICS.rdb_iter_time.with_label_values(&table_id).start_timer(); self.inner.iter_mut_tx(tx, TableId(table_id)) } @@ -521,7 +517,10 @@ impl RelationalDB { #[tracing::instrument(skip(self, tx, row))] pub fn insert(&self, tx: &mut MutTxId, table_id: u32, row: ProductValue) -> Result { - measure(&RDB_INSERT_TIME, table_id); + let _guard = DB_METRICS + .rdb_insert_row_time + .with_label_values(&table_id) + .start_timer(); self.inner.insert_mut_tx(tx, TableId(table_id), row) } @@ -552,7 +551,10 @@ impl RelationalDB { table_id: u32, relation: R, ) -> Result, DBError> { - measure(&RDB_DELETE_BY_REL_TIME, table_id); + let _guard = DB_METRICS + .rdb_delete_by_rel_time + .with_label_values(&table_id) + .start_timer(); self.inner.delete_by_rel_mut_tx(tx, TableId(table_id), relation) } diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index c60a3c9eb4f..053decb791e 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -12,6 +12,7 @@ use crate::protobuf::client_api::{table_row_operation, SubscriptionUpdate, Table use crate::subscription::module_subscription_actor::ModuleSubscriptionManager; use crate::util::lending_pool::{Closed, LendingPool, LentResource, PoolClosed}; use crate::util::notify_once::NotifyOnce; +use crate::worker_metrics::WORKER_METRICS; use base64::{engine::general_purpose::STANDARD as BASE_64_STD, Engine as _}; use futures::{Future, FutureExt}; use indexmap::IndexMap; @@ -461,7 +462,10 @@ pub enum InitDatabaseError { impl ModuleHost { pub fn new(threadpool: Arc, mut module: impl Module) -> Self { let info = module.info(); - let instance_pool = LendingPool::new(); + let waiter_gauge = WORKER_METRICS + .instance_queue_length + .with_label_values(&info.identity, &info.module_hash); + let instance_pool = LendingPool::new(waiter_gauge); instance_pool.add_multiple(module.initial_instances()).unwrap(); let inner = Arc::new(HostControllerActor { module: Arc::new(module), diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index bef2fed38af..2abd3cfb191 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -29,7 +29,7 @@ use crate::host::{ }; use crate::identity::Identity; use crate::subscription::module_subscription_actor::{ModuleSubscriptionManager, SubscriptionEventSender}; -use crate::worker_metrics::{REDUCER_COMPUTE_TIME, REDUCER_COUNT, REDUCER_WRITE_SIZE}; +use crate::worker_metrics::WORKER_METRICS; use super::instrumentation::CallTimes; use super::*; @@ -579,9 +579,12 @@ impl WasmModuleInstance { /// The method also performs various measurements and records energy usage. #[tracing::instrument(skip_all)] fn execute(&mut self, tx: Option, op: ReducerOp<'_>) -> (EventStatus, EnergyStats) { - let address = &self.database_instance_context().address.to_abbreviated_hex(); + let address = self.database_instance_context().address; let func_ident = &*self.info.reducers[op.id].name; - REDUCER_COUNT.with_label_values(&[address, func_ident]).inc(); + WORKER_METRICS + .reducer_count + .with_label_values(&address, func_ident) + .inc(); let energy_fingerprint = EnergyMonitorFingerprint { module_hash: self.info.module_hash, @@ -638,8 +641,9 @@ impl WasmModuleInstance { ); } - REDUCER_COMPUTE_TIME - .with_label_values(&[address, func_ident]) + WORKER_METRICS + .reducer_compute_time + .with_label_values(&address, func_ident) .observe(timings.total_duration.as_secs_f64()); // If you can afford to take 500 ms for a transaction @@ -681,8 +685,9 @@ impl WasmModuleInstance { // in batches. This is because it's possible for a tiny reducer call to trigger a whole commit to be written to disk. // We should track the commit sizes instead internally to the CommitLog probably. if let Some(bytes_written) = bytes_written { - REDUCER_WRITE_SIZE - .with_label_values(&[address, func_ident]) + WORKER_METRICS + .reducer_write_size + .with_label_values(&address, func_ident) .observe(bytes_written as f64); } EventStatus::Committed(DatabaseUpdate::from_writes(stdb, &tx_data)) diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index ac444945236..8973c872e6c 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -17,23 +17,6 @@ where STDB_PATH.join(s) } -// to let us be incremental in updating all the references to what used to be individual lazy_statics -macro_rules! metrics_delegator { - ($name:ident, $field:ident: $ty:ty) => { - #[allow(non_camel_case_types)] - pub struct $name { - __private: (), - } - pub static $name: $name = $name { __private: () }; - impl std::ops::Deref for $name { - type Target = $ty; - fn deref(&self) -> &$ty { - &METRICS.$field - } - } - }; -} - pub mod address { pub use spacetimedb_lib::Address; } diff --git a/crates/core/src/util/lending_pool.rs b/crates/core/src/util/lending_pool.rs index 2a6a60a3616..4356f4cf06f 100644 --- a/crates/core/src/util/lending_pool.rs +++ b/crates/core/src/util/lending_pool.rs @@ -9,8 +9,11 @@ use std::sync::Arc; use std::task::{Context, Poll}; use parking_lot::Mutex; +use prometheus::IntGauge; use tokio::sync::{OwnedSemaphorePermit, Semaphore}; +use crate::util::prometheus_handle::IntGaugeExt; + use super::notify_once::{NotifiedOnce, NotifyOnce}; pub struct LendingPool { @@ -27,14 +30,9 @@ impl Clone for LendingPool { } } -impl Default for LendingPool { - fn default() -> Self { - Self::new() - } -} - struct LendingPoolInner { closed_notify: NotifyOnce, + waiter_gauge: IntGauge, vec: Mutex>, } @@ -47,15 +45,32 @@ struct PoolVec { pub struct PoolClosed; impl LendingPool { - pub fn new() -> Self { - Self::from_iter(std::iter::empty()) + pub fn new(waiter_gauge: IntGauge) -> Self { + Self::from_iter(std::iter::empty(), waiter_gauge) + } + + pub fn from_iter>(iter: I, waiter_gauge: IntGauge) -> Self { + let deque = VecDeque::from_iter(iter); + Self { + sem: Arc::new(Semaphore::new(deque.len())), + inner: Arc::new(LendingPoolInner { + closed_notify: NotifyOnce::new(), + waiter_gauge, + vec: Mutex::new(PoolVec { + total_count: deque.len(), + deque: Some(deque), + }), + }), + } } pub fn request(&self) -> impl Future, PoolClosed>> { let acq = self.sem.clone().acquire_owned(); let pool_inner = self.inner.clone(); + let waiter_guard = pool_inner.waiter_gauge.inc_scope(); async move { let permit = acq.await.map_err(|_| PoolClosed)?; + drop(waiter_guard); let resource = pool_inner .vec .lock() @@ -114,22 +129,6 @@ impl LendingPool { } } -impl FromIterator for LendingPool { - fn from_iter>(iter: I) -> Self { - let deque = VecDeque::from_iter(iter); - Self { - sem: Arc::new(Semaphore::new(deque.len())), - inner: Arc::new(LendingPoolInner { - closed_notify: NotifyOnce::new(), - vec: Mutex::new(PoolVec { - total_count: deque.len(), - deque: Some(deque), - }), - }), - } - } -} - pin_project_lite::pin_project! { pub struct Closed<'a> { #[pin] diff --git a/crates/core/src/util/mod.rs b/crates/core/src/util/mod.rs index 76ef2c257f2..b8f142bec72 100644 --- a/crates/core/src/util/mod.rs +++ b/crates/core/src/util/mod.rs @@ -6,6 +6,7 @@ pub mod prometheus_handle; mod future_queue; pub mod lending_pool; pub mod notify_once; +pub mod typed_prometheus; pub use future_queue::{future_queue, FutureQueue}; diff --git a/crates/core/src/util/prometheus_handle.rs b/crates/core/src/util/prometheus_handle.rs index 75d66ea0992..41eae673a33 100644 --- a/crates/core/src/util/prometheus_handle.rs +++ b/crates/core/src/util/prometheus_handle.rs @@ -1,80 +1,29 @@ -use prometheus::{Histogram, HistogramVec}; -use std::time::{Duration, SystemTime}; +use prometheus::IntGauge; -/// An RAII-style handle for doing quick measurements of a single vertex Histogram value. -/// Time spent is measured at Drop, meaning the sample occurs regardless of how the owning scope -/// exits. -pub struct HistogramHandle { - hist: &'static Histogram, - pub start_instant: Option, +/// Decrements the inner [`IntGauge`] on drop. +pub struct GaugeInc { + gauge: IntGauge, } - -impl HistogramHandle { - pub fn new(hist: &'static Histogram) -> Self { - HistogramHandle { - hist, - start_instant: None, - } - } - - pub fn start(&mut self) { - self.start_instant = Some(SystemTime::now()); - } - - pub fn stop(&mut self) { - self.hist.observe(self.elapsed().as_secs_f64()); - } - - pub fn elapsed(&self) -> Duration { - let Some(start_instant) = self.start_instant else { - return Duration::ZERO; - }; - let duration = start_instant.elapsed(); - duration.unwrap_or(Duration::ZERO) - } -} -impl Drop for HistogramHandle { +impl Drop for GaugeInc { + #[inline] fn drop(&mut self) { - self.stop(); + self.gauge.dec() } } -/// An RAII-style handle for doing quick measurements of a multi-vertex labelled histogram value. -pub struct HistogramVecHandle { - hist: &'static HistogramVec, - label_values: Vec, - pub start_instant: Option, +/// Increment the given [`IntGauge`], and decrement it when the returned value goes out of scope. +#[inline] +pub fn inc_scope(gauge: &IntGauge) -> GaugeInc { + gauge.inc(); + GaugeInc { gauge: gauge.clone() } } -impl HistogramVecHandle { - pub fn new(hist: &'static HistogramVec, label_values: Vec) -> Self { - HistogramVecHandle { - hist, - label_values, - start_instant: None, - } - } - - pub fn start(&mut self) { - self.start_instant = Some(SystemTime::now()); - } - - pub fn stop(&mut self) { - let labels: Vec<&str> = self.label_values.as_slice().iter().map(|s| s.as_str()).collect(); - self.hist - .with_label_values(labels.as_slice()) - .observe(self.elapsed().as_secs_f64()); - } - pub fn elapsed(&self) -> Duration { - let Some(start_instant) = self.start_instant else { - return Duration::ZERO; - }; - let duration = start_instant.elapsed(); - duration.unwrap_or(Duration::ZERO) - } +pub trait IntGaugeExt { + fn inc_scope(&self) -> GaugeInc; } -impl Drop for HistogramVecHandle { - fn drop(&mut self) { - self.stop(); + +impl IntGaugeExt for IntGauge { + fn inc_scope(&self) -> GaugeInc { + inc_scope(self) } } diff --git a/crates/core/src/util/typed_prometheus.rs b/crates/core/src/util/typed_prometheus.rs new file mode 100644 index 00000000000..2f4e5116d4b --- /dev/null +++ b/crates/core/src/util/typed_prometheus.rs @@ -0,0 +1,126 @@ +use prometheus::core::{Metric, MetricVec, MetricVecBuilder}; +use spacetimedb_lib::{Address, Hash, Identity}; + +#[macro_export] +macro_rules! metrics_group { + ($(#[$attr:meta])* $type_vis:vis struct $type_name:ident { + $(#[name = $name:ident] #[help = $help:expr] $(#[labels($($labels:ident: $labelty:ty),*)])? $vis:vis $field:ident: $ty:ident,)* + }) => { + $(#[$attr])* + $type_vis struct $type_name { + $($vis $field: $crate::metrics_group!(@fieldtype $field $ty $(($($labels)*))?),)* + } + $($crate::metrics_group!(@maketype $vis $field $ty $(($($labels: $labelty),*))?);)* + impl $type_name { + #[allow(clippy::new_without_default)] + pub fn new() -> Self { + Self { + $($field: $crate::make_collector!($crate::metrics_group!(@fieldtype $field $ty $(($($labels)*))?), stringify!($name), $help),)* + } + } + } + + impl prometheus::core::Collector for $type_name { + fn desc(&self) -> Vec<&prometheus::core::Desc> { + $crate::util::typed_prometheus::itertools::concat([ $(prometheus::core::Collector::desc(&self.$field)),* ]) + } + + fn collect(&self) -> Vec { + $crate::util::typed_prometheus::itertools::concat([ $(prometheus::core::Collector::collect(&self.$field)),* ]) + } + } + impl prometheus::core::Collector for &$type_name { + fn desc(&self) -> Vec<&prometheus::core::Desc> { + (**self).desc() + } + + fn collect(&self) -> Vec { + (**self).collect() + } + } + }; + (@fieldtype $field:ident $ty:ident ($($labels:tt)*)) => { $crate::util::typed_prometheus::paste! { [< $field:camel $ty >] } }; + (@fieldtype $field:ident $ty:ident) => { $ty }; + (@maketype $vis:vis $field:ident $ty:ident ($($labels:tt)*)) => { + $crate::util::typed_prometheus::paste! { + $crate::metrics_vec!($vis [< $field:camel $ty >]: $ty($($labels)*)); + } + }; + (@maketype $vis:vis $field:ident $ty:ident) => {}; +} +pub use metrics_group; +#[doc(hidden)] +pub use {itertools, paste::paste}; + +#[macro_export] +macro_rules! make_collector { + ($ty:ty, $name:expr, $help:expr $(,)?) => { + <$ty>::with_opts(prometheus::Opts::new($name, $help).into()).unwrap() + }; + ($ty:ty, $name:expr, $help:expr, $labels:expr $(,)?) => { + <$ty>::new(prometheus::Opts::new($name, $help).into(), $labels).unwrap() + }; +} +pub use make_collector; + +#[macro_export] +macro_rules! metrics_vec { + ($vis:vis $name:ident: $vecty:ident($($labels:ident: $labelty:ty),+ $(,)?)) => { + #[derive(Clone)] + $vis struct $name($vecty); + impl $name { + pub fn with_opts(opts: prometheus::Opts) -> prometheus::Result { + $vecty::new(opts.into(), &[$(stringify!($labels)),+]).map(Self) + } + + pub fn with_label_values(&self, $($labels: &$labelty),+) -> <$vecty as $crate::util::typed_prometheus::ExtractMetricVecT>::M { + use $crate::util::typed_prometheus::AsPrometheusLabel as _; + self.0.with_label_values(&[ $($labels.as_prometheus_str().as_ref()),+ ]) + } + } + + impl prometheus::core::Collector for $name { + fn desc(&self) -> Vec<&prometheus::core::Desc> { + prometheus::core::Collector::desc(&self.0) + } + + fn collect(&self) -> Vec { + prometheus::core::Collector::collect(&self.0) + } + } + }; +} +pub use metrics_vec; + +pub trait AsPrometheusLabel { + type Str<'a>: AsRef + 'a + where + Self: 'a; + fn as_prometheus_str(&self) -> Self::Str<'_>; +} +impl + ?Sized> AsPrometheusLabel for &T { + type Str<'a> = &'a str where Self: 'a; + fn as_prometheus_str(&self) -> Self::Str<'_> { + self.as_ref() + } +} +macro_rules! impl_prometheusvalue_string { + ($($x:ty),*) => { + $(impl AsPrometheusLabel for $x { + type Str<'a> = String; + fn as_prometheus_str(&self) -> Self::Str<'_> { + self.to_string() + } + })* + } +} +impl_prometheusvalue_string!(Hash, Identity, Address, u8, u16, u32, u64, i8, i16, i32, i64); + +#[doc(hidden)] +pub trait ExtractMetricVecT { + type M: Metric; +} + +impl ExtractMetricVecT for MetricVec { + type M = T::M; +} diff --git a/crates/core/src/worker_metrics/mod.rs b/crates/core/src/worker_metrics/mod.rs index fbf7125f4da..c5da709fa69 100644 --- a/crates/core/src/worker_metrics/mod.rs +++ b/crates/core/src/worker_metrics/mod.rs @@ -1,213 +1,87 @@ +use crate::util::typed_prometheus::metrics_group; use once_cell::sync::Lazy; -use prometheus::{Gauge, GaugeVec, HistogramOpts, HistogramVec, IntCounterVec, IntGauge, Opts, Registry}; - -pub struct WorkerMetrics { - registry: Registry, - connected_clients: IntGauge, - websocket_requests: IntCounterVec, - websocket_request_msg_size: HistogramVec, - websocket_sent: IntCounterVec, - websocket_sent_msg_size: HistogramVec, - process_cpu_usage: Gauge, - reducer_count: IntCounterVec, - reducer_compute_time: HistogramVec, - reducer_write_size: HistogramVec, - node_identity_energy_budget_gauge: GaugeVec, - instance_env_insert: HistogramVec, - // instance_env_delete_pk: HistogramVec, - // instance_env_delete_value: HistogramVec, - instance_env_delete_eq: HistogramVec, - // instance_env_delete_range: HistogramVec, -} - -static WORKER_METRICS: Lazy = Lazy::new(WorkerMetrics::new); - -impl WorkerMetrics { - fn new() -> Self { - Self { - registry: Registry::new(), - connected_clients: IntGauge::new( - "spacetime_worker_connected_clients", - "Number of clients connected to the worker.", - ) - .unwrap(), - websocket_requests: IntCounterVec::new( - Opts::new("spacetime_websocket_requests", "Number of websocket request messages"), - &["instance_id", "protocol"], - ) - .unwrap(), - websocket_request_msg_size: HistogramVec::new( - HistogramOpts::new( - "spacetime_websocket_request_msg_size", - "The size of messages received on connected sessions", - ), - &["instance_id", "protocol"], - ) - .unwrap(), - websocket_sent: IntCounterVec::new( - Opts::new( - "spacetime_websocket_sent", - "Number of websocket messages sent to client", - ), - &["identity"], - ) - .unwrap(), - websocket_sent_msg_size: HistogramVec::new( - HistogramOpts::new( - "spacetime_websocket_sent_msg_size", - "The size of messages sent to connected sessions", - ), - &["identity"], - ) - .unwrap(), - process_cpu_usage: Gauge::new("spacetime_worker_process_cpu_usage", "CPU usage of the worker process.") - .unwrap(), - reducer_count: IntCounterVec::new( - Opts::new("spacetime_worker_transactions", "Number of reducer calls."), - &["database_address", "reducer_symbol"], - ) - .unwrap(), - reducer_compute_time: HistogramVec::new( - HistogramOpts::new( - "spacetime_worker_module_tx_compute_time", - "The time it takes to compute and commit after reducer execution.", - ), - &["database_address", "reducer_symbol"], - ) - .unwrap(), - reducer_write_size: HistogramVec::new( - HistogramOpts::new( - "spacetime_worker_tx_size", - "The size of committed bytes in the message log after reducer execution.", - ), - &["database_address", "reducer_symbol"], - ) - .unwrap(), - node_identity_energy_budget_gauge: GaugeVec::new( - Opts::new( - "spacetime_worker_identity_energy_budget", - "Node-level energy budget, per identity", - ), - &["identity", "node"], - ) - .unwrap(), - instance_env_insert: HistogramVec::new( - HistogramOpts::new( - "spacetime_instance_env_insert", - "Time spent by reducers inserting rows (InstanceEnv::insert)", - ), - &["database_address", "table_id"], - ) - .unwrap(), - /* - instance_env_delete_pk: HistogramVec::new( - HistogramOpts::new( - "spacetime_instance_env_delete_pk", - "Time spent by reducers deleting rows by pk (InstanceEnv::delete_pk)", - ), - &["database_address", "table_id"], - ) - .unwrap(), - instance_env_delete_value: HistogramVec::new( - HistogramOpts::new( - "spacetime_instance_env_delete_value", - "Time spent by reducers deleting rows (InstanceEnv::delete_value)", - ), - &["database_address", "table_id"], - ) - .unwrap(), - */ - instance_env_delete_eq: HistogramVec::new( - HistogramOpts::new( - "spacetime_instance_env_delete_eq", - "Time spent by reducers deleting rows by eq (InstanceEnv::delete_eq)", - ), - &["database_address", "table_id"], - ) - .unwrap(), - /* - instance_env_delete_range: HistogramVec::new( - HistogramOpts::new( - "spacetime_instance_env_delete_range", - "Time spent by reducers deleting rows ranges eq (InstanceEnv::delete_range)", - ), - &["database_address", "table_id"], - ) - .unwrap(), - */ - } - } +use prometheus::{Gauge, GaugeVec, HistogramVec, IntCounterVec, IntGauge, IntGaugeVec}; +use spacetimedb_lib::{Address, Hash, Identity}; + +metrics_group!( + pub struct WorkerMetrics { + #[name = spacetime_worker_connected_clients] + #[help = "Number of clients connected to the worker."] + pub connected_clients: IntGauge, + + #[name = spacetime_websocket_requests] + #[help = "Number of websocket request messages"] + #[labels(instance_id: u64, protocol: str)] + pub websocket_requests: IntCounterVec, + + #[name = spacetime_websocket_request_msg_size] + #[help = "The size of messages received on connected sessions"] + #[labels(instance_id: u64, protocol: str)] + pub websocket_request_msg_size: HistogramVec, + + #[name = spacetime_websocket_sent] + #[help = "Number of websocket messages sent to client"] + #[labels(identity: Identity)] + pub websocket_sent: IntCounterVec, + + #[name = spacetime_websocket_sent_msg_size] + #[help = "The size of messages sent to connected sessions"] + #[labels(identity: Identity)] + pub websocket_sent_msg_size: HistogramVec, + + #[name = spacetime_worker_process_cpu_usage] + #[help = "CPU usage of the worker process."] + pub process_cpu_usage: Gauge, + + #[name = spacetime_worker_transactions] + #[help = "Number of reducer calls."] + #[labels(database_address: Address, reducer_symbol: str)] + pub reducer_count: IntCounterVec, - pub fn register_custom_metrics(&self) { - self.registry - .register(Box::new(self.connected_clients.clone())) - .unwrap(); - self.registry - .register(Box::new(self.websocket_requests.clone())) - .unwrap(); - self.registry - .register(Box::new(self.websocket_request_msg_size.clone())) - .unwrap(); - self.registry.register(Box::new(self.websocket_sent.clone())).unwrap(); - self.registry - .register(Box::new(self.websocket_sent_msg_size.clone())) - .unwrap(); - self.registry - .register(Box::new(self.process_cpu_usage.clone())) - .unwrap(); - self.registry.register(Box::new(self.reducer_count.clone())).unwrap(); - self.registry - .register(Box::new(self.reducer_compute_time.clone())) - .unwrap(); - self.registry - .register(Box::new(self.reducer_write_size.clone())) - .unwrap(); - self.registry - .register(Box::new(self.instance_env_insert.clone())) - .unwrap(); - /* - self.registry - .register(Box::new(self.instance_env_delete_pk.clone())) - .unwrap(); - self.registry - .register(Box::new(self.instance_env_delete_value.clone())) - .unwrap(); - */ - self.registry - .register(Box::new(self.instance_env_delete_eq.clone())) - .unwrap(); - /* - self.registry - .register(Box::new(self.instance_env_delete_range.clone())) - .unwrap(); - */ - self.registry - .register(Box::new(self.node_identity_energy_budget_gauge.clone())) - .unwrap(); + #[name = spacetime_worker_module_tx_compute_time] + #[help = "The time it takes to compute and commit after reducer execution."] + #[labels(database_address: Address, reducer_symbol: str)] + pub reducer_compute_time: HistogramVec, + + #[name = spacetime_worker_tx_size] + #[help = "The size of committed bytes in the message log after reducer execution."] + #[labels(database_address: Address, reducer_symbol: str)] + pub reducer_write_size: HistogramVec, + + #[name = spacetime_worker_identity_energy_budget] + #[help = "Node-level energy budget, per identity"] + #[labels(identity: Identity, node: u64)] + pub node_identity_energy_budget_gauge: GaugeVec, + + #[name = spacetime_instance_env_insert] + #[help = "Time spent by reducers inserting rows (InstanceEnv::insert)"] + #[labels(database_address: Address, table_id: u32)] + pub instance_env_insert: HistogramVec, + + #[name = spacetime_instance_env_delete_eq] + #[help = "Time spent by reducers deleting rows by eq (InstanceEnv::delete_eq)"] + #[labels(database_address: Address, table_id: u32)] + pub instance_env_delete_eq: HistogramVec, + + #[name = spacetime_worker_instance_operation_queue_length] + #[help = "Length of the wait queue for access to a module instance."] + #[labels(identity: Identity, module_hash: Hash)] + pub instance_queue_length: IntGaugeVec, + // #[name = spacetime_instance_env_delete_pk] + // #[help = "Time spent by reducers deleting rows by pk (InstanceEnv::delete_pk)"] + // #[labels(database_address, table_id)] + // pub instance_env_delete_pk: HistogramVec, + + // #[name = spacetime_instance_env_delete_value] + // #[help = "Time spent by reducers deleting rows (InstanceEnv::delete_value)"] + // #[labels(database_address, table_id)] + // pub instance_env_delete_value: HistogramVec, + + // #[name = spacetime_instance_env_delete_range] + // #[help = "Time spent by reducers deleting rows ranges eq (InstanceEnv::delete_range)"] + // #[labels(database_address, table_id)] + // pub instance_env_delete_range: HistogramVec, } -} - -use WORKER_METRICS as METRICS; -metrics_delegator!(REGISTRY, registry: Registry); -metrics_delegator!(CONNECTED_CLIENTS, connected_clients: IntGauge); -metrics_delegator!(WEBSOCKET_REQUESTS, websocket_requests: IntCounterVec); -metrics_delegator!(WEBSOCKET_REQUEST_MSG_SIZE, websocket_request_msg_size: HistogramVec); -metrics_delegator!(WEBSOCKET_SENT, websocket_sent: IntCounterVec); -metrics_delegator!(WEBSOCKET_SENT_MSG_SIZE, websocket_sent_msg_size: HistogramVec); -metrics_delegator!(PROCESS_CPU_USAGE, process_cpu_usage: Gauge); -metrics_delegator!(REDUCER_COUNT, reducer_count: IntCounterVec); -metrics_delegator!(REDUCER_COMPUTE_TIME, reducer_compute_time: HistogramVec); -metrics_delegator!(REDUCER_WRITE_SIZE, reducer_write_size: HistogramVec); -metrics_delegator!( - NODE_IDENTITY_ENERGY_BUDGET_GAUGE, - node_identity_energy_budget_gauge: GaugeVec ); -metrics_delegator!(INSTANCE_ENV_INSERT, instance_env_insert: HistogramVec); -// metrics_delegator!(INSTANCE_ENV_DELETE_PK, instance_env_delete_pk: HistogramVec); -// metrics_delegator!(INSTANCE_ENV_DELETE_VALUE, instance_env_delete_value: HistogramVec); -metrics_delegator!(INSTANCE_ENV_DELETE_BY_COL_EQ, instance_env_delete_eq: HistogramVec); -//metrics_delegator!(INSTANCE_ENV_DELETE_RANGE, instance_env_delete_range: HistogramVec); - -pub fn register_custom_metrics() { - WORKER_METRICS.register_custom_metrics() -} + +pub static WORKER_METRICS: Lazy = Lazy::new(WorkerMetrics::new); diff --git a/crates/standalone/src/lib.rs b/crates/standalone/src/lib.rs index c7491bf72a7..de31e181e03 100644 --- a/crates/standalone/src/lib.rs +++ b/crates/standalone/src/lib.rs @@ -18,7 +18,7 @@ use spacetimedb::client::ClientActorIndex; use spacetimedb::control_db::{self, ControlDb}; use spacetimedb::database_instance_context::DatabaseInstanceContext; use spacetimedb::database_instance_context_controller::DatabaseInstanceContextController; -use spacetimedb::db::{db_metrics, Config}; +use spacetimedb::db::{db_metrics::DB_METRICS, Config}; use spacetimedb::host::EnergyQuanta; use spacetimedb::host::UpdateDatabaseResult; use spacetimedb::host::UpdateOutcome; @@ -28,7 +28,8 @@ use spacetimedb::messages::control_db::{Database, DatabaseInstance, HostType, Id use spacetimedb::module_host_context::ModuleHostContext; use spacetimedb::object_db::ObjectDb; use spacetimedb::sendgrid_controller::SendGridController; -use spacetimedb::{stdb_path, worker_metrics}; +use spacetimedb::stdb_path; +use spacetimedb::worker_metrics::WORKER_METRICS; use spacetimedb_lib::name::{DomainName, InsertDomainResult, RegisterTldResult, Tld}; use spacetimedb_lib::recovery::RecoveryCode; use std::fs::File; @@ -45,6 +46,7 @@ pub struct StandaloneEnv { public_key: DecodingKey, private_key: EncodingKey, public_key_bytes: Box<[u8]>, + metrics_registry: prometheus::Registry, /// The following config applies to the whole environment minus the control_db and object_db. config: Config, @@ -59,6 +61,11 @@ impl StandaloneEnv { let host_controller = Arc::new(HostController::new(energy_monitor.clone())); let client_actor_index = ClientActorIndex::new(); let (public_key, private_key, public_key_bytes) = get_or_create_keys()?; + + let metrics_registry = prometheus::Registry::new(); + metrics_registry.register(Box::new(&*WORKER_METRICS)).unwrap(); + metrics_registry.register(Box::new(&*DB_METRICS)).unwrap(); + let this = Arc::new(Self { control_db, db_inst_ctx_controller, @@ -68,6 +75,7 @@ impl StandaloneEnv { public_key, private_key, public_key_bytes, + metrics_registry, config, }); energy_monitor.set_standalone_env(this.clone()); @@ -154,9 +162,7 @@ fn get_key_path(env: &str) -> Option { #[async_trait] impl spacetimedb_client_api::NodeDelegate for StandaloneEnv { fn gather_metrics(&self) -> Vec { - let mut metric_families = worker_metrics::REGISTRY.gather(); - metric_families.extend(db_metrics::REGISTRY.gather()); - metric_families + self.metrics_registry.gather() } fn database_instance_context_controller(&self) -> &DatabaseInstanceContextController { diff --git a/crates/standalone/src/subcommands/start.rs b/crates/standalone/src/subcommands/start.rs index c4b46093877..53f04f58e52 100644 --- a/crates/standalone/src/subcommands/start.rs +++ b/crates/standalone/src/subcommands/start.rs @@ -4,8 +4,8 @@ use crate::StandaloneEnv; use clap::ArgAction::SetTrue; use clap::{Arg, ArgMatches}; use spacetimedb::config::{FilesGlobal, FilesLocal, SpacetimeDbFiles}; -use spacetimedb::db::{db_metrics, Config, FsyncPolicy, Storage}; -use spacetimedb::{startup, worker_metrics}; +use spacetimedb::db::{Config, FsyncPolicy, Storage}; +use spacetimedb::startup; use std::net::TcpListener; #[cfg(feature = "string")] @@ -224,12 +224,6 @@ pub async fn exec(args: &ArgMatches) -> anyhow::Result<()> { startup::configure_tracing(); - // Metrics for pieces under worker_node/ related to reducer hosting, etc. - worker_metrics::register_custom_metrics(); - - // Metrics for our use of db/. - db_metrics::register_custom_metrics(); - let ctx = spacetimedb_client_api::ArcEnv(StandaloneEnv::init(config).await?); let service = router().with_state(ctx).into_make_service();