diff --git a/Cargo.lock b/Cargo.lock index 4d0086b8640c..c2986e3dbb4d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3959,12 +3959,15 @@ dependencies = [ name = "query-engine-metrics" version = "0.1.0" dependencies = [ + "derive_more", "expect-test", + "futures", "metrics", "metrics-exporter-prometheus", "metrics-util", "once_cell", "parking_lot", + "pin-project", "serde", "serde_json", "tokio", diff --git a/Cargo.toml b/Cargo.toml index cac859cf4b83..63a1d4c6d377 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,6 +38,7 @@ members = [ [workspace.dependencies] async-trait = { version = "0.1.77" } enumflags2 = { version = "0.7", features = ["serde"] } +futures = "0.3" psl = { path = "./psl/psl" } serde_json = { version = "1", features = ["float_roundtrip", "preserve_order", "raw_value"] } serde = { version = "1", features = ["derive"] } @@ -66,6 +67,7 @@ napi = { version = "2.15.1", default-features = false, features = [ napi-derive = "2.15.0" metrics = "0.23.0" js-sys = { version = "0.3" } +pin-project = "1" rand = { version = "0.8" } regex = { version = "1", features = ["std"] } serde_repr = { version = "0.1.17" } diff --git a/libs/crosstarget-utils/Cargo.toml b/libs/crosstarget-utils/Cargo.toml index 8dd311e292db..78d52dade2b0 100644 --- a/libs/crosstarget-utils/Cargo.toml +++ b/libs/crosstarget-utils/Cargo.toml @@ -8,14 +8,14 @@ edition = "2021" [dependencies] derive_more.workspace = true enumflags2.workspace = true -futures = "0.3" +futures.workspace = true [target.'cfg(target_arch = "wasm32")'.dependencies] js-sys.workspace = true wasm-bindgen.workspace = true wasm-bindgen-futures.workspace = true tokio = { version = "1", features = ["macros", "sync"] } -pin-project = "1" +pin-project.workspace = true [target.'cfg(not(target_arch = "wasm32"))'.dependencies] tokio.workspace = true diff --git a/quaint/Cargo.toml b/quaint/Cargo.toml index cfac06e101df..0cb1b706a8f9 100644 --- a/quaint/Cargo.toml +++ b/quaint/Cargo.toml @@ -77,7 +77,7 @@ async-trait.workspace = true thiserror = "1.0" num_cpus = "1.12" metrics.workspace = true -futures = "0.3" +futures.workspace = true url.workspace = true hex = "0.4" itertools.workspace = true diff --git a/query-engine/connector-test-kit-rs/query-engine-tests/Cargo.toml b/query-engine/connector-test-kit-rs/query-engine-tests/Cargo.toml index 46d1d4b845fd..076f993f1535 100644 --- a/query-engine/connector-test-kit-rs/query-engine-tests/Cargo.toml +++ b/query-engine/connector-test-kit-rs/query-engine-tests/Cargo.toml @@ -22,7 +22,7 @@ user-facing-errors.workspace = true prisma-value = { path = "../../../libs/prisma-value" } query-engine-metrics = { path = "../../metrics"} once_cell = "1.15.0" -futures = "0.3" +futures.workspace = true paste = "1.0.14" [dev-dependencies] diff --git a/query-engine/connector-test-kit-rs/query-engine-tests/tests/new/regressions/prisma_15607.rs b/query-engine/connector-test-kit-rs/query-engine-tests/tests/new/regressions/prisma_15607.rs index e026a90016bd..7a50f7cc5304 100644 --- a/query-engine/connector-test-kit-rs/query-engine-tests/tests/new/regressions/prisma_15607.rs +++ b/query-engine/connector-test-kit-rs/query-engine-tests/tests/new/regressions/prisma_15607.rs @@ -4,6 +4,7 @@ //! actors to allow test to continue even if one query is blocking. use indoc::indoc; +use query_engine_metrics::{MetricRecorder, WithMetricsInstrumentation}; use query_engine_tests::{ query_core::TxId, render_test_datamodel, setup_metrics, test_tracing_subscriber, LogEmit, QueryResult, Runner, TestError, TestLogCapture, TestResult, WithSubscriber, CONFIG, ENV_LOG_LEVEL, @@ -50,13 +51,12 @@ impl Actor { /// Spawns a new query engine to the runtime. pub async fn spawn() -> TestResult { let (log_capture, log_tx) = TestLogCapture::new(); - async fn with_logs(fut: impl Future, log_tx: LogEmit) -> T { - fut.with_subscriber(test_tracing_subscriber( - ENV_LOG_LEVEL.to_string(), - setup_metrics(), - log_tx, - )) - .await + let (metrics, recorder) = setup_metrics(); + + async fn with_observability(fut: impl Future, log_tx: LogEmit, recorder: MetricRecorder) -> T { + fut.with_subscriber(test_tracing_subscriber(ENV_LOG_LEVEL.to_string(), log_tx)) + .with_recorder(recorder) + .await } let (query_sender, mut query_receiver) = mpsc::channel(100); @@ -73,21 +73,24 @@ impl Actor { Some("READ COMMITTED"), ); - let mut runner = Runner::load(datamodel, &[], version, tag, None, setup_metrics(), log_capture).await?; + let mut runner = Runner::load(datamodel, &[], version, tag, None, metrics, log_capture).await?; tokio::spawn(async move { while let Some(message) = query_receiver.recv().await { match message { Message::Query(query) => { - let result = with_logs(runner.query(query), log_tx.clone()).await; + let result = with_observability(runner.query(query), log_tx.clone(), recorder.clone()).await; response_sender.send(Response::Query(result)).await.unwrap(); } Message::BeginTransaction => { - let response = with_logs(runner.start_tx(10000, 10000, None), log_tx.clone()).await; + let response = + with_observability(runner.start_tx(10000, 10000, None), log_tx.clone(), recorder.clone()) + .await; response_sender.send(Response::Tx(response)).await.unwrap(); } Message::RollbackTransaction(tx_id) => { - let response = with_logs(runner.rollback_tx(tx_id), log_tx.clone()).await?; + let response = + with_observability(runner.rollback_tx(tx_id), log_tx.clone(), recorder.clone()).await?; response_sender.send(Response::Rollback(response)).await.unwrap(); } Message::SetActiveTx(tx_id) => { diff --git a/query-engine/connector-test-kit-rs/query-tests-setup/src/lib.rs b/query-engine/connector-test-kit-rs/query-tests-setup/src/lib.rs index e94c14c6c574..c62a099cf31e 100644 --- a/query-engine/connector-test-kit-rs/query-tests-setup/src/lib.rs +++ b/query-engine/connector-test-kit-rs/query-tests-setup/src/lib.rs @@ -23,9 +23,8 @@ pub use templating::*; use colored::Colorize; use once_cell::sync::Lazy; use psl::datamodel_connector::ConnectorCapabilities; -use query_engine_metrics::MetricRegistry; +use query_engine_metrics::{MetricRecorder, MetricRegistry, WithMetricsInstrumentation}; use std::future::Future; -use std::sync::Once; use tokio::runtime::Builder; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; use tracing_futures::WithSubscriber; @@ -61,14 +60,10 @@ fn run_with_tokio>(fut: F) -> O { .block_on(fut) } -static METRIC_RECORDER: Once = Once::new(); - -pub fn setup_metrics() -> MetricRegistry { +pub fn setup_metrics() -> (MetricRegistry, MetricRecorder) { let metrics = MetricRegistry::new(); - METRIC_RECORDER.call_once(|| { - query_engine_metrics::setup(); - }); - metrics + let recorder = MetricRecorder::new(metrics.clone()).with_initialized_prisma_metrics(); + (metrics, recorder) } /// Taken from Reddit. Enables taking an async function pointer which takes references as param @@ -161,8 +156,7 @@ fn run_relation_link_test_impl( let datamodel = render_test_datamodel(&test_db_name, template, &[], None, Default::default(), Default::default(), None); let (connector_tag, version) = CONFIG.test_connector().unwrap(); - let metrics = setup_metrics(); - let metrics_for_subscriber = metrics.clone(); + let (metrics, recorder) = setup_metrics(); let (log_capture, log_tx) = TestLogCapture::new(); run_with_tokio( @@ -176,9 +170,8 @@ fn run_relation_link_test_impl( test_fn(&runner, &dm).with_subscriber(test_tracing_subscriber( ENV_LOG_LEVEL.to_string(), - metrics_for_subscriber, log_tx, - )) + )).with_recorder(recorder) .await.unwrap(); teardown_project(&datamodel, Default::default(), runner.schema_id()) @@ -275,8 +268,7 @@ fn run_connector_test_impl( None, ); let (connector_tag, version) = CONFIG.test_connector().unwrap(); - let metrics = crate::setup_metrics(); - let metrics_for_subscriber = metrics.clone(); + let (metrics, recorder) = crate::setup_metrics(); let (log_capture, log_tx) = TestLogCapture::new(); @@ -297,11 +289,8 @@ fn run_connector_test_impl( let schema_id = runner.schema_id(); if let Err(err) = test_fn(runner) - .with_subscriber(test_tracing_subscriber( - ENV_LOG_LEVEL.to_string(), - metrics_for_subscriber, - log_tx, - )) + .with_subscriber(test_tracing_subscriber(ENV_LOG_LEVEL.to_string(), log_tx)) + .with_recorder(recorder) .await { panic!("💥 Test failed due to an error: {err:?}"); diff --git a/query-engine/connector-test-kit-rs/query-tests-setup/src/logging.rs b/query-engine/connector-test-kit-rs/query-tests-setup/src/logging.rs index 5520075e6d30..1b6c73d23481 100644 --- a/query-engine/connector-test-kit-rs/query-tests-setup/src/logging.rs +++ b/query-engine/connector-test-kit-rs/query-tests-setup/src/logging.rs @@ -1,12 +1,11 @@ use query_core::telemetry::helpers as telemetry_helpers; -use query_engine_metrics::MetricRegistry; use tracing::Subscriber; use tracing_error::ErrorLayer; use tracing_subscriber::{prelude::*, Layer}; use crate::LogEmit; -pub fn test_tracing_subscriber(log_config: String, metrics: MetricRegistry, log_tx: LogEmit) -> impl Subscriber { +pub fn test_tracing_subscriber(log_config: String, log_tx: LogEmit) -> impl Subscriber { let filter = telemetry_helpers::env_filter(true, telemetry_helpers::QueryEngineLogLevel::Override(log_config)); let fmt_layer = tracing_subscriber::fmt::layer() @@ -15,7 +14,6 @@ pub fn test_tracing_subscriber(log_config: String, metrics: MetricRegistry, log_ tracing_subscriber::registry() .with(fmt_layer.boxed()) - .with(metrics.boxed()) .with(ErrorLayer::default()) } diff --git a/query-engine/connectors/mongodb-query-connector/Cargo.toml b/query-engine/connectors/mongodb-query-connector/Cargo.toml index 386c694b2679..413b95251d8e 100644 --- a/query-engine/connectors/mongodb-query-connector/Cargo.toml +++ b/query-engine/connectors/mongodb-query-connector/Cargo.toml @@ -7,7 +7,7 @@ version = "0.1.0" anyhow = "1.0" async-trait.workspace = true bigdecimal = "0.3" -futures = "0.3" +futures.workspace = true itertools.workspace = true mongodb.workspace = true bson.workspace = true diff --git a/query-engine/connectors/query-connector/Cargo.toml b/query-engine/connectors/query-connector/Cargo.toml index 52555d256baa..fe527241b646 100644 --- a/query-engine/connectors/query-connector/Cargo.toml +++ b/query-engine/connectors/query-connector/Cargo.toml @@ -7,7 +7,7 @@ version = "0.1.0" anyhow = "1.0" async-trait.workspace = true chrono.workspace = true -futures = "0.3" +futures.workspace = true itertools.workspace = true query-structure = {path = "../../query-structure"} prisma-value = {path = "../../../libs/prisma-value"} diff --git a/query-engine/connectors/sql-query-connector/Cargo.toml b/query-engine/connectors/sql-query-connector/Cargo.toml index 1826cac2681f..1d495858e6ac 100644 --- a/query-engine/connectors/sql-query-connector/Cargo.toml +++ b/query-engine/connectors/sql-query-connector/Cargo.toml @@ -38,7 +38,7 @@ psl.workspace = true anyhow = "1.0" async-trait.workspace = true bigdecimal = "0.3" -futures = "0.3" +futures.workspace = true itertools.workspace = true once_cell = "1.3" rand.workspace = true diff --git a/query-engine/core/Cargo.toml b/query-engine/core/Cargo.toml index a1aa9a326f77..f348fa8cdba1 100644 --- a/query-engine/core/Cargo.toml +++ b/query-engine/core/Cargo.toml @@ -15,7 +15,7 @@ connection-string.workspace = true connector = { path = "../connectors/query-connector", package = "query-connector" } crossbeam-channel = "0.5.6" psl.workspace = true -futures = "0.3" +futures.workspace = true indexmap.workspace = true itertools.workspace = true once_cell = "1" diff --git a/query-engine/core/src/executor/execute_operation.rs b/query-engine/core/src/executor/execute_operation.rs index 627c6ed19439..bf1293e70895 100644 --- a/query-engine/core/src/executor/execute_operation.rs +++ b/query-engine/core/src/executor/execute_operation.rs @@ -10,9 +10,12 @@ use connector::{Connection, ConnectionLike, Connector}; use crosstarget_utils::time::ElapsedTimeCounter; use futures::future; +#[cfg(not(feature = "metrics"))] +use crate::metrics::MetricsInstrumentationStub; #[cfg(feature = "metrics")] use query_engine_metrics::{ - counter, histogram, PRISMA_CLIENT_QUERIES_DURATION_HISTOGRAM_MS, PRISMA_CLIENT_QUERIES_TOTAL, + counter, histogram, WithMetricsInstrumentation, PRISMA_CLIENT_QUERIES_DURATION_HISTOGRAM_MS, + PRISMA_CLIENT_QUERIES_TOTAL, }; use schema::{QuerySchema, QuerySchemaRef}; @@ -106,7 +109,6 @@ pub async fn execute_many_self_contained( ) -> crate::Result>> { let mut futures = Vec::with_capacity(operations.len()); - let dispatcher = crate::get_current_dispatcher(); for op in operations { #[cfg(feature = "metrics")] counter!(PRISMA_CLIENT_QUERIES_TOTAL).increment(1); @@ -130,7 +132,8 @@ pub async fn execute_many_self_contained( trace_id.clone(), ), ) - .with_subscriber(dispatcher.clone()), + .with_current_subscriber() + .with_current_recorder(), )); } diff --git a/query-engine/core/src/executor/mod.rs b/query-engine/core/src/executor/mod.rs index fee7bc68fe7b..6fe5ee470222 100644 --- a/query-engine/core/src/executor/mod.rs +++ b/query-engine/core/src/executor/mod.rs @@ -22,7 +22,6 @@ use crate::{ use async_trait::async_trait; use connector::Connector; use serde::{Deserialize, Serialize}; -use tracing::Dispatch; #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] #[cfg_attr(not(target_arch = "wasm32"), async_trait)] @@ -116,20 +115,3 @@ pub trait TransactionManager { /// Rolls back a transaction. async fn rollback_tx(&self, tx_id: TxId) -> crate::Result<()>; } - -// With the node-api when a future is spawned in a new thread `tokio:spawn` it will not -// use the current dispatcher and its logs will not be captured anymore. We can use this -// method to get the current dispatcher and combine it with `with_subscriber` -// let dispatcher = get_current_dispatcher(); -// tokio::spawn(async { -// my_async_ops.await -// }.with_subscriber(dispatcher)); -// -// -// Finally, this can be replaced with with_current_collector -// https://github.com/tokio-rs/tracing/blob/master/tracing-futures/src/lib.rs#L234 -// once this is in a release - -pub fn get_current_dispatcher() -> Dispatch { - tracing::dispatcher::get_default(|current| current.clone()) -} diff --git a/query-engine/core/src/interactive_transactions/actors.rs b/query-engine/core/src/interactive_transactions/actors.rs index 86ebd5c13b84..601320d403a1 100644 --- a/query-engine/core/src/interactive_transactions/actors.rs +++ b/query-engine/core/src/interactive_transactions/actors.rs @@ -19,6 +19,11 @@ use tracing::Span; use tracing_futures::Instrument; use tracing_futures::WithSubscriber; +#[cfg(not(feature = "metrics"))] +use crate::metrics::MetricsInstrumentationStub; +#[cfg(feature = "metrics")] +use query_engine_metrics::WithMetricsInstrumentation; + #[cfg(feature = "metrics")] use crate::telemetry::helpers::set_span_link_from_traceparent; @@ -264,7 +269,6 @@ pub(crate) async fn spawn_itx_actor( let span = Span::current(); let tx_id_str = tx_id.to_string(); span.record("itx_id", tx_id_str.as_str()); - let dispatcher = crate::get_current_dispatcher(); let (tx_to_server, rx_from_client) = channel::(channel_size); let client = ITXClient { @@ -334,7 +338,8 @@ pub(crate) async fn spawn_itx_actor( trace!("[{}] has stopped with {}", server.id.to_string(), server.cached_tx); }) .instrument(span) - .with_subscriber(dispatcher), + .with_current_subscriber() + .with_current_recorder(), ); open_transaction_rcv.await.unwrap()?; diff --git a/query-engine/core/src/lib.rs b/query-engine/core/src/lib.rs index ae6a437bc083..25a0fcbd763e 100644 --- a/query-engine/core/src/lib.rs +++ b/query-engine/core/src/lib.rs @@ -28,6 +28,7 @@ pub use connector::{ mod error; mod interactive_transactions; mod interpreter; +mod metrics; mod query_ast; mod query_graph; mod result_ast; diff --git a/query-engine/core/src/metrics.rs b/query-engine/core/src/metrics.rs new file mode 100644 index 000000000000..fbbd62685dcc --- /dev/null +++ b/query-engine/core/src/metrics.rs @@ -0,0 +1,13 @@ +/// When the `metrics` feature is disabled, we don't compile the `query-engine-metrics` crate and +/// thus can't use the metrics instrumentation. To avoid the boilerplate of putting every +/// `with_current_recorder` call behind `#[cfg]`, we use this stub trait that does nothing but +/// allows the code that relies on `WithMetricsInstrumentation` trait to be in scope compile. +#[cfg(not(feature = "metrics"))] +pub(crate) trait MetricsInstrumentationStub: Sized { + fn with_current_recorder(self) -> Self { + self + } +} + +#[cfg(not(feature = "metrics"))] +impl MetricsInstrumentationStub for T {} diff --git a/query-engine/core/src/telemetry/capturing/mod.rs b/query-engine/core/src/telemetry/capturing/mod.rs index fc1219d5fe06..99f92b734637 100644 --- a/query-engine/core/src/telemetry/capturing/mod.rs +++ b/query-engine/core/src/telemetry/capturing/mod.rs @@ -145,7 +145,10 @@ use once_cell::sync::Lazy; use opentelemetry::{global, sdk, trace}; use tracing::subscriber; use tracing_subscriber::{ - filter::filter_fn, layer::Layered, prelude::__tracing_subscriber_SubscriberExt, Layer, Registry, + filter::filter_fn, + layer::{Layered, SubscriberExt}, + registry::LookupSpan, + Layer, Registry, }; static PROCESSOR: Lazy = Lazy::new(Processor::default); @@ -161,10 +164,7 @@ pub fn capturer(trace_id: trace::TraceId, settings: Settings) -> Capturer { #[cfg(feature = "metrics")] #[allow(clippy::type_complexity)] pub fn install_capturing_layer( - subscriber: Layered< - Option, - Layered + Send + Sync>, Registry>, - >, + subscriber: impl SubscriberExt + for<'a> LookupSpan<'a> + Send + Sync + 'static, log_queries: bool, ) { // set a trace context propagator, so that the trace context is propagated via the diff --git a/query-engine/driver-adapters/Cargo.toml b/query-engine/driver-adapters/Cargo.toml index 767780ff65db..a17e6808947a 100644 --- a/query-engine/driver-adapters/Cargo.toml +++ b/query-engine/driver-adapters/Cargo.toml @@ -10,6 +10,7 @@ postgresql = ["quaint/postgresql"] [dependencies] async-trait.workspace = true +futures.workspace = true once_cell = "1.15" serde.workspace = true serde_json.workspace = true @@ -17,11 +18,9 @@ tracing.workspace = true tracing-core = "0.1" metrics.workspace = true uuid.workspace = true -pin-project = "1" +pin-project.workspace = true serde_repr.workspace = true -futures = "0.3" - [dev-dependencies] expect-test = "1" tokio = { version = "1", features = ["macros", "time", "sync"] } diff --git a/query-engine/metrics/Cargo.toml b/query-engine/metrics/Cargo.toml index 0abe32ac8455..c21ac6636e81 100644 --- a/query-engine/metrics/Cargo.toml +++ b/query-engine/metrics/Cargo.toml @@ -4,6 +4,8 @@ version = "0.1.0" edition = "2021" [dependencies] +futures.workspace = true +derive_more.workspace = true metrics.workspace = true metrics-util = "0.17.0" metrics-exporter-prometheus = { version = "0.15.3", default-features = false } @@ -14,6 +16,7 @@ tracing.workspace = true tracing-futures = "0.2" tracing-subscriber = "0.3.11" parking_lot = "0.12" +pin-project.workspace = true [dev-dependencies] expect-test = "1" diff --git a/query-engine/metrics/src/instrument.rs b/query-engine/metrics/src/instrument.rs new file mode 100644 index 000000000000..a2cb16de48f8 --- /dev/null +++ b/query-engine/metrics/src/instrument.rs @@ -0,0 +1,83 @@ +use std::{ + cell::RefCell, + future::Future, + pin::Pin, + task::{Context, Poll}, +}; + +use futures::future::Either; +use pin_project::pin_project; + +use crate::MetricRecorder; + +thread_local! { + /// The current metric recorder temporarily set on the current thread while polling a future. + /// + /// See the description of `GLOBAL_RECORDER` in [`crate::recorder`] module for more + /// information. + static CURRENT_RECORDER: RefCell> = const { RefCell::new(None) }; +} + +/// Instruments a type with a metrics recorder. +/// +/// The instrumentation logic is currently only implemented for futures, but it could be extended +/// to support streams, sinks, and other types later if needed. Right now we only need it to be +/// able to set the initial recorder in the Node-API engine methods and forward the recorder to +/// spawned tokio tasks; in other words, to instrument the top-level future of each task. +pub trait WithMetricsInstrumentation: Sized { + /// Instruments the type with a [`MetricRecorder`]. + fn with_recorder(self, recorder: MetricRecorder) -> WithRecorder { + WithRecorder { inner: self, recorder } + } + + /// Instruments the type with an [`MetricRecorder`] if it is a `Some` or returns `self` as is + /// if the `recorder` is a `None`. + fn with_optional_recorder(self, recorder: Option) -> Either, Self> { + match recorder { + Some(recorder) => Either::Left(self.with_recorder(recorder)), + None => Either::Right(self), + } + } + + /// Instruments the type with the current [`MetricRecorder`] from the parent context on this + /// thread, or the default global recorder otherwise. If neither is set, then `self` is + /// returned as is. + fn with_current_recorder(self) -> Either, Self> { + CURRENT_RECORDER.with_borrow(|recorder| { + let recorder = recorder.clone().or_else(crate::recorder::global_recorder); + self.with_optional_recorder(recorder) + }) + } +} + +impl WithMetricsInstrumentation for T {} + +/// A type instrumented with a metric recorder. +/// +/// If `T` is a `Future`, then `WithRecorder` is also a `Future`. When polled, it temporarily +/// sets the local metric recorder for the duration of polling the inner future, and then restores +/// the previous recorder on the stack. +/// +/// Similar logic can be implemented for cases where `T` is another async primitive like a stream +/// or a sink, or any other type where such instrumentation makes sense (e.g. a function). +#[pin_project] +pub struct WithRecorder { + #[pin] + inner: T, + recorder: MetricRecorder, +} + +impl Future for WithRecorder { + type Output = T::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + let prev_recorder = CURRENT_RECORDER.replace(Some(this.recorder.clone())); + + let poll = metrics::with_local_recorder(this.recorder, || this.inner.poll(cx)); + + CURRENT_RECORDER.set(prev_recorder); + + poll + } +} diff --git a/query-engine/metrics/src/lib.rs b/query-engine/metrics/src/lib.rs index 9a041102c4b0..42dc4dbb2e48 100644 --- a/query-engine/metrics/src/lib.rs +++ b/query-engine/metrics/src/lib.rs @@ -19,26 +19,21 @@ //! * At the moment, with the Histogram we only support one type of bucket which is a bucket for timings in milliseconds. //! -const METRIC_TARGET: &str = "qe_metrics"; -const METRIC_COUNTER: &str = "counter"; -const METRIC_GAUGE: &str = "gauge"; -const METRIC_HISTOGRAM: &str = "histogram"; -const METRIC_DESCRIPTION: &str = "description"; - mod common; mod formatters; +mod instrument; mod recorder; mod registry; use once_cell::sync::Lazy; -use recorder::*; -pub use registry::MetricRegistry; use serde::Deserialize; use std::collections::HashMap; -use std::sync::Once; -pub extern crate metrics; -pub use metrics::{counter, describe_counter, describe_gauge, describe_histogram, gauge, histogram}; +pub use metrics::{self, counter, describe_counter, describe_gauge, describe_histogram, gauge, histogram}; + +pub use instrument::*; +pub use recorder::MetricRecorder; +pub use registry::MetricRegistry; // Metrics that we emit from the engines, third party metrics emitted by libraries and that we rename are omitted. pub const PRISMA_CLIENT_QUERIES_TOTAL: &str = "prisma_client_queries_total"; // counter @@ -91,19 +86,8 @@ static METRIC_RENAMES: Lazy> ]) }); -pub fn setup() { - set_recorder(); - initialize_metrics(); -} - -static METRIC_RECORDER: Once = Once::new(); - -fn set_recorder() { - METRIC_RECORDER.call_once(|| metrics::set_global_recorder(MetricRecorder).unwrap()); -} - /// Initialize metrics descriptions and values -pub fn initialize_metrics() { +pub(crate) fn initialize_metrics() { initialize_metrics_descriptions(); initialize_metrics_values(); } @@ -170,17 +154,12 @@ mod tests { use serde_json::json; use std::collections::HashMap; use std::time::Duration; - use tracing::instrument::WithSubscriber; - use tracing::{trace, Dispatch}; - use tracing_subscriber::layer::SubscriberExt; + use tracing::trace; use once_cell::sync::Lazy; use tokio::runtime::Runtime; - static RT: Lazy = Lazy::new(|| { - set_recorder(); - Runtime::new().unwrap() - }); + static RT: Lazy = Lazy::new(|| Runtime::new().unwrap()); const TESTING_ACCEPT_LIST: &[&str] = &[ "test_counter", @@ -201,8 +180,8 @@ mod tests { fn test_counters() { RT.block_on(async { let metrics = MetricRegistry::new_with_accept_list(TESTING_ACCEPT_LIST.to_vec()); - let dispatch = Dispatch::new(tracing_subscriber::Registry::default().with(metrics.clone())); - async { + let recorder = MetricRecorder::new(metrics.clone()); + async move { let counter1 = counter!("test_counter"); counter1.increment(1); counter!("test_counter").increment(1); @@ -220,7 +199,7 @@ mod tests { let val3 = metrics.counter_value("test_counter").unwrap(); assert_eq!(val3, 5); } - .with_subscriber(dispatch) + .with_recorder(recorder) .await; }); } @@ -229,8 +208,8 @@ mod tests { fn test_gauges() { RT.block_on(async { let metrics = MetricRegistry::new_with_accept_list(TESTING_ACCEPT_LIST.to_vec()); - let dispatch = Dispatch::new(tracing_subscriber::Registry::default().with(metrics.clone())); - async { + let recorder = MetricRecorder::new(metrics.clone()); + async move { let gauge1 = gauge!("test_gauge"); gauge1.increment(1.0); gauge!("test_gauge").increment(1.0); @@ -253,7 +232,7 @@ mod tests { let val4 = metrics.gauge_value("test_gauge").unwrap(); assert_eq!(val4, 3.0); } - .with_subscriber(dispatch) + .with_recorder(recorder) .await; }); } @@ -262,8 +241,8 @@ mod tests { fn test_no_panic_and_ignore_other_traces() { RT.block_on(async { let metrics = MetricRegistry::new_with_accept_list(TESTING_ACCEPT_LIST.to_vec()); - let dispatch = Dispatch::new(tracing_subscriber::Registry::default().with(metrics.clone())); - async { + let recorder = MetricRecorder::new(metrics.clone()); + async move { trace!("a fake trace"); gauge!("test_gauge").set(1.0); @@ -274,7 +253,7 @@ mod tests { assert_eq!(1.0, metrics.gauge_value("test_gauge").unwrap()); assert_eq!(1, metrics.counter_value("test_counter").unwrap()); } - .with_subscriber(dispatch) + .with_recorder(recorder) .await; }); } @@ -283,15 +262,15 @@ mod tests { fn test_ignore_non_accepted_metrics() { RT.block_on(async { let metrics = MetricRegistry::new_with_accept_list(TESTING_ACCEPT_LIST.to_vec()); - let dispatch = Dispatch::new(tracing_subscriber::Registry::default().with(metrics.clone())); - async { + let recorder = MetricRecorder::new(metrics.clone()); + async move { gauge!("not_accepted").set(1.0); gauge!("test_gauge").set(1.0); assert_eq!(1.0, metrics.gauge_value("test_gauge").unwrap()); assert_eq!(None, metrics.gauge_value("not_accepted")); } - .with_subscriber(dispatch) + .with_recorder(recorder) .await; }); } @@ -300,8 +279,8 @@ mod tests { fn test_histograms() { RT.block_on(async { let metrics = MetricRegistry::new_with_accept_list(TESTING_ACCEPT_LIST.to_vec()); - let dispatch = Dispatch::new(tracing_subscriber::Registry::default().with(metrics.clone())); - async { + let recorder = MetricRecorder::new(metrics.clone()); + async move { let hist = histogram!("test_histogram"); hist.record(Duration::from_millis(9)); @@ -328,7 +307,7 @@ mod tests { assert_eq!(hist.buckets(), expected); } - .with_subscriber(dispatch) + .with_recorder(recorder) .await; }); } @@ -337,8 +316,8 @@ mod tests { fn test_set_and_read_descriptions() { RT.block_on(async { let metrics = MetricRegistry::new_with_accept_list(TESTING_ACCEPT_LIST.to_vec()); - let dispatch = Dispatch::new(tracing_subscriber::Registry::default().with(metrics.clone())); - async { + let recorder = MetricRecorder::new(metrics.clone()); + async move { describe_counter!("test_counter", "This is a counter"); let descriptions = metrics.get_descriptions(); @@ -359,7 +338,7 @@ mod tests { let description = descriptions.get("test_histogram").unwrap(); assert_eq!("This is a hist", description); } - .with_subscriber(dispatch) + .with_recorder(recorder) .await; }); } @@ -368,8 +347,8 @@ mod tests { fn test_to_json() { RT.block_on(async { let metrics = MetricRegistry::new_with_accept_list(TESTING_ACCEPT_LIST.to_vec()); - let dispatch = Dispatch::new(tracing_subscriber::Registry::default().with(metrics.clone())); - async { + let recorder = MetricRecorder::new(metrics.clone()); + async move { let empty = json!({ "counters": [], "gauges": [], @@ -440,7 +419,7 @@ mod tests { assert_eq!(json, expected); } - .with_subscriber(dispatch) + .with_recorder(recorder) .await; }); } @@ -449,8 +428,8 @@ mod tests { fn test_global_and_metric_labels() { RT.block_on(async { let metrics = MetricRegistry::new_with_accept_list(TESTING_ACCEPT_LIST.to_vec()); - let dispatch = Dispatch::new(tracing_subscriber::Registry::default().with(metrics.clone())); - async { + let recorder = MetricRecorder::new(metrics.clone()); + async move { let hist = histogram!("test_histogram", "label" => "one", "two" => "another"); hist.record(Duration::from_millis(9)); @@ -483,7 +462,7 @@ mod tests { }); assert_eq!(expected, json); } - .with_subscriber(dispatch) + .with_recorder(recorder) .await; }); } @@ -492,8 +471,8 @@ mod tests { fn test_prometheus_format() { RT.block_on(async { let metrics = MetricRegistry::new_with_accept_list(TESTING_ACCEPT_LIST.to_vec()); - let dispatch = Dispatch::new(tracing_subscriber::Registry::default().with(metrics.clone())); - async { + let recorder = MetricRecorder::new(metrics.clone()); + async move { counter!("counter_1", "label" => "one").absolute(4); describe_counter!("counter_2", "this is a description for counter 2"); counter!("counter_2", "label" => "one", "another_label" => "two").absolute(2); @@ -566,7 +545,7 @@ mod tests { snapshot.assert_eq(&prometheus); } - .with_subscriber(dispatch) + .with_recorder(recorder) .await; }); } diff --git a/query-engine/metrics/src/recorder.rs b/query-engine/metrics/src/recorder.rs index 25a71967f6a0..a911a45d67e5 100644 --- a/query-engine/metrics/src/recorder.rs +++ b/query-engine/metrics/src/recorder.rs @@ -1,122 +1,190 @@ -use std::sync::Arc; +use std::sync::{Arc, OnceLock}; +use derive_more::Display; use metrics::{Counter, CounterFn, Gauge, GaugeFn, Histogram, HistogramFn, Key, Recorder, Unit}; use metrics::{KeyName, Metadata, SharedString}; -use tracing::trace; -use super::common::KeyLabels; -use super::{METRIC_COUNTER, METRIC_DESCRIPTION, METRIC_GAUGE, METRIC_HISTOGRAM, METRIC_TARGET}; +use crate::common::{MetricAction, MetricType}; +use crate::registry::MetricVisitor; +use crate::MetricRegistry; + +/// Default global metric recorder. +/// +/// `metrics` crate has the state on its own. It allows setting the global recorder, it allows +/// overriding it for a duration of an async closure, and it allows borrowing the current recorder +/// for a short while. We, however, can't use this in our async instrumentation because we need the +/// current recorder to be `Send + 'static` to be able to store it in a future that would be usable +/// in a work-stealing runtime, especially since we need to be able to instrument the futures +/// spawned as tasks. The solution to this is to maintain our own state in parallel. +/// +/// The APIs exposed by the crate guarantee that the state we modify on our side is updated on the +/// `metrics` side as well. Using `metrics::set_global_recorder` or `metrics::with_local_recorder` +/// in user code won't be detected by us but is safe and won't lead to any issues (even if the new +/// recorder isn't the [`MetricRecorder`] from this crate), we just won't know about any new local +/// recorders on the stack, and calling +/// [`crate::WithMetricsInstrumentation::with_current_recorder`] will re-use the last +/// [`MetricRecorder`] known to us. +static GLOBAL_RECORDER: OnceLock> = const { OnceLock::new() }; + +#[derive(Display, Debug)] +#[display(fmt = "global recorder can only be installed once")] +pub struct AlreadyInstalled; + +impl std::error::Error for AlreadyInstalled {} + +fn set_global_recorder(recorder: MetricRecorder) -> Result<(), AlreadyInstalled> { + GLOBAL_RECORDER.set(Some(recorder)).map_err(|_| AlreadyInstalled) +} -#[derive(Default)] -pub(crate) struct MetricRecorder; +pub(crate) fn global_recorder() -> Option { + GLOBAL_RECORDER.get()?.clone() +} + +/// Receives the metrics from the macros provided by the `metrics` crate and forwards them to +/// [`MetricRegistry`]. +/// +/// To provide an analogy, `MetricRecorder` to `MetricRegistry` is what `Dispatch` is to +/// `Subscriber` in `tracing`. Just like `Dispatch`, it acts like a handle to the registry and is +/// cheaply clonable with reference-counting semantics. +#[derive(Clone)] +pub struct MetricRecorder { + registry: MetricRegistry, +} impl MetricRecorder { - fn register_description(&self, name: &str, description: &str) { - trace!( - target: METRIC_TARGET, - name = name, - metric_type = METRIC_DESCRIPTION, - description = description - ); + pub fn new(registry: MetricRegistry) -> Self { + Self { registry } + } + + /// Convenience method to call [`Self::init_prisma_metrics`] immediately after creating the + /// recorder. + pub fn with_initialized_prisma_metrics(self) -> Self { + self.init_prisma_metrics(); + self + } + + /// Initializes the default Prisma metrics by dispatching their descriptions and initial values + /// to the registry. + /// + /// Query engine needs this, but the metrics can also be used without this, especially in + /// tests. + pub fn init_prisma_metrics(&self) { + metrics::with_local_recorder(self, || { + super::initialize_metrics(); + }); + } + + /// Installs the metrics recorder globally, registering it both with the `metrics` crate and + /// our own instrumentation. + pub fn install_globally(&self) -> Result<(), AlreadyInstalled> { + set_global_recorder(self.clone())?; + metrics::set_global_recorder(self.clone()).map_err(|_| AlreadyInstalled) + } + + fn register_description(&self, name: KeyName, description: &str) { + self.record_in_registry(&MetricVisitor { + metric_type: MetricType::Description, + action: MetricAction::Description(description.to_owned()), + name: Key::from_name(name), + }); + } + + fn record_in_registry(&self, visitor: &MetricVisitor) { + self.registry.record(visitor); } } impl Recorder for MetricRecorder { fn describe_counter(&self, key_name: KeyName, _unit: Option, description: SharedString) { - self.register_description(key_name.as_str(), &description); + self.register_description(key_name, &description); } fn describe_gauge(&self, key_name: KeyName, _unit: Option, description: SharedString) { - self.register_description(key_name.as_str(), &description); + self.register_description(key_name, &description); } fn describe_histogram(&self, key_name: KeyName, _unit: Option, description: SharedString) { - self.register_description(key_name.as_str(), &description); + self.register_description(key_name, &description); } fn register_counter(&self, key: &Key, _metadata: &Metadata<'_>) -> Counter { - Counter::from_arc(Arc::new(MetricHandle(key.clone()))) + Counter::from_arc(Arc::new(MetricHandle::new(key.clone(), self.registry.clone()))) } fn register_gauge(&self, key: &Key, _metadata: &Metadata<'_>) -> Gauge { - Gauge::from_arc(Arc::new(MetricHandle(key.clone()))) + Gauge::from_arc(Arc::new(MetricHandle::new(key.clone(), self.registry.clone()))) } fn register_histogram(&self, key: &Key, _metadata: &Metadata<'_>) -> Histogram { - Histogram::from_arc(Arc::new(MetricHandle(key.clone()))) + Histogram::from_arc(Arc::new(MetricHandle::new(key.clone(), self.registry.clone()))) } } -pub(crate) struct MetricHandle(Key); +pub(crate) struct MetricHandle { + key: Key, + registry: MetricRegistry, +} + +impl MetricHandle { + pub fn new(key: Key, registry: MetricRegistry) -> Self { + Self { key, registry } + } + + fn record_in_registry(&self, visitor: &MetricVisitor) { + self.registry.record(visitor); + } +} impl CounterFn for MetricHandle { fn increment(&self, value: u64) { - let keylabels: KeyLabels = self.0.clone().into(); - let json_string = serde_json::to_string(&keylabels).unwrap(); - trace!( - target: METRIC_TARGET, - key_labels = json_string.as_str(), - metric_type = METRIC_COUNTER, - increment = value, - ); + self.record_in_registry(&MetricVisitor { + metric_type: MetricType::Counter, + action: MetricAction::Increment(value), + name: self.key.clone(), + }); } fn absolute(&self, value: u64) { - let keylabels: KeyLabels = self.0.clone().into(); - let json_string = serde_json::to_string(&keylabels).unwrap(); - trace!( - target: METRIC_TARGET, - key_labels = json_string.as_str(), - metric_type = METRIC_COUNTER, - absolute = value, - ); + self.record_in_registry(&MetricVisitor { + metric_type: MetricType::Counter, + action: MetricAction::Absolute(value), + name: self.key.clone(), + }); } } impl GaugeFn for MetricHandle { fn increment(&self, value: f64) { - let keylabels: KeyLabels = self.0.clone().into(); - let json_string = serde_json::to_string(&keylabels).unwrap(); - trace!( - target: METRIC_TARGET, - key_labels = json_string.as_str(), - metric_type = METRIC_GAUGE, - gauge_inc = value, - ); + self.record_in_registry(&MetricVisitor { + metric_type: MetricType::Gauge, + action: MetricAction::GaugeInc(value), + name: self.key.clone(), + }); } fn decrement(&self, value: f64) { - let keylabels: KeyLabels = self.0.clone().into(); - let json_string = serde_json::to_string(&keylabels).unwrap(); - trace!( - target: METRIC_TARGET, - key_labels = json_string.as_str(), - metric_type = METRIC_GAUGE, - gauge_dec = value, - ); + self.record_in_registry(&MetricVisitor { + metric_type: MetricType::Gauge, + action: MetricAction::GaugeDec(value), + name: self.key.clone(), + }); } fn set(&self, value: f64) { - let keylabels: KeyLabels = self.0.clone().into(); - let json_string = serde_json::to_string(&keylabels).unwrap(); - trace!( - target: METRIC_TARGET, - key_labels = json_string.as_str(), - metric_type = METRIC_GAUGE, - gauge_set = value, - ); + self.record_in_registry(&MetricVisitor { + metric_type: MetricType::Gauge, + action: MetricAction::GaugeSet(value), + name: self.key.clone(), + }); } } impl HistogramFn for MetricHandle { fn record(&self, value: f64) { - let keylabels: KeyLabels = self.0.clone().into(); - let json_string = serde_json::to_string(&keylabels).unwrap(); - trace!( - target: METRIC_TARGET, - key_labels = json_string.as_str(), - metric_type = METRIC_HISTOGRAM, - hist_record = value, - ); + self.record_in_registry(&MetricVisitor { + metric_type: MetricType::Histogram, + action: MetricAction::HistRecord(value), + name: self.key.clone(), + }); } } diff --git a/query-engine/metrics/src/registry.rs b/query-engine/metrics/src/registry.rs index 6530edbe8764..f6b217fcda56 100644 --- a/query-engine/metrics/src/registry.rs +++ b/query-engine/metrics/src/registry.rs @@ -1,11 +1,7 @@ -use super::formatters::metrics_to_json; -use super::{ - common::{KeyLabels, Metric, MetricAction, MetricType, MetricValue, Snapshot}, - formatters::metrics_to_prometheus, -}; -use super::{ - ACCEPT_LIST, HISTOGRAM_BOUNDS, METRIC_COUNTER, METRIC_DESCRIPTION, METRIC_GAUGE, METRIC_HISTOGRAM, METRIC_TARGET, -}; +use std::collections::HashMap; +use std::fmt; +use std::sync::{atomic::Ordering, Arc}; + use metrics::{CounterFn, GaugeFn, HistogramFn, Key}; use metrics_util::{ registry::{GenerationalAtomicStorage, GenerationalStorage, Registry}, @@ -13,15 +9,13 @@ use metrics_util::{ }; use parking_lot::RwLock; use serde_json::Value; -use std::collections::HashMap; -use std::fmt; -use std::sync::atomic::Ordering; -use std::sync::Arc; -use tracing::{ - field::{Field, Visit}, - Subscriber, + +use super::formatters::metrics_to_json; +use super::{ + common::{Metric, MetricAction, MetricType, MetricValue, Snapshot}, + formatters::metrics_to_prometheus, }; -use tracing_subscriber::Layer; +use super::{ACCEPT_LIST, HISTOGRAM_BOUNDS}; struct Inner { descriptions: RwLock>, @@ -46,7 +40,7 @@ pub struct MetricRegistry { impl fmt::Debug for MetricRegistry { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "Metric Registry") + write!(f, "MetricRegistry {{ .. }}") } } @@ -68,12 +62,14 @@ impl MetricRegistry { } } - fn record(&self, metric: &MetricVisitor) { - match metric.metric_type { - MetricType::Counter => self.handle_counter(metric), - MetricType::Gauge => self.handle_gauge(metric), - MetricType::Histogram => self.handle_histogram(metric), - MetricType::Description => self.handle_description(metric), + pub(crate) fn record(&self, metric: &MetricVisitor) { + if self.is_accepted_metric(metric) { + match metric.metric_type { + MetricType::Counter => self.handle_counter(metric), + MetricType::Gauge => self.handle_gauge(metric), + MetricType::Histogram => self.handle_histogram(metric), + MetricType::Description => self.handle_description(metric), + } } } @@ -223,80 +219,8 @@ impl MetricRegistry { } #[derive(Debug)] -struct MetricVisitor { - metric_type: MetricType, - action: MetricAction, - name: Key, -} - -impl MetricVisitor { - pub fn new() -> Self { - Self { - metric_type: MetricType::Description, - action: MetricAction::Absolute(0), - name: Key::from_name(""), - } - } -} - -impl Visit for MetricVisitor { - fn record_debug(&mut self, _field: &Field, _value: &dyn std::fmt::Debug) {} - - fn record_f64(&mut self, field: &Field, value: f64) { - match field.name() { - "gauge_inc" => self.action = MetricAction::GaugeInc(value), - "gauge_dec" => self.action = MetricAction::GaugeDec(value), - "gauge_set" => self.action = MetricAction::GaugeSet(value), - "hist_record" => self.action = MetricAction::HistRecord(value), - _ => (), - } - } - - fn record_i64(&mut self, field: &Field, value: i64) { - match field.name() { - "increment" => self.action = MetricAction::Increment(value as u64), - "absolute" => self.action = MetricAction::Absolute(value as u64), - _ => (), - } - } - - fn record_u64(&mut self, field: &Field, value: u64) { - match field.name() { - "increment" => self.action = MetricAction::Increment(value), - "absolute" => self.action = MetricAction::Absolute(value), - _ => (), - } - } - - fn record_str(&mut self, field: &Field, value: &str) { - match (field.name(), value) { - ("metric_type", METRIC_COUNTER) => self.metric_type = MetricType::Counter, - ("metric_type", METRIC_GAUGE) => self.metric_type = MetricType::Gauge, - ("metric_type", METRIC_HISTOGRAM) => self.metric_type = MetricType::Histogram, - ("metric_type", METRIC_DESCRIPTION) => self.metric_type = MetricType::Description, - ("name", _) => self.name = Key::from_name(value.to_string()), - ("key_labels", _) => { - let key_labels: KeyLabels = serde_json::from_str(value).unwrap(); - self.name = key_labels.into(); - } - (METRIC_DESCRIPTION, _) => self.action = MetricAction::Description(value.to_string()), - _ => (), - } - } -} - -// A tracing layer for receiving metric trace events and storing them in the registry. -impl Layer for MetricRegistry { - fn on_event(&self, event: &tracing::Event<'_>, _ctx: tracing_subscriber::layer::Context<'_, S>) { - if event.metadata().target() != METRIC_TARGET { - return; - } - - let mut visitor = MetricVisitor::new(); - event.record(&mut visitor); - - if self.is_accepted_metric(&visitor) { - self.record(&visitor); - } - } +pub(crate) struct MetricVisitor { + pub(crate) metric_type: MetricType, + pub(crate) action: MetricAction, + pub(crate) name: Key, } diff --git a/query-engine/query-engine-c-abi/Cargo.toml b/query-engine/query-engine-c-abi/Cargo.toml index ff55be35b8e9..8f6d05ffe51f 100644 --- a/query-engine/query-engine-c-abi/Cargo.toml +++ b/query-engine/query-engine-c-abi/Cargo.toml @@ -41,7 +41,7 @@ tracing-opentelemetry = "0.17.3" opentelemetry = { version = "0.17" } tokio.workspace = true -futures = "0.3" +futures.workspace = true once_cell = "1.19.0" [build-dependencies] diff --git a/query-engine/query-engine-node-api/Cargo.toml b/query-engine/query-engine-node-api/Cargo.toml index b7b3db9ad5a1..73eafbb626bc 100644 --- a/query-engine/query-engine-node-api/Cargo.toml +++ b/query-engine/query-engine-node-api/Cargo.toml @@ -51,7 +51,7 @@ opentelemetry = { version = "0.17" } quaint.workspace = true tokio.workspace = true -futures = "0.3" +futures.workspace = true query-engine-metrics = { path = "../metrics" } [build-dependencies] diff --git a/query-engine/query-engine-node-api/src/engine.rs b/query-engine/query-engine-node-api/src/engine.rs index d03e78f6c298..6508c04c372f 100644 --- a/query-engine/query-engine-node-api/src/engine.rs +++ b/query-engine/query-engine-node-api/src/engine.rs @@ -9,7 +9,7 @@ use query_engine_common::engine::{ map_known_error, stringify_env_values, ConnectedEngine, ConnectedEngineNative, ConstructorOptions, ConstructorOptionsNative, EngineBuilder, EngineBuilderNative, Inner, }; -use query_engine_metrics::MetricFormat; +use query_engine_metrics::{MetricFormat, WithMetricsInstrumentation}; use request_handlers::{load_executor, render_graphql_schema, ConnectorKind, RequestBody, RequestHandler}; use serde::Deserialize; use serde_json::json; @@ -149,21 +149,6 @@ impl QueryEngine { let log_level = log_level.parse::().unwrap(); let logger = Logger::new(log_queries, log_level, log_callback, enable_metrics, enable_tracing); - // Describe metrics adds all the descriptions and default values for our metrics - // this needs to run once our metrics pipeline has been configured and it needs to - // use the correct logging subscriber(our dispatch) so that the metrics recorder recieves - // it - if enable_metrics { - napi_env.execute_tokio_future( - async { - query_engine_metrics::initialize_metrics(); - Ok(()) - } - .with_subscriber(logger.dispatcher()), - |&mut _env, _data| Ok(()), - )?; - } - Ok(Self { connector_mode, inner: RwLock::new(Inner::Builder(builder)), @@ -175,6 +160,7 @@ impl QueryEngine { #[napi] pub async fn connect(&self, trace: String) -> napi::Result<()> { let dispatcher = self.logger.dispatcher(); + let recorder = self.logger.recorder(); async_panic_to_js_error(async { let span = tracing::info_span!("prisma:engine:connect"); @@ -268,6 +254,7 @@ impl QueryEngine { Ok(()) }) .with_subscriber(dispatcher) + .with_optional_recorder(recorder) .await?; Ok(()) @@ -277,6 +264,7 @@ impl QueryEngine { #[napi] pub async fn disconnect(&self, trace: String) -> napi::Result<()> { let dispatcher = self.logger.dispatcher(); + let recorder = self.logger.recorder(); async_panic_to_js_error(async { let span = tracing::info_span!("prisma:engine:disconnect"); @@ -305,6 +293,7 @@ impl QueryEngine { .await }) .with_subscriber(dispatcher) + .with_optional_recorder(recorder) .await } @@ -312,6 +301,7 @@ impl QueryEngine { #[napi] pub async fn query(&self, body: String, trace: String, tx_id: Option) -> napi::Result { let dispatcher = self.logger.dispatcher(); + let recorder = self.logger.recorder(); async_panic_to_js_error(async { let inner = self.inner.read().await; @@ -338,6 +328,7 @@ impl QueryEngine { .await }) .with_subscriber(dispatcher) + .with_optional_recorder(recorder) .await } @@ -349,6 +340,7 @@ impl QueryEngine { let engine = inner.as_engine()?; let dispatcher = self.logger.dispatcher(); + let recorder = self.logger.recorder(); async move { let span = tracing::info_span!("prisma:engine:itx_runner", user_facing = true, itx_id = field::Empty); @@ -366,6 +358,7 @@ impl QueryEngine { } } .with_subscriber(dispatcher) + .with_optional_recorder(recorder) .await }) .await @@ -379,6 +372,7 @@ impl QueryEngine { let engine = inner.as_engine()?; let dispatcher = self.logger.dispatcher(); + let recorder = self.logger.recorder(); async move { match engine.executor().commit_tx(TxId::from(tx_id)).await { @@ -387,6 +381,7 @@ impl QueryEngine { } } .with_subscriber(dispatcher) + .with_optional_recorder(recorder) .await }) .await @@ -400,6 +395,7 @@ impl QueryEngine { let engine = inner.as_engine()?; let dispatcher = self.logger.dispatcher(); + let recorder = self.logger.recorder(); async move { match engine.executor().rollback_tx(TxId::from(tx_id)).await { @@ -408,6 +404,7 @@ impl QueryEngine { } } .with_subscriber(dispatcher) + .with_optional_recorder(recorder) .await }) .await @@ -416,17 +413,25 @@ impl QueryEngine { /// Loads the query schema. Only available when connected. #[napi] pub async fn sdl_schema(&self) -> napi::Result { + let dispatcher = self.logger.dispatcher(); + let recorder = self.logger.recorder(); + async_panic_to_js_error(async move { let inner = self.inner.read().await; let engine = inner.as_engine()?; Ok(render_graphql_schema(engine.query_schema())) }) + .with_subscriber(dispatcher) + .with_optional_recorder(recorder) .await } #[napi] pub async fn metrics(&self, json_options: String) -> napi::Result { + let dispatcher = self.logger.dispatcher(); + let recorder = self.logger.recorder(); + async_panic_to_js_error(async move { let inner = self.inner.read().await; let engine = inner.as_engine()?; @@ -447,6 +452,8 @@ impl QueryEngine { .into()) } }) + .with_subscriber(dispatcher) + .with_optional_recorder(recorder) .await } } diff --git a/query-engine/query-engine-node-api/src/logger.rs b/query-engine/query-engine-node-api/src/logger.rs index b86343bb4a94..2afd4a9fbd62 100644 --- a/query-engine/query-engine-node-api/src/logger.rs +++ b/query-engine/query-engine-node-api/src/logger.rs @@ -2,7 +2,7 @@ use core::fmt; use napi::threadsafe_function::{ErrorStrategy, ThreadsafeFunction, ThreadsafeFunctionCallMode}; use query_core::telemetry; use query_engine_common::logger::StringCallback; -use query_engine_metrics::MetricRegistry; +use query_engine_metrics::{MetricRecorder, MetricRegistry}; use serde_json::Value; use std::{collections::BTreeMap, fmt::Display}; use tracing::{ @@ -21,6 +21,7 @@ pub(crate) type LogCallback = ThreadsafeFunction; pub(crate) struct Logger { dispatcher: Dispatch, metrics: Option, + recorder: Option, } impl Logger { @@ -63,16 +64,18 @@ impl Logger { let layer = log_callback.with_filter(filters); - let metrics = if enable_metrics { - query_engine_metrics::setup(); - Some(MetricRegistry::new()) + let (metrics, recorder) = if enable_metrics { + let registry = MetricRegistry::new(); + let recorder = MetricRecorder::new(registry.clone()).with_initialized_prisma_metrics(); + (Some(registry), Some(recorder)) } else { - None + (None, None) }; Self { - dispatcher: Dispatch::new(Registry::default().with(telemetry).with(layer).with(metrics.clone())), + dispatcher: Dispatch::new(Registry::default().with(telemetry).with(layer)), metrics, + recorder, } } @@ -83,6 +86,10 @@ impl Logger { pub fn metrics(&self) -> Option { self.metrics.clone() } + + pub fn recorder(&self) -> Option { + self.recorder.clone() + } } pub struct JsonVisitor<'a> { diff --git a/query-engine/query-engine-wasm/Cargo.toml b/query-engine/query-engine-wasm/Cargo.toml index 089dfba63bfd..f2e16224efb4 100644 --- a/query-engine/query-engine-wasm/Cargo.toml +++ b/query-engine/query-engine-wasm/Cargo.toml @@ -58,7 +58,7 @@ thiserror = "1" url.workspace = true serde.workspace = true tokio = { version = "1", features = ["macros", "sync", "io-util", "time"] } -futures = "0.3" +futures.workspace = true tracing.workspace = true tracing-subscriber = { version = "0.3" } diff --git a/query-engine/query-engine/src/context.rs b/query-engine/query-engine/src/context.rs index 7a1138c411e5..7ec58991c58b 100644 --- a/query-engine/query-engine/src/context.rs +++ b/query-engine/query-engine/src/context.rs @@ -8,8 +8,7 @@ use query_core::{ schema::{self, QuerySchemaRef}, QueryExecutor, }; -use query_engine_metrics::setup as metric_setup; -use query_engine_metrics::MetricRegistry; +use query_engine_metrics::{MetricRecorder, MetricRegistry}; use request_handlers::{load_executor, ConnectorKind}; use std::{env, fmt, sync::Arc}; use tracing::Instrument; @@ -103,22 +102,20 @@ impl PrismaContext { } } -pub async fn setup( - opts: &PrismaOpt, - install_logger: bool, - metrics: Option, -) -> PrismaResult> { - let metrics = metrics.unwrap_or_default(); - +pub async fn setup(opts: &PrismaOpt, install_logger: bool) -> PrismaResult> { if install_logger { - Logger::new("prisma-engine-http", Some(metrics.clone()), opts) - .install() - .unwrap(); + Logger::new("prisma-engine-http", opts).install().unwrap(); } - if opts.enable_metrics || opts.dataproxy_metric_override { - metric_setup(); - } + let metrics = if opts.enable_metrics || opts.dataproxy_metric_override { + let metrics = MetricRegistry::new(); + let recorder = MetricRecorder::new(metrics.clone()); + recorder.install_globally().expect("setup must be called only once"); + recorder.init_prisma_metrics(); + Some(metrics) + } else { + None + }; let datamodel = opts.schema(false)?; let config = &datamodel.configuration; @@ -133,7 +130,7 @@ pub async fn setup( features |= Feature::Metrics } - let cx = PrismaContext::new(datamodel, protocol, features, Some(metrics)) + let cx = PrismaContext::new(datamodel, protocol, features, metrics) .instrument(span) .await?; diff --git a/query-engine/query-engine/src/logger.rs b/query-engine/query-engine/src/logger.rs index 10f6ced58b86..b33a07d8f766 100644 --- a/query-engine/query-engine/src/logger.rs +++ b/query-engine/query-engine/src/logger.rs @@ -4,7 +4,6 @@ use opentelemetry::{ }; use opentelemetry_otlp::WithExportConfig; use query_core::telemetry; -use query_engine_metrics::MetricRegistry; use tracing::{dispatcher::SetGlobalDefaultError, subscriber}; use tracing_subscriber::{filter::filter_fn, layer::SubscriberExt, Layer}; @@ -19,7 +18,6 @@ pub(crate) struct Logger { log_format: LogFormat, log_queries: bool, tracing_config: TracingConfig, - metrics: Option, } // TracingConfig specifies how tracing will be exposed by the logger facility @@ -38,7 +36,7 @@ enum TracingConfig { impl Logger { /// Initialize a new global logger installer. - pub fn new(service_name: &'static str, metrics: Option, opts: &PrismaOpt) -> Self { + pub fn new(service_name: &'static str, opts: &PrismaOpt) -> Self { let enable_telemetry = opts.enable_open_telemetry; let enable_capturing = opts.enable_telemetry_in_response; let endpoint = if opts.open_telemetry_endpoint.is_empty() { @@ -58,7 +56,6 @@ impl Logger { service_name, log_format: opts.log_format(), log_queries: opts.log_queries(), - metrics, tracing_config, } } @@ -81,9 +78,7 @@ impl Logger { } }; - let subscriber = tracing_subscriber::registry() - .with(fmt_layer) - .with(self.metrics.clone()); + let subscriber = tracing_subscriber::registry().with(fmt_layer); match self.tracing_config { TracingConfig::Captured => { diff --git a/query-engine/query-engine/src/main.rs b/query-engine/query-engine/src/main.rs index 7c3a6f7a1db5..34a191fca80d 100644 --- a/query-engine/query-engine/src/main.rs +++ b/query-engine/query-engine/src/main.rs @@ -30,7 +30,7 @@ async fn main() -> Result<(), AnyError> { Some(cmd) => cmd.execute().await?, None => { let span = tracing::info_span!("prisma:engine:connect"); - let cx = context::setup(&opts, true, None).instrument(span).await?; + let cx = context::setup(&opts, true).instrument(span).await?; set_panic_hook(opts.log_format()); server::listen(cx, &opts).await?; } diff --git a/query-engine/request-handlers/Cargo.toml b/query-engine/request-handlers/Cargo.toml index e23d5927c555..3cc6690be210 100644 --- a/query-engine/request-handlers/Cargo.toml +++ b/query-engine/request-handlers/Cargo.toml @@ -14,7 +14,7 @@ itertools.workspace = true graphql-parser = { git = "https://github.com/prisma/graphql-parser", optional = true } serde.workspace = true serde_json.workspace = true -futures = "0.3" +futures.workspace = true indexmap.workspace = true bigdecimal = "0.3" thiserror = "1" diff --git a/schema-engine/connectors/mongodb-schema-connector/Cargo.toml b/schema-engine/connectors/mongodb-schema-connector/Cargo.toml index 90e8b02f447e..c5c8188b4399 100644 --- a/schema-engine/connectors/mongodb-schema-connector/Cargo.toml +++ b/schema-engine/connectors/mongodb-schema-connector/Cargo.toml @@ -14,7 +14,7 @@ user-facing-errors = { path = "../../../libs/user-facing-errors", features = [ ] } enumflags2.workspace = true -futures = "0.3" +futures.workspace = true mongodb.workspace = true bson.workspace = true serde_json.workspace = true @@ -33,4 +33,4 @@ url.workspace = true expect-test = "1" names = { version = "0.12", default-features = false } itertools.workspace = true -indoc.workspace = true \ No newline at end of file +indoc.workspace = true diff --git a/schema-engine/mongodb-schema-describer/Cargo.toml b/schema-engine/mongodb-schema-describer/Cargo.toml index d04c5ab9fb3d..e8885e990827 100644 --- a/schema-engine/mongodb-schema-describer/Cargo.toml +++ b/schema-engine/mongodb-schema-describer/Cargo.toml @@ -8,5 +8,5 @@ edition = "2021" [dependencies] mongodb.workspace = true bson.workspace = true -futures = "0.3" +futures.workspace = true serde.workspace = true