Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prometheus stuff #301

Merged
merged 4 commits into from
Oct 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ log = "0.4.17"
nonempty = "0.8.1"
once_cell = "1.16"
parking_lot = { version = "0.12.1", features = ["send_guard", "arc_lock"] }
paste = "1.0"
pin-project-lite = "0.2.9"
postgres-types = "0.2.5"
proc-macro2 = "1.0"
Expand Down
1 change: 1 addition & 0 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ nonempty.workspace = true
once_cell.workspace = true
openssl.workspace = true
parking_lot.workspace = true
paste.workspace = true
pin-project-lite.workspace = true
prometheus.workspace = true
prost.workspace = true
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/auth/identity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub fn encode_token_with_expiry(
});

let claims = SpacetimeIdentityClaims {
hex_identity: identity.to_hex(),
hex_identity: identity.to_hex().to_string(),
coolreader18 marked this conversation as resolved.
Show resolved Hide resolved
iat: SystemTime::now(),
exp: expiry,
};
Expand Down
19 changes: 8 additions & 11 deletions crates/core/src/client/client_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ use std::ops::Deref;

use crate::host::{ModuleHost, NoSuchModule, ReducerArgs, ReducerCallError, ReducerCallResult};
use crate::protobuf::client_api::Subscribe;
use crate::worker_metrics::{CONNECTED_CLIENTS, WEBSOCKET_SENT, WEBSOCKET_SENT_MSG_SIZE};
use crate::util::prometheus_handle::IntGaugeExt;
use crate::worker_metrics::WORKER_METRICS;
use derive_more::From;
use futures::prelude::*;
use tokio::sync::mpsc;
Expand Down Expand Up @@ -42,12 +43,11 @@ impl ClientConnectionSender {

self.sendtx.send(message).await.map_err(|_| ClientClosed)?;

WEBSOCKET_SENT
.with_label_values(&[self.id.identity.to_hex().as_str()])
.inc();
WORKER_METRICS.websocket_sent.with_label_values(&self.id.identity).inc();

WEBSOCKET_SENT_MSG_SIZE
.with_label_values(&[self.id.identity.to_hex().as_str()])
WORKER_METRICS
.websocket_sent_msg_size
.with_label_values(&self.id.identity)
.observe(bytes_len as f64);

Ok(())
Expand Down Expand Up @@ -121,11 +121,8 @@ impl ClientConnection {
};

let actor_fut = actor(this.clone(), sendrx);
tokio::spawn(async move {
CONNECTED_CLIENTS.inc();
actor_fut.await;
CONNECTED_CLIENTS.dec();
});
let gauge_guard = WORKER_METRICS.connected_clients.inc_scope();
tokio::spawn(actor_fut.map(|()| drop(gauge_guard)));

Ok(this)
}
Expand Down
12 changes: 7 additions & 5 deletions crates/core/src/client/message_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::host::module_host::{EventStatus, ModuleEvent, ModuleFunctionCall};
use crate::host::{EnergyDiff, ReducerArgs, Timestamp};
use crate::identity::Identity;
use crate::protobuf::client_api::{message, FunctionCall, Message, Subscribe};
use crate::worker_metrics::{WEBSOCKET_REQUESTS, WEBSOCKET_REQUEST_MSG_SIZE};
use crate::worker_metrics::WORKER_METRICS;
use base64::Engine;
use bytes::Bytes;
use bytestring::ByteString;
Expand Down Expand Up @@ -35,12 +35,14 @@ pub async fn handle(client: &ClientConnection, message: DataMessage) -> Result<(
DataMessage::Binary(_) => "binary",
};

WEBSOCKET_REQUEST_MSG_SIZE
.with_label_values(&[format!("{}", client.database_instance_id).as_str(), message_kind])
WORKER_METRICS
.websocket_request_msg_size
.with_label_values(&client.database_instance_id, message_kind)
.observe(message.len() as f64);

WEBSOCKET_REQUESTS
.with_label_values(&[format!("{}", client.database_instance_id).as_str(), message_kind])
WORKER_METRICS
.websocket_requests
.with_label_values(&client.database_instance_id, message_kind)
.inc();

