Skip to content

Commit

Permalink
fix: decouple metrics and tracing (#5023)
Browse files Browse the repository at this point in the history
  • Loading branch information
aqrln authored Oct 24, 2024
1 parent a87a639 commit 4b67dc7
Show file tree
Hide file tree
Showing 35 changed files with 411 additions and 350 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ members = [
[workspace.dependencies]
async-trait = { version = "0.1.77" }
enumflags2 = { version = "0.7", features = ["serde"] }
futures = "0.3"
psl = { path = "./psl/psl" }
serde_json = { version = "1", features = ["float_roundtrip", "preserve_order", "raw_value"] }
serde = { version = "1", features = ["derive"] }
Expand Down Expand Up @@ -66,6 +67,7 @@ napi = { version = "2.15.1", default-features = false, features = [
napi-derive = "2.15.0"
metrics = "0.23.0"
js-sys = { version = "0.3" }
pin-project = "1"
rand = { version = "0.8" }
regex = { version = "1", features = ["std"] }
serde_repr = { version = "0.1.17" }
Expand Down
4 changes: 2 additions & 2 deletions libs/crosstarget-utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ edition = "2021"
[dependencies]
derive_more.workspace = true
enumflags2.workspace = true
futures = "0.3"
futures.workspace = true

[target.'cfg(target_arch = "wasm32")'.dependencies]
js-sys.workspace = true
wasm-bindgen.workspace = true
wasm-bindgen-futures.workspace = true
tokio = { version = "1", features = ["macros", "sync"] }
pin-project = "1"
pin-project.workspace = true

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
tokio.workspace = true
Expand Down
2 changes: 1 addition & 1 deletion quaint/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ async-trait.workspace = true
thiserror = "1.0"
num_cpus = "1.12"
metrics.workspace = true
futures = "0.3"
futures.workspace = true
url.workspace = true
hex = "0.4"
itertools.workspace = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ user-facing-errors.workspace = true
prisma-value = { path = "../../../libs/prisma-value" }
query-engine-metrics = { path = "../../metrics"}
once_cell = "1.15.0"
futures = "0.3"
futures.workspace = true
paste = "1.0.14"

[dev-dependencies]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
//! actors to allow test to continue even if one query is blocking.

use indoc::indoc;
use query_engine_metrics::{MetricRecorder, WithMetricsInstrumentation};
use query_engine_tests::{
query_core::TxId, render_test_datamodel, setup_metrics, test_tracing_subscriber, LogEmit, QueryResult, Runner,
TestError, TestLogCapture, TestResult, WithSubscriber, CONFIG, ENV_LOG_LEVEL,
Expand Down Expand Up @@ -50,13 +51,12 @@ impl Actor {
/// Spawns a new query engine to the runtime.
pub async fn spawn() -> TestResult<Self> {
let (log_capture, log_tx) = TestLogCapture::new();
async fn with_logs<T>(fut: impl Future<Output = T>, log_tx: LogEmit) -> T {
fut.with_subscriber(test_tracing_subscriber(
ENV_LOG_LEVEL.to_string(),
setup_metrics(),
log_tx,
))
.await
let (metrics, recorder) = setup_metrics();

async fn with_observability<T>(fut: impl Future<Output = T>, log_tx: LogEmit, recorder: MetricRecorder) -> T {
fut.with_subscriber(test_tracing_subscriber(ENV_LOG_LEVEL.to_string(), log_tx))
.with_recorder(recorder)
.await
}

let (query_sender, mut query_receiver) = mpsc::channel(100);
Expand All @@ -73,21 +73,24 @@ impl Actor {
Some("READ COMMITTED"),
);

let mut runner = Runner::load(datamodel, &[], version, tag, None, setup_metrics(), log_capture).await?;
let mut runner = Runner::load(datamodel, &[], version, tag, None, metrics, log_capture).await?;

tokio::spawn(async move {
while let Some(message) = query_receiver.recv().await {
match message {
Message::Query(query) => {
let result = with_logs(runner.query(query), log_tx.clone()).await;
let result = with_observability(runner.query(query), log_tx.clone(), recorder.clone()).await;
response_sender.send(Response::Query(result)).await.unwrap();
}
Message::BeginTransaction => {
let response = with_logs(runner.start_tx(10000, 10000, None), log_tx.clone()).await;
let response =
with_observability(runner.start_tx(10000, 10000, None), log_tx.clone(), recorder.clone())
.await;
response_sender.send(Response::Tx(response)).await.unwrap();
}
Message::RollbackTransaction(tx_id) => {
let response = with_logs(runner.rollback_tx(tx_id), log_tx.clone()).await?;
let response =
with_observability(runner.rollback_tx(tx_id), log_tx.clone(), recorder.clone()).await?;
response_sender.send(Response::Rollback(response)).await.unwrap();
}
Message::SetActiveTx(tx_id) => {
Expand Down
29 changes: 9 additions & 20 deletions query-engine/connector-test-kit-rs/query-tests-setup/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,8 @@ pub use templating::*;
use colored::Colorize;
use once_cell::sync::Lazy;
use psl::datamodel_connector::ConnectorCapabilities;
use query_engine_metrics::MetricRegistry;
use query_engine_metrics::{MetricRecorder, MetricRegistry, WithMetricsInstrumentation};
use std::future::Future;
use std::sync::Once;
use tokio::runtime::Builder;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use tracing_futures::WithSubscriber;
Expand Down Expand Up @@ -61,14 +60,10 @@ fn run_with_tokio<O, F: std::future::Future<Output = O>>(fut: F) -> O {
.block_on(fut)
}

static METRIC_RECORDER: Once = Once::new();

pub fn setup_metrics() -> MetricRegistry {
pub fn setup_metrics() -> (MetricRegistry, MetricRecorder) {
let metrics = MetricRegistry::new();
METRIC_RECORDER.call_once(|| {
query_engine_metrics::setup();
});
metrics
let recorder = MetricRecorder::new(metrics.clone()).with_initialized_prisma_metrics();
(metrics, recorder)
}

/// Taken from Reddit. Enables taking an async function pointer which takes references as param
Expand Down Expand Up @@ -161,8 +156,7 @@ fn run_relation_link_test_impl(

let datamodel = render_test_datamodel(&test_db_name, template, &[], None, Default::default(), Default::default(), None);
let (connector_tag, version) = CONFIG.test_connector().unwrap();
let metrics = setup_metrics();
let metrics_for_subscriber = metrics.clone();
let (metrics, recorder) = setup_metrics();
let (log_capture, log_tx) = TestLogCapture::new();

run_with_tokio(
Expand All @@ -176,9 +170,8 @@ fn run_relation_link_test_impl(

test_fn(&runner, &dm).with_subscriber(test_tracing_subscriber(
ENV_LOG_LEVEL.to_string(),
metrics_for_subscriber,
log_tx,
))
)).with_recorder(recorder)
.await.unwrap();

teardown_project(&datamodel, Default::default(), runner.schema_id())
Expand Down Expand Up @@ -275,8 +268,7 @@ fn run_connector_test_impl(
None,
);
let (connector_tag, version) = CONFIG.test_connector().unwrap();
let metrics = crate::setup_metrics();
let metrics_for_subscriber = metrics.clone();
let (metrics, recorder) = crate::setup_metrics();

let (log_capture, log_tx) = TestLogCapture::new();

Expand All @@ -297,11 +289,8 @@ fn run_connector_test_impl(
let schema_id = runner.schema_id();

if let Err(err) = test_fn(runner)
.with_subscriber(test_tracing_subscriber(
ENV_LOG_LEVEL.to_string(),
metrics_for_subscriber,
log_tx,
))
.with_subscriber(test_tracing_subscriber(ENV_LOG_LEVEL.to_string(), log_tx))
.with_recorder(recorder)
.await
{
panic!("💥 Test failed due to an error: {err:?}");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
use query_core::telemetry::helpers as telemetry_helpers;
use query_engine_metrics::MetricRegistry;
use tracing::Subscriber;
use tracing_error::ErrorLayer;
use tracing_subscriber::{prelude::*, Layer};

use crate::LogEmit;

pub fn test_tracing_subscriber(log_config: String, metrics: MetricRegistry, log_tx: LogEmit) -> impl Subscriber {
pub fn test_tracing_subscriber(log_config: String, log_tx: LogEmit) -> impl Subscriber {
let filter = telemetry_helpers::env_filter(true, telemetry_helpers::QueryEngineLogLevel::Override(log_config));

let fmt_layer = tracing_subscriber::fmt::layer()
Expand All @@ -15,7 +14,6 @@ pub fn test_tracing_subscriber(log_config: String, metrics: MetricRegistry, log_

tracing_subscriber::registry()
.with(fmt_layer.boxed())
.with(metrics.boxed())
.with(ErrorLayer::default())
}

Expand Down
2 changes: 1 addition & 1 deletion query-engine/connectors/mongodb-query-connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ version = "0.1.0"
anyhow = "1.0"
async-trait.workspace = true
bigdecimal = "0.3"
futures = "0.3"
futures.workspace = true
itertools.workspace = true
mongodb.workspace = true
bson.workspace = true
Expand Down
2 changes: 1 addition & 1 deletion query-engine/connectors/query-connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ version = "0.1.0"
anyhow = "1.0"
async-trait.workspace = true
chrono.workspace = true
futures = "0.3"
futures.workspace = true
itertools.workspace = true
query-structure = {path = "../../query-structure"}
prisma-value = {path = "../../../libs/prisma-value"}
Expand Down
2 changes: 1 addition & 1 deletion query-engine/connectors/sql-query-connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ psl.workspace = true
anyhow = "1.0"
async-trait.workspace = true
bigdecimal = "0.3"
futures = "0.3"
futures.workspace = true
itertools.workspace = true
once_cell = "1.3"
rand.workspace = true
Expand Down
2 changes: 1 addition & 1 deletion query-engine/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ connection-string.workspace = true
connector = { path = "../connectors/query-connector", package = "query-connector" }
crossbeam-channel = "0.5.6"
psl.workspace = true
futures = "0.3"
futures.workspace = true
indexmap.workspace = true
itertools.workspace = true
once_cell = "1"
Expand Down
9 changes: 6 additions & 3 deletions query-engine/core/src/executor/execute_operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,12 @@ use connector::{Connection, ConnectionLike, Connector};
use crosstarget_utils::time::ElapsedTimeCounter;
use futures::future;

#[cfg(not(feature = "metrics"))]
use crate::metrics::MetricsInstrumentationStub;
#[cfg(feature = "metrics")]
use query_engine_metrics::{
counter, histogram, PRISMA_CLIENT_QUERIES_DURATION_HISTOGRAM_MS, PRISMA_CLIENT_QUERIES_TOTAL,
counter, histogram, WithMetricsInstrumentation, PRISMA_CLIENT_QUERIES_DURATION_HISTOGRAM_MS,
PRISMA_CLIENT_QUERIES_TOTAL,
};

use schema::{QuerySchema, QuerySchemaRef};
Expand Down Expand Up @@ -106,7 +109,6 @@ pub async fn execute_many_self_contained<C: Connector + Send + Sync>(
) -> crate::Result<Vec<crate::Result<ResponseData>>> {
let mut futures = Vec::with_capacity(operations.len());

let dispatcher = crate::get_current_dispatcher();
for op in operations {
#[cfg(feature = "metrics")]
counter!(PRISMA_CLIENT_QUERIES_TOTAL).increment(1);
Expand All @@ -130,7 +132,8 @@ pub async fn execute_many_self_contained<C: Connector + Send + Sync>(
trace_id.clone(),
),
)
.with_subscriber(dispatcher.clone()),
.with_current_subscriber()
.with_current_recorder(),
));
}

Expand Down
18 changes: 0 additions & 18 deletions query-engine/core/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use crate::{
use async_trait::async_trait;
use connector::Connector;
use serde::{Deserialize, Serialize};
use tracing::Dispatch;

#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
Expand Down Expand Up @@ -116,20 +115,3 @@ pub trait TransactionManager {
/// Rolls back a transaction.
async fn rollback_tx(&self, tx_id: TxId) -> crate::Result<()>;
}

// With the node-api when a future is spawned in a new thread `tokio:spawn` it will not
// use the current dispatcher and its logs will not be captured anymore. We can use this
// method to get the current dispatcher and combine it with `with_subscriber`
// let dispatcher = get_current_dispatcher();
// tokio::spawn(async {
// my_async_ops.await
// }.with_subscriber(dispatcher));
//
//
// Finally, this can be replaced with with_current_collector
// https://github.com/tokio-rs/tracing/blob/master/tracing-futures/src/lib.rs#L234
// once this is in a release

pub fn get_current_dispatcher() -> Dispatch {
tracing::dispatcher::get_default(|current| current.clone())
}
9 changes: 7 additions & 2 deletions query-engine/core/src/interactive_transactions/actors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ use tracing::Span;
use tracing_futures::Instrument;
use tracing_futures::WithSubscriber;

#[cfg(not(feature = "metrics"))]
use crate::metrics::MetricsInstrumentationStub;
#[cfg(feature = "metrics")]
use query_engine_metrics::WithMetricsInstrumentation;

#[cfg(feature = "metrics")]
use crate::telemetry::helpers::set_span_link_from_traceparent;

Expand Down Expand Up @@ -264,7 +269,6 @@ pub(crate) async fn spawn_itx_actor(
let span = Span::current();
let tx_id_str = tx_id.to_string();
span.record("itx_id", tx_id_str.as_str());
let dispatcher = crate::get_current_dispatcher();

let (tx_to_server, rx_from_client) = channel::<TxOpRequest>(channel_size);
let client = ITXClient {
Expand Down Expand Up @@ -334,7 +338,8 @@ pub(crate) async fn spawn_itx_actor(
trace!("[{}] has stopped with {}", server.id.to_string(), server.cached_tx);
})
.instrument(span)
.with_subscriber(dispatcher),
.with_current_subscriber()
.with_current_recorder(),
);

open_transaction_rcv.await.unwrap()?;
Expand Down
1 change: 1 addition & 0 deletions query-engine/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ pub use connector::{
mod error;
mod interactive_transactions;
mod interpreter;
mod metrics;
mod query_ast;
mod query_graph;
mod result_ast;
Expand Down
13 changes: 13 additions & 0 deletions query-engine/core/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/// When the `metrics` feature is disabled, we don't compile the `query-engine-metrics` crate and
/// thus can't use the metrics instrumentation. To avoid the boilerplate of putting every
/// `with_current_recorder` call behind `#[cfg]`, we use this stub trait that does nothing but
/// allows the code that relies on `WithMetricsInstrumentation` trait to be in scope compile.
#[cfg(not(feature = "metrics"))]
pub(crate) trait MetricsInstrumentationStub: Sized {
fn with_current_recorder(self) -> Self {
self
}
}

#[cfg(not(feature = "metrics"))]
impl<T> MetricsInstrumentationStub for T {}
10 changes: 5 additions & 5 deletions query-engine/core/src/telemetry/capturing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,10 @@ use once_cell::sync::Lazy;
use opentelemetry::{global, sdk, trace};
use tracing::subscriber;
use tracing_subscriber::{
filter::filter_fn, layer::Layered, prelude::__tracing_subscriber_SubscriberExt, Layer, Registry,
filter::filter_fn,
layer::{Layered, SubscriberExt},
registry::LookupSpan,
Layer, Registry,
};

static PROCESSOR: Lazy<capturer::Processor> = Lazy::new(Processor::default);
Expand All @@ -161,10 +164,7 @@ pub fn capturer(trace_id: trace::TraceId, settings: Settings) -> Capturer {
#[cfg(feature = "metrics")]
#[allow(clippy::type_complexity)]
pub fn install_capturing_layer(
subscriber: Layered<
Option<query_engine_metrics::MetricRegistry>,
Layered<Box<dyn Layer<Registry> + Send + Sync>, Registry>,
>,
subscriber: impl SubscriberExt + for<'a> LookupSpan<'a> + Send + Sync + 'static,
log_queries: bool,
) {
// set a trace context propagator, so that the trace context is propagated via the
Expand Down
Loading

0 comments on commit 4b67dc7

Please sign in to comment.