From d1aa653d76163a102948d4b353fc5c69a3ba7c1c Mon Sep 17 00:00:00 2001 From: Sergii Mikhtoniuk Date: Thu, 12 Sep 2024 18:52:44 -0700 Subject: [PATCH] Integrate prometheus metrics --- CHANGELOG.md | 3 + Cargo.lock | 164 +++++++++++++++- Cargo.toml | 2 + deny.toml | 34 +--- resources/cli-reference.md | 1 + src/app/cli/Cargo.toml | 4 +- src/app/cli/src/app.rs | 27 +++ src/app/cli/src/cli.rs | 4 + src/app/cli/src/explore/api_server.rs | 31 +-- src/app/cli/src/explore/web_ui_server.rs | 50 +++-- src/app/cli/src/output/output_config.rs | 4 + src/domain/task-system/services/Cargo.toml | 1 + .../services/src/task_executor_impl.rs | 7 +- .../transactions/db_transaction_manager.rs | 11 +- src/utils/messaging-outbox/Cargo.toml | 4 +- .../outbox_transactional_processor.rs | 131 ++++++++++++- .../test_outbox_transactional_processor.rs | 47 +++++ src/utils/observability/Cargo.toml | 70 +++++++ src/utils/observability/src/axum.rs | 151 +++++++++++++++ src/utils/observability/src/config.rs | 84 ++++++++ src/utils/observability/src/health.rs | 87 +++++++++ src/utils/observability/src/init.rs | 182 ++++++++++++++++++ src/utils/observability/src/lib.rs | 17 ++ src/utils/observability/src/metrics.rs | 56 ++++++ src/utils/observability/src/tracing.rs | 63 ++++++ 25 files changed, 1158 insertions(+), 77 deletions(-) create mode 100644 src/utils/observability/Cargo.toml create mode 100644 src/utils/observability/src/axum.rs create mode 100644 src/utils/observability/src/config.rs create mode 100644 src/utils/observability/src/health.rs create mode 100644 src/utils/observability/src/init.rs create mode 100644 src/utils/observability/src/lib.rs create mode 100644 src/utils/observability/src/metrics.rs create mode 100644 src/utils/observability/src/tracing.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 88963aaa3f..a1c858a762 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added +- Added first integration of Prometheus metrics starting with Outbox +- Added `--metrics` CLI flag that will dump metrics into a file after command execution ### Changed - Telemetry improvements: - Improved data collected around transactional code diff --git a/Cargo.lock b/Cargo.lock index 75fed5cbf5..65a82a5b9a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5426,9 +5426,11 @@ dependencies = [ "mime_guess", "minus", "num-format", + "observability", "opendatafabric", "pretty_assertions", "prettytable-rs", + "prometheus", "random-names", "read_input", "regex", @@ -6159,6 +6161,7 @@ dependencies = [ "kamu-task-system-inmem", "messaging-outbox", "mockall", + "observability", "opendatafabric", "serde_json", "test-log", @@ -6570,7 +6573,9 @@ dependencies = [ "internal-error", "kamu-messaging-outbox-inmem", "mockall", + "observability", "paste", + "prometheus", "serde", "serde_json", "test-log", @@ -7001,6 +7006,30 @@ dependencies = [ "walkdir", ] +[[package]] +name = "observability" +version = "0.199.3" +dependencies = [ + "async-trait", + "axum", + "dill", + "http 0.2.12", + "opentelemetry", + "opentelemetry-otlp", + "opentelemetry-semantic-conventions", + "opentelemetry_sdk", + "prometheus", + "serde", + "serde_json", + "thiserror", + "tower", + "tower-http", + "tracing", + "tracing-appender", + "tracing-opentelemetry", + "tracing-subscriber", +] + [[package]] name = "once_cell" version = "1.19.0" @@ -7080,6 +7109,77 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "opentelemetry" +version = "0.23.0" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b69a91d4893e713e06f724597ad630f1fa76057a5e1026c0ca67054a9032a76" +dependencies = [ + "futures-core", + "futures-sink", + "js-sys", + "once_cell", + "pin-project-lite", + "thiserror", +] + +[[package]] +name = "opentelemetry-otlp" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a94c69209c05319cdf7460c6d4c055ed102be242a0a6245835d7bc42c6ec7f54" +dependencies = [ + "async-trait", + "futures-core", + "http 0.2.12", + "opentelemetry", + "opentelemetry-proto", + "opentelemetry_sdk", + "prost", + "thiserror", + "tokio", + "tonic", +] + +[[package]] +name = "opentelemetry-proto" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "984806e6cf27f2b49282e2a05e288f30594f3dbc74eb7a6e99422bc48ed78162" +dependencies = [ + "opentelemetry", + "opentelemetry_sdk", + "prost", + "tonic", +] + +[[package]] +name = "opentelemetry-semantic-conventions" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1cefe0543875379e47eb5f1e68ff83f45cc41366a92dfd0d073d513bf68e9a05" + +[[package]] +name = "opentelemetry_sdk" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae312d58eaa90a82d2e627fd86e075cf5230b3f11794e2ed74199ebbe572d4fd" +dependencies = [ + "async-trait", + "futures-channel", + "futures-executor", + "futures-util", + "lazy_static", + "once_cell", + "opentelemetry", + "ordered-float 4.2.2", + "percent-encoding", + "rand", + "thiserror", + "tokio", + "tokio-stream", +] + [[package]] name = "option-ext" version = "0.2.0" @@ -7095,6 +7195,15 @@ dependencies = [ "num-traits", ] +[[package]] +name = "ordered-float" +version = "4.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4a91171844676f8c7990ce64959210cd2eaef32c2612c50f9fae9f8aaa6065a6" +dependencies = [ + "num-traits", +] + [[package]] name = "oso" version = "0.27.3" @@ -7706,6 +7815,20 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prometheus" +version = "0.13.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d33c28a30771f7f96db69893f78b857f7450d7e0237e9c8fc6427a81bae7ed1" +dependencies = [ + "cfg-if", + "fnv", + "lazy_static", + "memchr", + "parking_lot", + "thiserror", +] + [[package]] name = "proptest" version = "1.5.0" @@ -9489,7 +9612,7 @@ checksum = "7e54bc85fc7faa8bc175c4bab5b92ba8d9a3ce893d0e9f42cc455c8ab16a9e09" dependencies = [ "byteorder", "integer-encoding", - "ordered-float", + "ordered-float 2.10.1", ] [[package]] @@ -9912,6 +10035,22 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-opentelemetry" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f68803492bf28ab40aeccaecc7021096bd256baf7ca77c3d425d89b35a7be4e4" +dependencies = [ + "js-sys", + "once_cell", + "opentelemetry", + "opentelemetry_sdk", + "tracing", + "tracing-core", + "tracing-subscriber", + "web-time", +] + [[package]] name = "tracing-perfetto" version = "0.199.3" @@ -9924,6 +10063,16 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "tracing-serde" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc6b213177105856957181934e4920de57730fc69bf42c37ee5bb664d406d9e1" +dependencies = [ + "serde", + "tracing-core", +] + [[package]] name = "tracing-subscriber" version = "0.3.18" @@ -9934,12 +10083,15 @@ dependencies = [ 
"nu-ansi-term", "once_cell", "regex", + "serde", + "serde_json", "sharded-slab", "smallvec", "thread_local", "tracing", "tracing-core", "tracing-log 0.2.0", + "tracing-serde", ] [[package]] @@ -10370,6 +10522,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "web-time" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "webbrowser" version = "0.8.15" diff --git a/Cargo.toml b/Cargo.toml index ab2d048e95..1a7abf412b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,7 @@ members = [ "src/utils/kamu-cli-puppet", "src/utils/messaging-outbox", "src/utils/multiformats", + "src/utils/observability", "src/utils/random-names", "src/utils/repo-tools", "src/utils/time-source", @@ -104,6 +105,7 @@ kamu-data-utils = { version = "0.199.3", path = "src/utils/data-utils", default- kamu-datafusion-cli = { version = "0.199.3", path = "src/utils/datafusion-cli", default-features = false } messaging-outbox = { version = "0.199.3", path = "src/utils/messaging-outbox", default-features = false } multiformats = { version = "0.199.3", path = "src/utils/multiformats", default-features = false } +observability = { version = "0.199.3", path = "src/utils/observability", default-features = false } random-names = { version = "0.199.3", path = "src/utils/random-names", default-features = false } time-source = { version = "0.199.3", path = "src/utils/time-source", default-features = false } tracing-perfetto = { version = "0.199.3", path = "src/utils/tracing-perfetto", default-features = false } diff --git a/deny.toml b/deny.toml index 22663e4f48..7453357075 100644 --- a/deny.toml +++ b/deny.toml @@ -11,34 +11,12 @@ skip-tree = [] wildcards = "deny" # We specify features explicitly to avoid bloat -workspace-default-features = "deny" -features = [ - { name = "opendatafabric", allow = [ - "default", - "sqlx", - "sqlx-mysql", - "sqlx-postgres", - "sqlx-sqlite", - ] }, - { name = "kamu", allow = [ - "default", - "ingest-evm", - "ingest-ftp", - "ingest-mqtt", - "query-extensions-json", - ] }, - { name = "kamu-accounts", allow = [ - "default", - "sqlx", - ] }, - { name = "kamu-datasets", allow = [ - "default", - "sqlx", - ] }, - { name = "kamu-cli", allow = [ - "default", - ] }, -] +# TODO: https://github.com/EmbarkStudios/cargo-deny/issues/699 +# external-default-features = "deny" + +# TODO: https://github.com/EmbarkStudios/cargo-deny/issues/700 +# workspace-default-features = "deny" + deny = [ ### Crates we shouldn't use #### diff --git a/resources/cli-reference.md b/resources/cli-reference.md index d4df2f9698..40b79e213f 100644 --- a/resources/cli-reference.md +++ b/resources/cli-reference.md @@ -43,6 +43,7 @@ To regenerate this schema from existing code, use the following command: * `--no-color` — Disable color output in the terminal * `-q`, `--quiet` — Suppress all non-essential output * `--trace` — Record and visualize the command execution as perfetto.dev trace +* `--metrics` — Dump all metrics at the end of command execution To get help for individual commands use: kamu -h diff --git a/src/app/cli/Cargo.toml b/src/app/cli/Cargo.toml index 6456653ddc..99b0c209b5 100644 --- a/src/app/cli/Cargo.toml +++ b/src/app/cli/Cargo.toml @@ -45,6 +45,7 @@ database-common = { workspace = true } database-common-macros = { workspace = true } http-common = { workspace = true } internal-error = { workspace = true } +observability = { 
workspace = true, features = ["prometheus"] } time-source = { workspace = true } kamu = { workspace = true } @@ -139,7 +140,8 @@ serde = { version = "1", features = ["derive"] } serde_with = "3" serde_yaml = "0.9" -# Tracing / logging / telemetry +# Tracing / logging / telemetry / metrics +prometheus = { version = "0.13", default-features = false } tracing = "0.1" tracing-appender = "0.2" tracing-perfetto = { workspace = true } diff --git a/src/app/cli/src/app.rs b/src/app/cli/src/app.rs index 872ba874e1..9352a50e5b 100644 --- a/src/app/cli/src/app.rs +++ b/src/app/cli/src/app.rs @@ -177,6 +177,9 @@ pub async fn run(workspace_layout: WorkspaceLayout, args: cli::Cli) -> Result<() (guards, final_base_catalog, cli_catalog, output_config) }; + // Register metrics + let metrics_registry = observability::metrics::register_all(&cli_catalog); + // Evict cache if workspace_svc.is_in_workspace() && !workspace_svc.is_upgrade_needed()? { cli_catalog.get_one::()?.evict_cache()?; @@ -240,6 +243,14 @@ pub async fn run(workspace_layout: WorkspaceLayout, args: cli::Cli) -> Result<() // Flush all logging sinks drop(guards); + if let Some(metrics_file) = &output_config.metrics_file { + if let Ok(mut file) = std::fs::File::create(metrics_file) { + use prometheus::Encoder as _; + let _ = prometheus::TextEncoder::new().encode(&metrics_registry.gather(), &mut file); + eprintln!("Saving metrics to {}", metrics_file.display()); + } + } + if let Some(trace_file) = &output_config.trace_file { // Run a web server and open the trace in the browser if the environment allows let _ = TraceServer::maybe_serve_in_browser(trace_file).await; @@ -317,6 +328,8 @@ pub fn configure_base_catalog( b.add_value(CacheDir::new(&workspace_layout.cache_dir)); b.add_value(RemoteReposDir::new(&workspace_layout.repos_dir)); + b.add_value(prometheus::Registry::new()); + b.add::(); if let Some(system_time) = system_time { @@ -440,6 +453,7 @@ pub fn configure_base_catalog( b.add::(); b.bind::(); b.add::(); + b.add::(); register_message_dispatcher::( &mut b, @@ -839,6 +853,18 @@ fn configure_output_format(args: &cli::Cli, workspace_svc: &WorkspaceService) -> None }; + let metrics_file = if args.metrics && workspace_svc.is_in_workspace() { + Some( + workspace_svc + .layout() + .unwrap() + .run_info_dir + .join("kamu.metrics.txt"), + ) + } else { + None + }; + let format = args.tabular_output_format().unwrap_or(if is_tty { OutputFormat::Table } else { @@ -851,6 +877,7 @@ fn configure_output_format(args: &cli::Cli, workspace_svc: &WorkspaceService) -> is_tty, format, trace_file, + metrics_file, } } diff --git a/src/app/cli/src/cli.rs b/src/app/cli/src/cli.rs index 98a14c74da..1d789a2224 100644 --- a/src/app/cli/src/cli.rs +++ b/src/app/cli/src/cli.rs @@ -49,6 +49,10 @@ pub struct Cli { #[arg(long)] pub trace: bool, + /// Dump all metrics at the end of command execution + #[arg(long)] + pub metrics: bool, + /// Overrides system time clock with provided value #[arg(long, value_name = "T", hide = true)] pub system_time: Option, diff --git a/src/app/cli/src/explore/api_server.rs b/src/app/cli/src/explore/api_server.rs index 85fcea651d..fad0969b95 100644 --- a/src/app/cli/src/explore/api_server.rs +++ b/src/app/cli/src/explore/api_server.rs @@ -142,19 +142,26 @@ impl APIServer { multi_tenant_workspace, ), ) + .layer(kamu_adapter_http::AuthenticationLayer::new()) .layer( - tower::ServiceBuilder::new() - .layer(tower_http::trace::TraceLayer::new_for_http()) - .layer( - tower_http::cors::CorsLayer::new() - .allow_origin(tower_http::cors::Any) - 
.allow_methods(vec![http::Method::GET, http::Method::POST]) - .allow_headers(tower_http::cors::Any), - ) - .layer(Extension(api_server_catalog)) - .layer(Extension(gql_schema)) - .layer(kamu_adapter_http::AuthenticationLayer::new()), - ); + tower_http::cors::CorsLayer::new() + .allow_origin(tower_http::cors::Any) + .allow_methods(vec![http::Method::GET, http::Method::POST]) + .allow_headers(tower_http::cors::Any), + ) + .layer(observability::axum::http_layer()) + // Note: Healthcheck and metrics routes are placed before the tracing layer (layers + // execute bottom-up) to avoid spam in logs + .route( + "/system/health", + axum::routing::get(observability::health::health_handler), + ) + .route( + "/system/metrics", + axum::routing::get(observability::metrics::metrics_handler), + ) + .layer(axum::extract::Extension(gql_schema)) + .layer(axum::extract::Extension(api_server_catalog)); let is_e2e_testing = e2e_output_data_path.is_some(); let maybe_shutdown_notify = if is_e2e_testing { diff --git a/src/app/cli/src/explore/web_ui_server.rs b/src/app/cli/src/explore/web_ui_server.rs index 832f94fe3d..7b13189ed1 100644 --- a/src/app/cli/src/explore/web_ui_server.rs +++ b/src/app/cli/src/explore/web_ui_server.rs @@ -12,8 +12,11 @@ use std::sync::Arc; use axum::http::Uri; use axum::response::{IntoResponse, Response}; +use axum::Extension; use database_common::DatabaseTransactionRunner; +use database_common_macros::transactional_handler; use dill::{Catalog, CatalogBuilder}; +use http_common::ApiError; use internal_error::InternalError; use kamu::domain::{Protocols, ServerUrlConfig}; use kamu_accounts::{ @@ -184,21 +187,27 @@ impl WebUIServer { ), ) .fallback(app_handler) + .layer(kamu_adapter_http::AuthenticationLayer::new()) .layer( - tower::ServiceBuilder::new() - .layer(tower_http::trace::TraceLayer::new_for_http()) - .layer( - tower_http::cors::CorsLayer::new() - .allow_origin(tower_http::cors::Any) - .allow_methods(vec![http::Method::GET, http::Method::POST]) - .allow_headers(tower_http::cors::Any), - ) - .layer(axum::extract::Extension(web_ui_catalog)) - .layer(axum::extract::Extension(gql_schema)) - .layer(axum::extract::Extension(web_ui_config)) - .layer(kamu_adapter_http::RunInDatabaseTransactionLayer::new()) - .layer(kamu_adapter_http::AuthenticationLayer::new()), - ); + tower_http::cors::CorsLayer::new() + .allow_origin(tower_http::cors::Any) + .allow_methods(vec![http::Method::GET, http::Method::POST]) + .allow_headers(tower_http::cors::Any), + ) + .layer(observability::axum::http_layer()) + // Note: Healthcheck and metrics routes are placed before the tracing layer (layers + // execute bottom-up) to avoid spam in logs + .route( + "/system/health", + axum::routing::get(observability::health::health_handler), + ) + .route( + "/system/metrics", + axum::routing::get(observability::metrics::metrics_handler), + ) + .layer(axum::extract::Extension(web_ui_catalog)) + .layer(axum::extract::Extension(gql_schema)) + .layer(axum::extract::Extension(web_ui_config)); let server = axum::Server::builder(bound_addr).serve(app.into_make_service()); @@ -270,13 +279,16 @@ async fn runtime_config_handler( //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +#[transactional_handler] async fn graphql_handler( - schema: axum::extract::Extension, - catalog: axum::extract::Extension, + Extension(schema): Extension, + Extension(catalog): Extension, req: async_graphql_axum::GraphQLRequest, -) -> async_graphql_axum::GraphQLResponse { - let 
graphql_request = req.into_inner().data(catalog.0); - schema.execute(graphql_request).await.into() +) -> Result { + let graphql_request = req.into_inner().data(catalog); + let graphql_response = schema.execute(graphql_request).await.into(); + + Ok(graphql_response) } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/app/cli/src/output/output_config.rs b/src/app/cli/src/output/output_config.rs index 6a30defc1c..d5fab46d84 100644 --- a/src/app/cli/src/output/output_config.rs +++ b/src/app/cli/src/output/output_config.rs @@ -24,6 +24,9 @@ pub struct OutputConfig { pub format: OutputFormat, /// Points to the output trace file if Perfetto tracing was enabled pub trace_file: Option, + /// Points to the output metrics file if Prometheus metrics dump was + /// requested + pub metrics_file: Option, } impl Default for OutputConfig { @@ -34,6 +37,7 @@ impl Default for OutputConfig { is_tty: false, format: OutputFormat::Table, trace_file: None, + metrics_file: None, } } } diff --git a/src/domain/task-system/services/Cargo.toml b/src/domain/task-system/services/Cargo.toml index e77a702b48..87e8a2dc4a 100644 --- a/src/domain/task-system/services/Cargo.toml +++ b/src/domain/task-system/services/Cargo.toml @@ -26,6 +26,7 @@ database-common = { workspace = true } database-common-macros = { workspace = true } internal-error = { workspace = true } messaging-outbox = { workspace = true } +observability = { workspace = true } opendatafabric = { workspace = true } kamu-core = { workspace = true } kamu-datasets = { workspace = true } diff --git a/src/domain/task-system/services/src/task_executor_impl.rs b/src/domain/task-system/services/src/task_executor_impl.rs index 46431a91aa..bc95f6cdf7 100644 --- a/src/domain/task-system/services/src/task_executor_impl.rs +++ b/src/domain/task-system/services/src/task_executor_impl.rs @@ -126,8 +126,13 @@ impl TaskExecutorImpl { Ok(Some(task)) } - #[tracing::instrument(level = "info", skip_all, fields(task_id = %task.task_id))] async fn run_task(&self, task: &Task) -> Result { + let span = observability::tracing::root_span!( + "run_task", + task_id = %task.task_id, + ); + let _ = span.enter(); + // Run task via logical plan let task_run_result = self .task_logical_plan_runner diff --git a/src/utils/database-common/src/transactions/db_transaction_manager.rs b/src/utils/database-common/src/transactions/db_transaction_manager.rs index d05484f711..63570cd3ff 100644 --- a/src/utils/database-common/src/transactions/db_transaction_manager.rs +++ b/src/utils/database-common/src/transactions/db_transaction_manager.rs @@ -44,7 +44,7 @@ impl DatabaseTransactionRunner { Self { catalog } } - #[tracing::instrument(level = "info", skip_all)] + #[tracing::instrument(level = "debug", skip_all)] pub async fn transactional( &self, callback: H, @@ -74,7 +74,7 @@ impl DatabaseTransactionRunner { .build(); callback(catalog_with_transaction) - .instrument(tracing::info_span!("Transaction Body")) + .instrument(tracing::debug_span!("Transaction Body")) .await }; @@ -82,9 +82,10 @@ impl DatabaseTransactionRunner { match result { // In case everything succeeded, commit the transaction Ok(res) => { + tracing::debug!("Transaction COMMIT"); + db_transaction_manager .commit_transaction(transaction_ref) - .instrument(tracing::info_span!("Transaction COMMIT")) .await?; Ok(res) @@ -92,10 +93,12 @@ impl DatabaseTransactionRunner { // Otherwise, do an explicit rollback Err(e) => { + tracing::warn!("Transaction ROLLBACK"); 
+ db_transaction_manager .rollback_transaction(transaction_ref) - .instrument(tracing::error_span!("Transaction ROLLBACK")) .await?; + Err(e) } } diff --git a/src/utils/messaging-outbox/Cargo.toml b/src/utils/messaging-outbox/Cargo.toml index 32ea33430c..2b2a2f0aec 100644 --- a/src/utils/messaging-outbox/Cargo.toml +++ b/src/utils/messaging-outbox/Cargo.toml @@ -25,6 +25,7 @@ doctest = false database-common = { workspace = true } database-common-macros = { workspace = true } internal-error = { workspace = true } +observability = { workspace = true, features = ["prometheus"] } time-source = { workspace = true } async-trait = "0.1" @@ -32,6 +33,7 @@ chrono = { version = "0.4" } dill = "0.9" futures = "0.3" mockall = "0.11" +prometheus = { version = "0.13", default-features = false } serde = { version = "1", features = ["derive"] } serde_json = "1" thiserror = "1" @@ -45,4 +47,4 @@ kamu-messaging-outbox-inmem = { workspace = true } paste = "1" serde = { version = "1", features = ["derive"] } test-log = { version = "0.2", features = ["trace"] } -tokio = { version = "1", default-features = false, features = ["rt", "macros"] } \ No newline at end of file +tokio = { version = "1", default-features = false, features = ["rt", "macros"] } diff --git a/src/utils/messaging-outbox/src/services/outbox_transactional_processor.rs b/src/utils/messaging-outbox/src/services/outbox_transactional_processor.rs index 42535ba126..603c701d86 100644 --- a/src/utils/messaging-outbox/src/services/outbox_transactional_processor.rs +++ b/src/utils/messaging-outbox/src/services/outbox_transactional_processor.rs @@ -13,6 +13,7 @@ use std::sync::{Arc, Mutex}; use database_common_macros::{transactional_method, transactional_method1}; use dill::{component, scope, Catalog, Singleton}; use internal_error::{InternalError, ResultIntoInternal}; +use observability::metrics::MetricsProvider; use crate::*; @@ -23,6 +24,7 @@ pub struct OutboxTransactionalProcessor { config: Arc, routes_static_info: Arc, producer_relay_jobs: Vec, + metrics: Arc, } #[component(pub)] @@ -32,12 +34,15 @@ impl OutboxTransactionalProcessor { catalog: Catalog, config: Arc, message_dispatchers_by_producers: Vec>, + metrics: Arc, ) -> Self { let routes_static_info = Arc::new(Self::make_static_routes_info( &catalog, message_dispatchers_by_producers, )); + metrics.init(&routes_static_info.consumers_by_producers); + let mut producer_relay_jobs = Vec::new(); for (producer_name, consumer_names) in &routes_static_info.consumers_by_producers { producer_relay_jobs.push(ProducerRelayJob::new( @@ -46,6 +51,7 @@ impl OutboxTransactionalProcessor { routes_static_info.clone(), producer_name.clone(), consumer_names.clone(), + metrics.clone(), )); } @@ -54,6 +60,7 @@ impl OutboxTransactionalProcessor { config, routes_static_info, producer_relay_jobs, + metrics, } } @@ -141,8 +148,10 @@ impl OutboxTransactionalProcessor { Ok(()) } - #[tracing::instrument(level = "debug", skip_all)] async fn run_relay_iteration(&self) -> Result<(), InternalError> { + let span = observability::tracing::root_span!("outbox_iteration"); + let _ = span.enter(); + // producer A - message 17 // producer B - message 19 let latest_message_ids_by_producer = self.select_latest_message_ids_by_producers().await?; @@ -159,11 +168,6 @@ impl OutboxTransactionalProcessor { // Prepare iteration for each producer let mut producer_tasks = Vec::new(); for producer_relay_job in &self.producer_relay_jobs { - // Skip this relay if no more working consumers left - if producer_relay_job.all_consumers_failing() { - 
continue; - } - // Extract latest message ID by producer let Some(latest_produced_message_id) = latest_message_ids_by_producer.get(&producer_relay_job.producer_name) @@ -178,6 +182,21 @@ impl OutboxTransactionalProcessor { continue; }; + // Report queue length metrics + for (consumer, last_consumed_message_id) in &consumption_boundaries { + let queue_length = + latest_produced_message_id.into_inner() - last_consumed_message_id.into_inner(); + self.metrics + .messages_pending_total + .with_label_values(&[&producer_relay_job.producer_name, consumer]) + .set(queue_length); + } + + // Skip this relay if no more working consumers left + if producer_relay_job.all_consumers_failing() { + continue; + } + producer_tasks.push(( producer_relay_job, latest_produced_message_id, @@ -279,6 +298,7 @@ struct ProducerRelayJob { producer_name: String, consumer_names: Vec, failed_consumer_names: Mutex>, + metrics: Arc, } impl ProducerRelayJob { @@ -288,6 +308,7 @@ impl ProducerRelayJob { relay_routes_static_info: Arc, producer_name: String, consumer_names: Vec, + metrics: Arc, ) -> Self { Self { catalog, @@ -295,6 +316,7 @@ impl ProducerRelayJob { relay_routes_static_info, producer_name, consumer_names, + metrics, failed_consumer_names: Mutex::new(HashSet::new()), } } @@ -392,8 +414,15 @@ impl ProducerRelayJob { error_msg = %e.source, consumer_name = %e.consumer_name, outbox_message = ?message, - "Consuming outbox message failed - pausing further message processing for consumer until restart." + "Consuming outbox message failed \ + - pausing further message processing for consumer until restart." ); + + self.metrics + .failed_consumers_total + .with_label_values(&[&self.producer_name, &e.consumer_name]) + .set(1); + failing_consumer_names.insert(e.consumer_name); }); @@ -486,12 +515,18 @@ impl ProducerRelayJob { // Shift consumption record regardless of whether the consumer was interested in // the message tracing::debug!( - consumer_name=%consumer_name, - producer_name=%message.producer_name, - last_consumed_message_id=%message.message_id, + consumer_name = %consumer_name, + producer_name = %message.producer_name, + last_consumed_message_id = %message.message_id, "Shifting consumption record" ); + // Report consumption metric + self.metrics + .messages_processed_total + .with_label_values(&[&self.producer_name, consumer_name]) + .inc(); + let consumption_repository = transaction_catalog .get_one::() .unwrap(); @@ -509,3 +544,79 @@ impl ProducerRelayJob { } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +#[derive(Debug)] + +pub struct OutboxTransactionalProcessorMetrics { + pub messages_processed_total: prometheus::IntCounterVec, + pub messages_pending_total: prometheus::IntGaugeVec, + pub failed_consumers_total: prometheus::IntGaugeVec, +} + +#[dill::component(pub)] +#[dill::interface(dyn MetricsProvider)] +#[dill::scope(dill::Singleton)] +impl OutboxTransactionalProcessorMetrics { + pub fn new() -> Self { + use prometheus::*; + + Self { + messages_processed_total: IntCounterVec::new( + Opts::new( + "outbox_messages_processed_total", + "Number of messages processed by an individual producer-consumer pair", + ), + &["producer", "consumer"], + ) + .unwrap(), + messages_pending_total: IntGaugeVec::new( + Opts::new( + "outbox_messages_pending_total", + "Number of messages that are awaiting processing in an individual \ + producer-consumer pair", + ), + &["producer", "consumer"], + ) + .unwrap(), + failed_consumers_total: 
IntGaugeVec::new( + Opts::new( + "outbox_failed_consumers_total", + "Number of consumers that are in the failed state and have stopped consuming \ + messages", + ), + &["producer", "consumer"], + ) + .unwrap(), + } + } + + /// Initializes labeled metrics so they show up in the output early + fn init(&self, producer_consumers: &HashMap>) { + for (producer, consumers) in producer_consumers { + for consumer in consumers { + self.messages_processed_total + .with_label_values(&[producer, consumer]) + .reset(); + + self.messages_pending_total + .with_label_values(&[producer, consumer]) + .set(0); + + self.failed_consumers_total + .with_label_values(&[producer, consumer]) + .set(0); + } + } + } +} + +impl MetricsProvider for OutboxTransactionalProcessorMetrics { + fn register(&self, reg: &prometheus::Registry) -> prometheus::Result<()> { + reg.register(Box::new(self.messages_processed_total.clone()))?; + reg.register(Box::new(self.messages_pending_total.clone()))?; + reg.register(Box::new(self.failed_consumers_total.clone()))?; + Ok(()) + } +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/utils/messaging-outbox/tests/tests/test_outbox_transactional_processor.rs b/src/utils/messaging-outbox/tests/tests/test_outbox_transactional_processor.rs index a016c50c93..ebd370f391 100644 --- a/src/utils/messaging-outbox/tests/tests/test_outbox_transactional_processor.rs +++ b/src/utils/messaging-outbox/tests/tests/test_outbox_transactional_processor.rs @@ -90,6 +90,13 @@ async fn test_deliver_messages_of_one_type() { (TEST_PRODUCER_C, "TestMessageConsumerC2", 0), ]) .await; + + harness.check_metrics_messages_processed_total(&[ + (TEST_PRODUCER_A, "TestMessageConsumerA", 2), + (TEST_PRODUCER_B, "TestMessageConsumerB", 0), + (TEST_PRODUCER_C, "TestMessageConsumerC1", 0), + (TEST_PRODUCER_C, "TestMessageConsumerC2", 0), + ]); } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -146,6 +153,13 @@ async fn test_deliver_messages_of_two_types() { (TEST_PRODUCER_C, "TestMessageConsumerC2", 0), ]) .await; + + harness.check_metrics_messages_processed_total(&[ + (TEST_PRODUCER_A, "TestMessageConsumerA", 1), + (TEST_PRODUCER_B, "TestMessageConsumerB", 1), + (TEST_PRODUCER_C, "TestMessageConsumerC1", 0), + (TEST_PRODUCER_C, "TestMessageConsumerC2", 0), + ]); } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -207,6 +221,13 @@ async fn test_deliver_messages_multiple_consumers() { (TEST_PRODUCER_C, "TestMessageConsumerC2", 2), ]) .await; + + harness.check_metrics_messages_processed_total(&[ + (TEST_PRODUCER_A, "TestMessageConsumerA", 0), + (TEST_PRODUCER_B, "TestMessageConsumerB", 0), + (TEST_PRODUCER_C, "TestMessageConsumerC1", 2), + (TEST_PRODUCER_C, "TestMessageConsumerC2", 2), + ]); } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -292,6 +313,13 @@ async fn test_deliver_messages_with_partial_consumption() { (TEST_PRODUCER_C, "TestMessageConsumerC2", 5), ]) .await; + + harness.check_metrics_messages_processed_total(&[ + (TEST_PRODUCER_A, "TestMessageConsumerA", 0), + (TEST_PRODUCER_B, "TestMessageConsumerB", 0), + (TEST_PRODUCER_C, "TestMessageConsumerC1", 5 - 2), + (TEST_PRODUCER_C, "TestMessageConsumerC2", 5 - 4), + ]); } 
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -301,12 +329,14 @@ struct TransactionalOutboxProcessorHarness { outbox_processor: Arc, outbox: Arc, outbox_consumption_repository: Arc, + metrics: Arc, } impl TransactionalOutboxProcessorHarness { fn new() -> Self { let mut b = CatalogBuilder::new(); b.add::(); + b.add::(); b.add_value(OutboxConfig::default()); b.add::(); b.add::(); @@ -332,12 +362,14 @@ impl TransactionalOutboxProcessorHarness { let outbox_consumption_repository = catalog .get_one::() .unwrap(); + let metrics = catalog.get_one().unwrap(); Self { catalog, outbox_processor, outbox, outbox_consumption_repository, + metrics, } } @@ -386,6 +418,21 @@ impl TransactionalOutboxProcessorHarness { boundaries.sort(); boundaries } + + fn check_metrics_messages_processed_total(&self, expected: &[(&str, &str, u64)]) { + for (producer, consumer, expected_value) in expected { + let actual_value = self + .metrics + .messages_processed_total + .get_metric_with_label_values(&[producer, consumer]) + .unwrap() + .get(); + assert_eq!( + *expected_value, actual_value, + "messages_processed_total{{producer={producer},consumer={consumer}}}" + ); + } + } } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/utils/observability/Cargo.toml b/src/utils/observability/Cargo.toml new file mode 100644 index 0000000000..abeebfc9d0 --- /dev/null +++ b/src/utils/observability/Cargo.toml @@ -0,0 +1,70 @@ +[package] +name = "observability" +description = "Utilities for tracing, structured logging, and metrics" +version = { workspace = true } +homepage = { workspace = true } +repository = { workspace = true } +authors = { workspace = true } +readme = { workspace = true } +license-file = { workspace = true } +keywords = { workspace = true } +include = { workspace = true } +edition = { workspace = true } +publish = { workspace = true } + + +[lints] +workspace = true + + +[lib] +doctest = false + + +[features] +default = [] + +opentelemetry = [ + "dep:opentelemetry", + "dep:opentelemetry_sdk", + "dep:opentelemetry-otlp", + "dep:opentelemetry-semantic-conventions", + "dep:tracing-opentelemetry", +] + +prometheus = ["dep:prometheus"] + + +[dependencies] +async-trait = { version = "0.1" } +axum = { version = "0.6", default-features = false, features = [ + "json", + "matched-path", + "query", +] } +dill = { version = "0.9", default-features = false } +http = { version = "0.2", default-features = false } +serde = { version = "1", default-features = false, features = ["derive"] } +serde_json = { version = "1", default-features = false } +thiserror = { version = "1", default-features = false } +tracing = { version = "0.1", default-features = false } +tracing-appender = { version = "0.2", default-features = false } +tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] } +tower = { version = "0.4", default-features = false } +tower-http = { version = "0.4", default-features = false, features = ["trace"] } + +opentelemetry = { optional = true, version = "0.23", default-features = false } +opentelemetry_sdk = { optional = true, version = "0.23", default-features = false, features = [ + "rt-tokio", +] } +opentelemetry-otlp = { optional = true, version = "0.16", default-features = false, features = [ + "trace", + "grpc-tonic", +] } +opentelemetry-semantic-conventions = { optional = true, version = "0.16", default-features = false } 
+tracing-opentelemetry = { optional = true, version = "0.24", default-features = false } + +prometheus = { optional = true, version = "0.13", default-features = false } + + +[dev-dependencies] diff --git a/src/utils/observability/src/axum.rs b/src/utils/observability/src/axum.rs new file mode 100644 index 0000000000..3ea228bbc2 --- /dev/null +++ b/src/utils/observability/src/axum.rs @@ -0,0 +1,151 @@ +// Copyright Kamu Data, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use http::Uri; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +pub fn http_layer() -> tower_http::trace::TraceLayer< + tower_http::classify::SharedClassifier, + MakeSpan, + OnRequest, + OnResponse, +> { + tower_http::trace::TraceLayer::new_for_http() + .on_request(OnRequest) + .on_response(OnResponse) + .make_span_with(MakeSpan) +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +#[derive(Clone, Debug)] +pub struct OnRequest; + +impl tower_http::trace::OnRequest for OnRequest { + fn on_request(&mut self, request: &http::Request, _: &tracing::Span) { + tracing::info!( + uri = %request.uri(), + version = ?request.version(), + headers = ?request.headers(), + "HTTP request", + ); + } +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +#[derive(Clone, Debug)] +pub struct OnResponse; + +impl tower_http::trace::OnResponse for OnResponse { + fn on_response( + self, + response: &http::Response, + latency: std::time::Duration, + _span: &tracing::Span, + ) { + tracing::info!( + status = response.status().as_u16(), + headers = ?response.headers(), + latency = %Latency(latency), + "HTTP response" + ); + } +} + +struct Latency(std::time::Duration); + +impl std::fmt::Display for Latency { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{} ms", self.0.as_millis()) + } +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +#[derive(Debug, Clone)] +pub struct MakeSpan; + +impl tower_http::trace::MakeSpan for MakeSpan { + // TODO: Trace linking across requests + fn make_span(&mut self, request: &http::Request) -> tracing::Span { + let method = request.method(); + let route = RouteOrUri::from(request); + + let span = crate::tracing::root_span!( + "http_request", + %method, + %route, + "otel.name" = tracing::field::Empty, + ); + + #[cfg(feature = "opentelemetry")] + { + crate::tracing::include_otel_trace_id(&span); + + span.record( + "otel.name", + tracing::field::display(SpanName::new(method, route)), + ); + } + + span + } +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +#[cfg(feature = "opentelemetry")] +struct SpanName<'a> { + method: &'a http::Method, + route: RouteOrUri<'a>, +} + +#[cfg(feature = "opentelemetry")] +impl<'a> SpanName<'a> { + fn new(method: &'a http::Method, route: RouteOrUri<'a>) -> Self { + Self { method, route } + } +} + +#[cfg(feature = "opentelemetry")] +impl<'a> std::fmt::Display for SpanName<'a> { + fn fmt(&self, 
f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{} {}", self.method, self.route) + } +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +enum RouteOrUri<'a> { + Route(&'a str), + Uri(&'a Uri), +} + +impl<'a, B> From<&'a http::Request> for RouteOrUri<'a> { + fn from(request: &'a http::Request) -> Self { + request + .extensions() + .get::() + .map_or_else( + || RouteOrUri::Uri(request.uri()), + |m| RouteOrUri::Route(m.as_str()), + ) + } +} + +impl<'a> std::fmt::Display for RouteOrUri<'a> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + RouteOrUri::Route(s) => write!(f, "{s}"), + RouteOrUri::Uri(uri) => write!(f, "{uri}"), + } + } +} diff --git a/src/utils/observability/src/config.rs b/src/utils/observability/src/config.rs new file mode 100644 index 0000000000..324c69a008 --- /dev/null +++ b/src/utils/observability/src/config.rs @@ -0,0 +1,84 @@ +// Copyright Kamu Data, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +#[derive(Debug)] +pub struct Config { + /// Name of the service that will appear e.g. in OTEL traces + pub service_name: String, + /// Version of the service that will appear e.g. in OTEL traces + pub service_version: String, + /// Log levels that will be used if `RUST_LOG` was not specified explicitly + pub default_log_levels: String, + /// OpenTelemetry protocol endpoint to export traces to + pub otlp_endpoint: Option, +} + +impl Default for Config { + fn default() -> Self { + Self { + service_name: "unnamed-service".to_string(), + service_version: "0.0.0".to_string(), + default_log_levels: "info".to_string(), + otlp_endpoint: None, + } + } +} + +impl Config { + pub fn from_env() -> Self { + Self::from_env_with_prefix("") + } + + pub fn from_env_with_prefix(prefix: &str) -> Self { + let mut cfg = Self::default(); + if let Some(service_name) = std::env::var(format!("{prefix}SERVICE_NAME")) + .ok() + .filter(|v| !v.is_empty()) + { + cfg.service_name = service_name; + } + if let Some(service_version) = std::env::var(format!("{prefix}SERVICE_VERSION")) + .ok() + .filter(|v| !v.is_empty()) + { + cfg.service_version = service_version; + } + if let Some(otlp_endpoint) = std::env::var(format!("{prefix}OTLP_ENDPOINT")) + .ok() + .filter(|v| !v.is_empty()) + { + cfg.otlp_endpoint = Some(otlp_endpoint); + } + cfg + } + + pub fn with_service_name(mut self, service_name: impl Into) -> Self { + self.service_name = service_name.into(); + self + } + + pub fn with_service_version(mut self, service_version: impl Into) -> Self { + self.service_version = service_version.into(); + self + } + + pub fn with_default_log_levels(mut self, default_log_levels: impl Into) -> Self { + self.default_log_levels = default_log_levels.into(); + self + } + + pub fn with_otlp_endpoint(mut self, otlp_endpoint: impl Into) -> Self { + self.otlp_endpoint = Some(otlp_endpoint.into()); + self + } +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/utils/observability/src/health.rs 
b/src/utils/observability/src/health.rs
new file mode 100644
index 0000000000..f4929b1831
--- /dev/null
+++ b/src/utils/observability/src/health.rs
@@ -0,0 +1,87 @@
+// Copyright Kamu Data, Inc. and contributors. All rights reserved.
+//
+// Use of this software is governed by the Business Source License
+// included in the LICENSE file.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0.
+
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+/// Type implementing this trait will be called during the health check
+/// procedure. Multiple checks can be added, in which case they will be called
+/// one by one with check considered failed upon the first error.
+#[async_trait::async_trait]
+pub trait HealthCheck: Send + Sync {
+    async fn check(&self, check_type: CheckType) -> Result<(), CheckError>;
+}
+
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+/// Axum handler for serving the health checks. Depends on [`dill::Catalog`] to
+/// instantiate the implementations of the [`HealthCheck`] trait.
+pub async fn health_handler(
+    axum::Extension(catalog): axum::Extension,
+    axum::extract::Query(args): axum::extract::Query,
+) -> Result, CheckError> {
+    for checker in catalog.get::>().unwrap() {
+        checker.check(args.r#type).await?;
+    }
+
+    Ok(axum::Json(CheckSuccess { ok: true }))
+}
+
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+#[derive(Debug, serde::Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct CheckSuccess {
+    pub ok: bool,
+}
+
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+#[derive(Debug, thiserror::Error)]
+#[error("{reason}")]
+pub struct CheckError {
+    pub reason: String,
+}
+
+impl axum::response::IntoResponse for CheckError {
+    fn into_response(self) -> axum::response::Response {
+        (
+            http::status::StatusCode::SERVICE_UNAVAILABLE,
+            axum::Json(serde_json::json!({
+                "ok": false,
+                "reason": self.reason,
+            })),
+        )
+            .into_response()
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+#[derive(Clone, Copy, PartialEq, Eq, Debug, Default, serde::Deserialize, serde::Serialize)]
+#[serde(rename_all = "lowercase")]
+pub enum CheckType {
+    /// Determines when to restart the container
+    #[default]
+    Liveness,
+    /// Determines when to remove the instance from the loadbalancer
+    Readiness,
+    /// Is sent before liveness and readiness checks to determine when the
+    /// startup procedure is finished
+    Startup,
+}
+
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+#[derive(Debug, serde::Deserialize)]
+pub struct CheckArgs {
+    #[serde(default)]
+    pub r#type: CheckType,
+}
+
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/src/utils/observability/src/init.rs b/src/utils/observability/src/init.rs
new file mode 100644
index 0000000000..2021c9024b
--- /dev/null
+++ b/src/utils/observability/src/init.rs
@@ -0,0 +1,182 @@
+// Copyright Kamu Data, Inc. and contributors. All rights reserved.
+// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use std::io::IsTerminal; + +use tracing_subscriber::fmt::format::FmtSpan; +use tracing_subscriber::layer::SubscriberExt as _; +use tracing_subscriber::util::SubscriberInitExt as _; +use tracing_subscriber::EnvFilter; + +use super::config::Config; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +/// If application is started under a terminal will use the [`dev`] mode, +/// otherwise will use [`service`] mode. +pub fn auto(cfg: Config) -> Guard { + if std::io::stderr().is_terminal() { + dev(cfg) + } else { + service(cfg) + } +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +#[allow(clippy::needless_pass_by_value)] +pub fn dev(cfg: Config) -> Guard { + let env_filter = + EnvFilter::try_from_default_env().unwrap_or(EnvFilter::new(cfg.default_log_levels.clone())); + + let text_layer = tracing_subscriber::fmt::layer() + .pretty() + .with_writer(std::io::stderr) + .with_line_number(true) + .with_thread_names(true) + .with_span_events(FmtSpan::NEW | FmtSpan::CLOSE); + + #[cfg(feature = "opentelemetry")] + let (otel_layer, otlp_guard) = if cfg.otlp_endpoint.is_none() { + (None, None) + } else { + ( + Some( + tracing_opentelemetry::layer() + .with_error_records_to_exceptions(true) + .with_tracer(init_otel_tracer(&cfg)), + ), + Some(OtlpGuard), + ) + }; + + #[cfg(feature = "opentelemetry")] + tracing_subscriber::registry() + .with(env_filter) + .with(otel_layer) + .with(text_layer) + .init(); + + #[cfg(not(feature = "opentelemetry"))] + tracing_subscriber::registry() + .with(env_filter) + .with(text_layer) + .init(); + + Guard { + non_blocking_appender: None, + + #[cfg(feature = "opentelemetry")] + otlp_guard, + } +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +#[allow(clippy::needless_pass_by_value)] +pub fn service(cfg: Config) -> Guard { + let env_filter = + EnvFilter::try_from_default_env().unwrap_or(EnvFilter::new(cfg.default_log_levels.clone())); + + let (non_blocking, guard) = tracing_appender::non_blocking(std::io::stderr()); + + let text_layer = tracing_subscriber::fmt::layer() + .json() + .with_writer(non_blocking) + .with_line_number(true) + .with_thread_names(true) + .with_span_events(FmtSpan::NEW | FmtSpan::CLOSE); + + #[cfg(feature = "opentelemetry")] + let (otel_layer, otlp_guard) = if cfg.otlp_endpoint.is_none() { + (None, None) + } else { + ( + Some( + tracing_opentelemetry::layer() + .with_error_records_to_exceptions(true) + .with_tracer(init_otel_tracer(&cfg)), + ), + Some(OtlpGuard), + ) + }; + + #[cfg(feature = "opentelemetry")] + tracing_subscriber::registry() + .with(env_filter) + .with(otel_layer) + .with(text_layer) + .init(); + + #[cfg(not(feature = "opentelemetry"))] + tracing_subscriber::registry() + .with(env_filter) + .with(text_layer) + .init(); + + Guard { + non_blocking_appender: Some(guard), + + #[cfg(feature = "opentelemetry")] + otlp_guard, + } +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +#[cfg(feature = "opentelemetry")] +fn init_otel_tracer(cfg: &Config) -> 
opentelemetry_sdk::trace::Tracer { + use std::time::Duration; + + use opentelemetry::KeyValue; + use opentelemetry_otlp::WithExportConfig as _; + use opentelemetry_semantic_conventions::resource as otel_resource; + + let otel_exporter = opentelemetry_otlp::new_exporter() + .tonic() + .with_endpoint(cfg.otlp_endpoint.as_ref().unwrap()) + .with_timeout(Duration::from_secs(5)); + + opentelemetry_otlp::new_pipeline() + .tracing() + .with_exporter(otel_exporter) + .with_trace_config( + opentelemetry_sdk::trace::config() + .with_max_events_per_span(64) + .with_max_attributes_per_span(16) + .with_resource(opentelemetry_sdk::Resource::new([ + KeyValue::new(otel_resource::SERVICE_NAME, cfg.service_name.clone()), + KeyValue::new(otel_resource::SERVICE_VERSION, cfg.service_version.clone()), + ])) + .with_sampler(opentelemetry_sdk::trace::Sampler::AlwaysOn), + ) + .install_batch(opentelemetry_sdk::runtime::Tokio) + .expect("Creating tracer") +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +#[must_use] +#[allow(dead_code)] +#[derive(Default)] +pub struct Guard { + pub non_blocking_appender: Option, + + #[cfg(feature = "opentelemetry")] + pub otlp_guard: Option, +} + +pub struct OtlpGuard; + +#[cfg(feature = "opentelemetry")] +impl Drop for OtlpGuard { + fn drop(&mut self) { + opentelemetry::global::shutdown_tracer_provider(); + } +} diff --git a/src/utils/observability/src/lib.rs b/src/utils/observability/src/lib.rs new file mode 100644 index 0000000000..6106ac8dbd --- /dev/null +++ b/src/utils/observability/src/lib.rs @@ -0,0 +1,17 @@ +// Copyright Kamu Data, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +pub mod axum; +pub mod config; +pub mod health; +pub mod init; +pub mod tracing; + +#[cfg(feature = "prometheus")] +pub mod metrics; diff --git a/src/utils/observability/src/metrics.rs b/src/utils/observability/src/metrics.rs new file mode 100644 index 0000000000..d8c8f38527 --- /dev/null +++ b/src/utils/observability/src/metrics.rs @@ -0,0 +1,56 @@ +// Copyright Kamu Data, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use std::sync::Arc; + +use prometheus::Encoder as _; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +/// Implementors of this trait will be aked during the startup to register +/// metrics that they provide. +pub trait MetricsProvider: Send + Sync { + /// Called during startup to register the metrics + /// + /// IMPORTANT: Metrics that you register must be static or live in the + /// [`dill::Singleton`] scope. 
+ fn register(&self, reg: &prometheus::Registry) -> prometheus::Result<()>; +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +/// Uses catalog to extract all [`MetricsProvider`]s and register all provided +/// metrics in the [`prometheus::Registry`] +pub fn register_all(catalog: &dill::Catalog) -> Arc { + let registry: Arc = catalog + .get_one() + .expect("Prometheus registry is not in the DI catalog"); + + for builder in catalog.builders_for::() { + let metrics_set = builder.get(catalog).unwrap(); + metrics_set.register(®istry).unwrap(); + } + + registry +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +#[allow(clippy::unused_async)] +pub async fn metrics_handler(axum::Extension(catalog): axum::Extension) -> String { + let reg = catalog.get_one::().unwrap(); + + let mut buf = Vec::new(); + + prometheus::TextEncoder::new() + .encode(®.gather(), &mut buf) + .unwrap(); + + String::from_utf8(buf).unwrap() +} diff --git a/src/utils/observability/src/tracing.rs b/src/utils/observability/src/tracing.rs new file mode 100644 index 0000000000..8483daa20d --- /dev/null +++ b/src/utils/observability/src/tracing.rs @@ -0,0 +1,63 @@ +// Copyright Kamu Data, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +#[cfg(feature = "opentelemetry")] +#[macro_export] +macro_rules! root_span { + ($name:expr) => { + $crate::tracing::root_span!($name,) + }; + ($name:expr, $($field:tt)*) => { + { + let span = ::tracing::info_span!( + $name, + trace_id = tracing::field::Empty, + $($field)* + ); + + $crate::tracing::include_otel_trace_id(&span); + + span + } + }; +} + +#[cfg(not(feature = "opentelemetry"))] +#[macro_export] +macro_rules! root_span { + ($name:expr) => { + $crate::tracing::root_span!($name,) + }; + ($name:expr, $($field:tt)*) => { + tracing::info_span!( + $name, + trace_id = tracing::field::Empty, + $($field)* + ) + }; +} + +pub use root_span; + +/// Extracts trace ID from the OTEL layer and adds it to the tracing span to +/// allow cross-linking between logs and traces +#[cfg(feature = "opentelemetry")] +pub fn include_otel_trace_id(span: &tracing::Span) { + use opentelemetry::trace::TraceContextExt as _; + use tracing_opentelemetry::OpenTelemetrySpanExt as _; + + let context = span.context(); + let otel_span = context.span(); + let span_context = otel_span.span_context(); + let trace_id = span_context.trace_id(); + + if span_context.is_valid() { + span.record("trace_id", tracing::field::display(trace_id)); + } +}
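
Below is a minimal wiring sketch, not part of this patch, showing how a downstream service could tie the new `observability` crate together using the APIs introduced above (`init::auto`, `config::Config`, `metrics::register_all`, `metrics::metrics_handler`, `health::health_handler`, and `axum::http_layer`), plus the same dill registration pattern used by `OutboxTransactionalProcessorMetrics`. The `MyServiceMetrics` provider, the `my-service` name, the `/` route, the bind address, and the tokio runtime are illustrative assumptions only.

// Hypothetical wiring sketch: mirrors the patterns introduced in this patch, but
// MyServiceMetrics, the service name, the "/" route, and the bind address are
// illustrative assumptions, not code from this changeset.
use dill::CatalogBuilder;
use observability::metrics::MetricsProvider;

// A metrics provider analogous to OutboxTransactionalProcessorMetrics above
#[derive(Debug)]
pub struct MyServiceMetrics {
    pub requests_total: prometheus::IntCounter,
}

#[dill::component(pub)]
#[dill::interface(dyn MetricsProvider)]
#[dill::scope(dill::Singleton)]
impl MyServiceMetrics {
    pub fn new() -> Self {
        Self {
            requests_total: prometheus::IntCounter::new(
                "myservice_requests_total",
                "Number of requests served",
            )
            .unwrap(),
        }
    }
}

impl MetricsProvider for MyServiceMetrics {
    fn register(&self, reg: &prometheus::Registry) -> prometheus::Result<()> {
        reg.register(Box::new(self.requests_total.clone()))
    }
}

async fn hello() -> &'static str {
    "hello"
}

#[tokio::main]
async fn main() {
    // Install tracing: `auto` picks the pretty dev mode or the JSON service mode
    // depending on whether stderr is a terminal; the guard must stay alive
    let _guard = observability::init::auto(
        observability::config::Config::from_env().with_service_name("my-service"),
    );

    // The registry must live in the DI catalog so that register_all() and
    // metrics_handler() can find it
    let mut b = CatalogBuilder::new();
    b.add_value(prometheus::Registry::new());
    b.add::<MyServiceMetrics>();
    let catalog = b.build();

    // Collect metrics from every MetricsProvider registered in the catalog
    let _registry = observability::metrics::register_all(&catalog);

    // Layers wrap the routes added before them, so the tracing layer covers the
    // application routes while the /system/* endpoints stay out of the logs,
    // matching the layer ordering used in api_server.rs and web_ui_server.rs
    let app = axum::Router::new()
        .route("/", axum::routing::get(hello))
        .layer(observability::axum::http_layer())
        .route(
            "/system/health",
            axum::routing::get(observability::health::health_handler),
        )
        .route(
            "/system/metrics",
            axum::routing::get(observability::metrics::metrics_handler),
        )
        .layer(axum::extract::Extension(catalog));

    axum::Server::bind(&"127.0.0.1:8080".parse().unwrap())
        .serve(app.into_make_service())
        .await
        .unwrap();
}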