match message {
Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/database_instance_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub struct DatabaseInstanceContext {
impl DatabaseInstanceContext {
pub fn from_database(config: Config, database: &Database, instance_id: u64, root_db_path: PathBuf) -> Arc<Self> {
let mut db_path = root_db_path;
db_path.extend([database.address.to_hex(), instance_id.to_string()]);
db_path.extend([&*database.address.to_hex(), &*instance_id.to_string()]);
db_path.push("database");

let log_path = DatabaseLogger::filepath(&database.address, instance_id);
Expand All @@ -44,7 +44,7 @@ impl DatabaseInstanceContext {

pub fn scheduler_db_path(&self, root_db_path: PathBuf) -> PathBuf {
let mut scheduler_db_path = root_db_path;
scheduler_db_path.extend([self.address.to_hex(), self.database_instance_id.to_string()]);
scheduler_db_path.extend([&*self.address.to_hex(), &*self.database_instance_id.to_string()]);
scheduler_db_path.push("scheduler");
scheduler_db_path
}
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/database_logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ impl DatabaseLogger {

pub fn filepath(address: &Address, instance_id: u64) -> PathBuf {
let root = crate::stdb_path("worker_node/database_instances");
root.join(address.to_hex())
root.join(&*address.to_hex())
coolreader18 marked this conversation as resolved.
Show resolved Hide resolved
.join(instance_id.to_string())
.join("module_logs")
}
Expand Down
158 changes: 47 additions & 111 deletions crates/core/src/db/db_metrics/mod.rs
Original file line number Diff line number Diff line change
@@ -1,119 +1,55 @@
use crate::util::typed_prometheus::metrics_group;
use once_cell::sync::Lazy;
use prometheus::{Histogram, HistogramOpts, HistogramVec, Registry};
use prometheus::{Histogram, HistogramVec};

#[non_exhaustive]
pub struct DbMetrics {
pub registry: Registry,
pub tdb_insert_time: Histogram,
pub tdb_delete_time: Histogram,
pub tdb_seek_time: Histogram,
pub tdb_scan_time: Histogram,
pub tdb_commit_time: Histogram,
pub rdb_create_table_time: HistogramVec,
pub rdb_drop_table_time: HistogramVec,
pub rdb_iter_time: HistogramVec,
pub rdb_insert_row_time: HistogramVec,
pub rdb_delete_by_rel_time: HistogramVec,
}
metrics_group!(
#[non_exhaustive]
pub struct DbMetrics {
#[name = spacetime_tdb_insert_time]
#[help = "Time time it takes for the transactional store to perform an insert"]
pub tdb_insert_time: Histogram,

pub static DB_METRICS: Lazy<DbMetrics> = Lazy::new(DbMetrics::new);
#[name = spacetime_tdb_delete_time]
#[help = "Time time it takes for the transactional store to perform a delete"]
pub tdb_delete_time: Histogram,

impl DbMetrics {
fn new() -> Self {
DbMetrics {
registry: Registry::new(),
tdb_insert_time: Histogram::with_opts(HistogramOpts::new(
"spacetime_tdb_insert_time",
"Time time it takes for the transactional store to perform an insert",
))
.unwrap(),
tdb_delete_time: Histogram::with_opts(HistogramOpts::new(
"spacetime_tdb_delete_time",
"Time time it takes for the transactional store to perform a delete",
))
.unwrap(),
tdb_seek_time: Histogram::with_opts(HistogramOpts::new(
"spacetime_tdb_seek_time",
"Time time it takes for the transactional store to perform a seek",
))
.unwrap(),
tdb_scan_time: Histogram::with_opts(HistogramOpts::new(
"spacetime_tdb_scan_time",
"Time time it takes for the transactional store to perform a scan",
))
.unwrap(),
tdb_commit_time: Histogram::with_opts(HistogramOpts::new(
"spacetime_tdb_commit_time",
"Time time it takes for the transactional store to perform a Tx commit",
))
.unwrap(),
rdb_create_table_time: HistogramVec::new(
HistogramOpts::new("spacetime_rdb_create_table_time", "The time it takes to create a table"),
&["table_name"],
)
.unwrap(),
rdb_drop_table_time: HistogramVec::new(
HistogramOpts::new("spacetime_rdb_drop_table_time", "The time spent dropping a table"),
&["table_id"],
)
.unwrap(),
rdb_iter_time: HistogramVec::new(
HistogramOpts::new("spacetime_rdb_iter_time", "The time spent iterating a table"),
&["table_id"],
)
.unwrap(),
rdb_insert_row_time: HistogramVec::new(
HistogramOpts::new("spacetime_rdb_insert_row_time", "The time spent inserting into a table"),
&["table_id"],
)
.unwrap(),
rdb_delete_by_rel_time: HistogramVec::new(
HistogramOpts::new(
"spacetime_rdb_delete_in_time",
"The time spent deleting values in a set from a table",
),
&["table_id"],
)
.unwrap(),
}
}
#[name = spacetime_tdb_seek_time]
#[help = "Time time it takes for the transactional store to perform a seek"]
pub tdb_seek_time: Histogram,

pub fn register_custom_metrics(&self) {
self.registry.register(Box::new(self.tdb_insert_time.clone())).unwrap();
self.registry.register(Box::new(self.tdb_delete_time.clone())).unwrap();
self.registry.register(Box::new(self.tdb_seek_time.clone())).unwrap();
self.registry.register(Box::new(self.tdb_scan_time.clone())).unwrap();
self.registry.register(Box::new(self.tdb_commit_time.clone())).unwrap();
#[name = spacetime_tdb_scan_time]
#[help = "Time time it takes for the transactional store to perform a scan"]
pub tdb_scan_time: Histogram,

self.registry
.register(Box::new(self.rdb_create_table_time.clone()))
.unwrap();
self.registry
.register(Box::new(self.rdb_drop_table_time.clone()))
.unwrap();
self.registry.register(Box::new(self.rdb_iter_time.clone())).unwrap();
self.registry
.register(Box::new(self.rdb_insert_row_time.clone()))
.unwrap();
self.registry
.register(Box::new(self.rdb_delete_by_rel_time.clone()))
.unwrap();
}
}
#[name = spacetime_tdb_commit_time]
#[help = "Time time it takes for the transactional store to perform a Tx commit"]
pub tdb_commit_time: Histogram,

#[name = spacetime_rdb_create_table_time]
#[help = "The time it takes to create a table"]
#[labels(table_name: str)]
pub rdb_create_table_time: HistogramVec,

#[name = spacetime_rdb_drop_table_time]
#[help = "The time spent dropping a table"]
#[labels(table_id: u32)]
pub rdb_drop_table_time: HistogramVec,

use DB_METRICS as METRICS;
metrics_delegator!(REGISTRY, registry: Registry);
metrics_delegator!(TDB_INSERT_TIME, tdb_insert_time: Histogram);
metrics_delegator!(TDB_DELETE_TIME, tdb_delete_time: Histogram);
metrics_delegator!(TDB_SEEK_TIME, tdb_seek_time: Histogram);
metrics_delegator!(TDB_SCAN_TIME, tdb_scan_time: Histogram);
metrics_delegator!(TDB_COMMIT_TIME, tdb_commit_time: Histogram);
metrics_delegator!(RDB_CREATE_TABLE_TIME, rdb_create_table_time: HistogramVec);
metrics_delegator!(RDB_DROP_TABLE_TIME, rdb_drop_table_time: HistogramVec);
metrics_delegator!(RDB_ITER_TIME, rdb_iter_time: HistogramVec);
metrics_delegator!(RDB_INSERT_TIME, rdb_insert_row_time: HistogramVec);
metrics_delegator!(RDB_DELETE_BY_REL_TIME, rdb_delete_by_rel_time: HistogramVec);
#[name = spacetime_rdb_iter_time]
#[help = "The time spent iterating a table"]
#[labels(table_id: u32)]
pub rdb_iter_time: HistogramVec,

pub fn register_custom_metrics() {
DB_METRICS.register_custom_metrics()
}
#[name = spacetime_rdb_insert_row_time]
#[help = "The time spent inserting into a table"]
#[labels(table_id: u32)]
pub rdb_insert_row_time: HistogramVec,

#[name = spacetime_rdb_delete_in_time]
#[help = "The time spent deleting values in a set from a table"]
#[labels(table_id: u32)]
pub rdb_delete_by_rel_time: HistogramVec,
}
);

pub static DB_METRICS: Lazy<DbMetrics> = Lazy::new(DbMetrics::new);
26 changes: 14 additions & 12 deletions crates/core/src/db/relational_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,14 @@ use super::ostorage::memory_object_db::MemoryObjectDB;
use super::relational_operators::Relation;
use crate::address::Address;
use crate::db::commit_log;
use crate::db::db_metrics::{RDB_DELETE_BY_REL_TIME, RDB_DROP_TABLE_TIME, RDB_INSERT_TIME, RDB_ITER_TIME};
use crate::db::db_metrics::DB_METRICS;
use crate::db::messages::commit::Commit;
use crate::db::ostorage::hashmap_object_db::HashMapObjectDB;
use crate::db::ostorage::ObjectDB;
use crate::error::{DBError, DatabaseError, IndexError, TableError};
use crate::hash::Hash;
use crate::util::prometheus_handle::HistogramVecHandle;
use fs2::FileExt;
use nonempty::NonEmpty;
use prometheus::HistogramVec;
use spacetimedb_lib::ColumnIndexAttribute;
use spacetimedb_lib::{data_key::ToDataKey, PrimaryKey};
use spacetimedb_sats::{AlgebraicType, AlgebraicValue, ProductType, ProductValue};
Expand All @@ -30,11 +28,6 @@ use std::sync::{Arc, Mutex};

use super::datastore::locking_tx_datastore::Locking;

/// Starts histogram prometheus measurements for `table_id`.
fn measure(hist: &'static HistogramVec, table_id: u32) {
HistogramVecHandle::new(hist, vec![format!("{}", table_id)]).start();
}

pub const ST_TABLES_NAME: &str = "st_table";
pub const ST_COLUMNS_NAME: &str = "st_columns";
pub const ST_SEQUENCES_NAME: &str = "st_sequence";
Expand Down Expand Up @@ -394,7 +387,10 @@ impl RelationalDB {
}

pub fn drop_table(&self, tx: &mut MutTxId, table_id: u32) -> Result<(), DBError> {
measure(&RDB_DROP_TABLE_TIME, table_id);
let _guard = DB_METRICS
.rdb_drop_table_time
.with_label_values(&table_id)
.start_timer();
self.inner.drop_table_mut_tx(tx, TableId(table_id))
}

Expand Down Expand Up @@ -484,7 +480,7 @@ impl RelationalDB {
/// yielding every row in the table identified by `table_id`.
#[tracing::instrument(skip(self, tx))]
pub fn iter<'a>(&'a self, tx: &'a MutTxId, table_id: u32) -> Result<Iter<'a>, DBError> {
measure(&RDB_ITER_TIME, table_id);
let _guard = DB_METRICS.rdb_iter_time.with_label_values(&table_id).start_timer();
self.inner.iter_mut_tx(tx, TableId(table_id))
}

Expand Down Expand Up @@ -521,7 +517,10 @@ impl RelationalDB {

#[tracing::instrument(skip(self, tx, row))]
pub fn insert(&self, tx: &mut MutTxId, table_id: u32, row: ProductValue) -> Result<ProductValue, DBError> {
measure(&RDB_INSERT_TIME, table_id);
let _guard = DB_METRICS
.rdb_insert_row_time
.with_label_values(&table_id)
.start_timer();
self.inner.insert_mut_tx(tx, TableId(table_id), row)
}

Expand Down Expand Up @@ -552,7 +551,10 @@ impl RelationalDB {
table_id: u32,
relation: R,
) -> Result<Option<u32>, DBError> {
measure(&RDB_DELETE_BY_REL_TIME, table_id);
let _guard = DB_METRICS
.rdb_delete_by_rel_time
.with_label_values(&table_id)
.start_timer();
self.inner.delete_by_rel_mut_tx(tx, TableId(table_id), relation)
}

Expand Down
6 changes: 5 additions & 1 deletion crates/core/src/host/module_host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::protobuf::client_api::{table_row_operation, SubscriptionUpdate, Table
use crate::subscription::module_subscription_actor::ModuleSubscriptionManager;
use crate::util::lending_pool::{Closed, LendingPool, LentResource, PoolClosed};
use crate::util::notify_once::NotifyOnce;
use crate::worker_metrics::WORKER_METRICS;
use base64::{engine::general_purpose::STANDARD as BASE_64_STD, Engine as _};
use futures::{Future, FutureExt};
use indexmap::IndexMap;
Expand Down Expand Up @@ -461,7 +462,10 @@ pub enum InitDatabaseError {
impl ModuleHost {
pub fn new(threadpool: Arc<HostThreadpool>, mut module: impl Module) -> Self {
let info = module.info();
let instance_pool = LendingPool::new();
let waiter_gauge = WORKER_METRICS
.instance_queue_length
.with_label_values(&info.identity, &info.module_hash);
let instance_pool = LendingPool::new(waiter_gauge);
instance_pool.add_multiple(module.initial_instances()).unwrap();
let inner = Arc::new(HostControllerActor {
module: Arc::new(module),
Expand Down
Loading