diff --git a/Cargo.toml b/Cargo.toml index cf965ad..62841fb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,11 +19,29 @@ tonic = { version = "0.13", optional = true } rand = "0.9.0" -opentelemetry = { version = "0.30", default-features = false, features = ["trace", "logs"] } -opentelemetry_sdk = { version = "0.30", default-features = false, features = ["trace", "experimental_metrics_custom_reader", "logs"] } -opentelemetry-otlp = { version = "0.30", default-features = false, features = ["trace", "metrics", "logs"] } +opentelemetry = { version = "0.30", default-features = false, features = [ + "trace", + "logs", +] } +opentelemetry_sdk = { version = "0.30", default-features = false, features = [ + "trace", + "experimental_metrics_custom_reader", + "experimental_trace_batch_span_processor_with_async_runtime", + "experimental_logs_batch_log_processor_with_async_runtime", + "experimental_metrics_periodicreader_with_async_runtime", + "rt-tokio", + "logs", +] } +opentelemetry-otlp = { version = "0.30", default-features = false, features = [ + "trace", + "metrics", + "logs", +] } futures-util = "0.3" +tokio = { version = "1.44.1", default-features = false, features = [ + "rt-multi-thread", +] } tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } tracing-opentelemetry = "0.31" @@ -40,23 +58,36 @@ regex = "1.11.1" async-trait = "0.1.88" futures = { version = "0.3.31", features = ["futures-executor"] } insta = "1.42.1" -opentelemetry_sdk = { version = "0.30", default-features = false, features = ["testing"] } +opentelemetry_sdk = { version = "0.30", default-features = false, features = [ + "testing", +] } regex = "1.11.1" -tokio = {version = "1.44.1", features = ["test-util", "macros", "rt-multi-thread"] } +tokio = { version = "1.44.1", features = [ + "test-util", + "macros", + "rt-multi-thread", +] } ulid = "1.2.0" wiremock = "0.6" tonic-build = "0.13" tonic = { version = "0.13", features = ["transport"] } prost = "0.13" -opentelemetry-proto = { version = "0.30", features = ["tonic", "gen-tonic-messages"] } +opentelemetry-proto = { version = "0.30", features = [ + "tonic", + "gen-tonic-messages", +] } tokio-stream = { version = "0.1", features = ["net"] } # Dependencies for examples axum = { version = "0.8", features = ["macros"] } -axum-tracing-opentelemetry = { version = "0.29", features = ["tracing_level_info"] } +axum-tracing-opentelemetry = { version = "0.29", features = [ + "tracing_level_info", +] } axum-otel-metrics = "0.12.0" actix-web = "4.0" -opentelemetry-instrumentation-actix-web = { version = "0.22", features = ["metrics"] } +opentelemetry-instrumentation-actix-web = { version = "0.22", features = [ + "metrics", +] } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" futures-util = "0.3" @@ -65,9 +96,22 @@ tempfile = "3.20.0" [features] default = ["data-dir", "export-http-protobuf"] data-dir = ["dep:serde", "dep:serde_json"] -export-grpc = ["opentelemetry-otlp/grpc-tonic", "opentelemetry-otlp/tls", "dep:http", "dep:tonic"] -export-http-protobuf = ["opentelemetry-otlp/http-proto", "opentelemetry-otlp/reqwest-blocking-client", "opentelemetry-otlp/reqwest-rustls"] -export-http-json = ["opentelemetry-otlp/http-json", "opentelemetry-otlp/reqwest-blocking-client", "opentelemetry-otlp/reqwest-rustls"] +export-grpc = [ + "opentelemetry-otlp/grpc-tonic", + "opentelemetry-otlp/tls", + "dep:http", + "dep:tonic", +] +export-http-protobuf = [ + "opentelemetry-otlp/http-proto", + "opentelemetry-otlp/reqwest-client", + "opentelemetry-otlp/reqwest-rustls", +] +export-http-json = [ + "opentelemetry-otlp/http-json", + "opentelemetry-otlp/reqwest-client", + "opentelemetry-otlp/reqwest-rustls", +] [lints.rust] missing_docs = "warn" diff --git a/src/internal/log_processor_shutdown_hack.rs b/src/internal/log_processor_shutdown_hack.rs new file mode 100644 index 0000000..2f64cf7 --- /dev/null +++ b/src/internal/log_processor_shutdown_hack.rs @@ -0,0 +1,44 @@ +use opentelemetry_sdk::logs::LogProcessor; + +/// Workaround for +/// +/// This wraps the inner exporter and redirects calls to `shutdown_with_timeout` to +/// `shutdown`. +#[derive(Debug)] +pub(crate) struct LogProcessorShutdownHack(T); + +impl LogProcessorShutdownHack { + pub(crate) fn new(inner: T) -> Self { + Self(inner) + } +} + +impl LogProcessor for LogProcessorShutdownHack { + fn emit( + &self, + data: &mut opentelemetry_sdk::logs::SdkLogRecord, + instrumentation: &opentelemetry::InstrumentationScope, + ) { + self.0.emit(data, instrumentation); + } + + fn force_flush(&self) -> opentelemetry_sdk::error::OTelSdkResult { + self.0.force_flush() + } + + fn shutdown_with_timeout( + &self, + _timeout: std::time::Duration, + ) -> opentelemetry_sdk::error::OTelSdkResult { + // Calling `.shutdown` here is the deliberate purpose of the abstraction + self.0.shutdown() + } + + fn shutdown(&self) -> opentelemetry_sdk::error::OTelSdkResult { + self.0.shutdown() + } + + fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) { + self.0.set_resource(resource); + } +} diff --git a/src/internal/mod.rs b/src/internal/mod.rs index 7e0e06e..cadab4c 100644 --- a/src/internal/mod.rs +++ b/src/internal/mod.rs @@ -1,5 +1,6 @@ pub(crate) mod constants; pub(crate) mod env; pub(crate) mod exporters; +pub(crate) mod log_processor_shutdown_hack; pub(crate) mod logfire_tracer; pub(crate) mod span_data_ext; diff --git a/src/logfire.rs b/src/logfire.rs index 23b2d2c..e37a201 100644 --- a/src/logfire.rs +++ b/src/logfire.rs @@ -1,9 +1,10 @@ use std::{ backtrace::Backtrace, borrow::Cow, + cell::RefCell, collections::HashMap, panic::PanicHookInfo, - sync::{Arc, Once}, + sync::{Arc, Mutex, Once}, time::Duration, }; @@ -11,13 +12,21 @@ use std::{ use std::path::{Path, PathBuf}; use opentelemetry::{ + Context, logs::{LoggerProvider as _, Severity}, trace::TracerProvider, }; use opentelemetry_sdk::{ - logs::{BatchLogProcessor, SdkLoggerProvider}, - metrics::{PeriodicReader, SdkMeterProvider}, - trace::{BatchConfigBuilder, BatchSpanProcessor, SdkTracerProvider}, + logs::{SdkLoggerProvider, log_processor_with_async_runtime::BatchLogProcessor}, + metrics::{ + SdkMeterProvider, exporter::PushMetricExporter, + periodic_reader_with_async_runtime::PeriodicReader, + }, + runtime, + trace::{ + BatchConfigBuilder, SdkTracerProvider, + span_processor_with_async_runtime::BatchSpanProcessor, + }, }; use tracing::{Subscriber, level_filters::LevelFilter, subscriber::DefaultGuard}; use tracing_opentelemetry::OpenTelemetrySpanExt; @@ -31,6 +40,7 @@ use crate::{ internal::{ env::get_optional_env, exporters::console::{ConsoleWriter, create_console_processors}, + log_processor_shutdown_hack::LogProcessorShutdownHack, logfire_tracer::{GLOBAL_TRACER, LOCAL_TRACER, LogfireTracer}, }, ulid_id_generator::UlidIdGenerator, @@ -49,6 +59,7 @@ pub struct Logfire { pub(crate) meter_provider: SdkMeterProvider, pub(crate) logger_provider: SdkLoggerProvider, pub(crate) enable_tracing_metrics: bool, + pub(crate) shutdown_sender: Arc>>>, } impl Logfire { @@ -90,9 +101,20 @@ impl Logfire { /// /// See [`ConfigureError`] for possible errors. pub fn shutdown(&self) -> Result<(), ShutdownError> { + // shutdown produces some logs, we don't care about these + let _guard = Context::enter_telemetry_suppressed_scope(); + self.tracer_provider.shutdown()?; self.meter_provider.shutdown()?; self.logger_provider.shutdown()?; + + // Send shutdown signal to the background runtime thread + if let Ok(mut sender_guard) = self.shutdown_sender.lock() { + if let Some(sender) = sender_guard.take() { + let _ = sender.send(()); + } + } + Ok(()) } @@ -146,10 +168,15 @@ impl Logfire { meter_provider, logger_provider, enable_tracing_metrics, + shutdown_sender, .. } = Self::build_parts(config, None)?; if !local { + // avoid otel logs firing as these messages are sent regarding "global meter provider" + // being set + let _guard = Context::enter_telemetry_suppressed_scope(); + tracing::subscriber::set_global_default(subscriber.clone())?; let logger = crate::bridges::log::LogfireLogger::init(tracer.clone()); log::set_logger(logger)?; @@ -176,6 +203,7 @@ impl Logfire { meter_provider, logger_provider, enable_tracing_metrics, + shutdown_sender, }) } @@ -264,6 +292,7 @@ impl Logfire { let mut tracer_provider_builder = SdkTracerProvider::builder(); let mut logger_provider_builder = SdkLoggerProvider::builder(); + let mut meter_provider_builder = SdkMeterProvider::builder(); if let Some(id_generator) = advanced_options.id_generator { tracer_provider_builder = tracer_provider_builder.with_id_generator(id_generator); @@ -297,14 +326,13 @@ impl Logfire { if has_service_attributes { let service_resource = service_resource_builder.build(); - tracer_provider_builder = - tracer_provider_builder.with_resource(service_resource.clone()); - // Add it to the list for other components too advanced_options.resources.push(service_resource); } - for resource in &advanced_options.resources { + for resource in advanced_options.resources { tracer_provider_builder = tracer_provider_builder.with_resource(resource.clone()); + logger_provider_builder = logger_provider_builder.with_resource(resource.clone()); + meter_provider_builder = meter_provider_builder.with_resource(resource); } let mut http_headers: Option> = None; @@ -328,20 +356,24 @@ impl Logfire { None }; - if let Some(logfire_base_url) = logfire_base_url { - tracer_provider_builder = tracer_provider_builder.with_span_processor( - BatchSpanProcessor::builder(crate::exporters::span_exporter( + let shutdown_sender = if let Some(logfire_base_url) = logfire_base_url { + let (shutdown_tx, span_processor, log_processor, metrics_processor) = + spawn_runtime_and_exporters( logfire_base_url, - http_headers.clone(), - )?) - .with_batch_config( - BatchConfigBuilder::default() - .with_scheduled_delay(Duration::from_millis(500)) // 500 matches Python - .build(), - ) - .build(), - ); - } + http_headers, + config.metrics.is_some(), + )?; + + tracer_provider_builder = tracer_provider_builder.with_span_processor(span_processor); + logger_provider_builder = logger_provider_builder.with_log_processor(log_processor); + if let Some(metrics_processor) = metrics_processor { + meter_provider_builder = meter_provider_builder.with_reader(metrics_processor); + } + + Arc::new(Mutex::new(Some(shutdown_tx))) + } else { + Arc::new(Mutex::new(None)) + }; let console_processors = config .console_options @@ -357,55 +389,21 @@ impl Logfire { } let tracer_provider = tracer_provider_builder.build(); - let tracer = tracer_provider.tracer("logfire"); - let mut meter_provider_builder = SdkMeterProvider::builder(); - - if let Some(logfire_base_url) = logfire_base_url { - if config.metrics.is_some() { - let metric_reader = PeriodicReader::builder(crate::exporters::metric_exporter( - logfire_base_url, - http_headers.clone(), - )?) - .build(); - - meter_provider_builder = meter_provider_builder.with_reader(metric_reader); - } - } - if let Some(metrics) = config.metrics { for reader in metrics.additional_readers { meter_provider_builder = meter_provider_builder.with_reader(reader); } } - for resource in &advanced_options.resources { - meter_provider_builder = meter_provider_builder.with_resource(resource.clone()); - } - let meter_provider = meter_provider_builder.build(); - if let Some(logfire_base_url) = logfire_base_url { - logger_provider_builder = logger_provider_builder.with_log_processor( - BatchLogProcessor::builder(crate::exporters::log_exporter( - logfire_base_url, - http_headers.clone(), - )?) - .build(), - ); - } - for log_processor in advanced_options.log_record_processors { logger_provider_builder = logger_provider_builder.with_log_processor(log_processor); } - for resource in advanced_options.resources { - logger_provider_builder = logger_provider_builder.with_resource(resource); - } - let logger_provider = logger_provider_builder.build(); - let logger = Arc::new(logger_provider.logger("logfire")); let default_level_filter = config.default_level_filter.unwrap_or(if send_to_logfire { @@ -457,6 +455,7 @@ impl Logfire { meter_provider, logger_provider, enable_tracing_metrics: advanced_options.enable_tracing_metrics, + shutdown_sender, #[cfg(test)] metadata: TestMetadata { send_to_logfire, @@ -514,6 +513,7 @@ struct LogfireParts { meter_provider: SdkMeterProvider, logger_provider: SdkLoggerProvider, enable_tracing_metrics: bool, + shutdown_sender: Arc>>>, #[cfg(test)] metadata: TestMetadata, } @@ -635,6 +635,120 @@ struct LogfireCredentials { logfire_api_url: String, } +/// Spawns a tokio runtime in a background thread and creates exporters which will use +/// that runtime. +/// +/// As per , using a +/// runtime with suppression is the recommended way to avoid the export process +/// generating telemetry logs (and creating both noise and an infinite cycle). +/// +/// It also has the benefit for the `grpc` export that the tonic channel can be created +/// successfully in all cases; it _needs_ a tokio runtime and this way we don't need +/// the user to call `logfire::configure` inside an async context. +#[allow(clippy::type_complexity)] // internal type, not exposed to users +fn spawn_runtime_and_exporters( + logfire_base_url: &str, + http_headers: Option>, + enable_metrics: bool, +) -> Result< + ( + tokio::sync::oneshot::Sender<()>, + BatchSpanProcessor, + LogProcessorShutdownHack>, + Option>>, + ), + ConfigureError, +> { + thread_local! { + static SUPPRESS_GUARD: RefCell> = const { RefCell::new(None) }; + } + + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .worker_threads(1) + .on_thread_start(|| { + let suppress_guard = Context::enter_telemetry_suppressed_scope(); + SUPPRESS_GUARD.with(|guard| { + *guard.borrow_mut() = Some(suppress_guard); + }); + }) + .on_thread_stop(|| { + SUPPRESS_GUARD.with(|guard| { + if let Some(suppress_guard) = guard.borrow_mut().take() { + drop(suppress_guard); + } + }); + }) + .thread_name("logfire-export-runtime") + .build() + .map_err(|e| ConfigureError::Other(e.into()))?; + + let handle = rt.handle().clone(); + + // Create oneshot channel for shutdown signaling + let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>(); + + // Spawn the runtime into a background thread + std::thread::Builder::new() + .name("logfire-export-runtime".into()) + .spawn(move || { + let _guard = Context::enter_telemetry_suppressed_scope(); + let _ = rt.block_on(shutdown_rx); + }) + .map_err(|e| { + ConfigureError::Other(format!("failed to create logfire exporter: {e:?}").into()) + })?; + + let (span_processor, log_processor, metrics_processor) = std::thread::scope(|s| { + s.spawn(|| -> Result<_, ConfigureError> { + // all these processors spawn tasks on the runtime when they are created. + let _guard = handle.enter(); + + let span_processor = BatchSpanProcessor::builder( + crate::exporters::span_exporter(logfire_base_url, http_headers.clone())?, + runtime::Tokio, + ) + .with_batch_config( + BatchConfigBuilder::default() + .with_scheduled_delay(Duration::from_millis(500)) // 500 matches Python + .build(), + ) + .build(); + + let log_processor = LogProcessorShutdownHack::new( + BatchLogProcessor::builder( + crate::exporters::log_exporter(logfire_base_url, http_headers.clone())?, + runtime::Tokio, + ) + .build(), + ); + + let metrics_processor = if enable_metrics { + Some( + PeriodicReader::builder( + crate::exporters::metric_exporter(logfire_base_url, http_headers)?, + runtime::Tokio, + ) + .build(), + ) + } else { + None + }; + + Ok((span_processor, log_processor, metrics_processor)) + }) + .join() + .map_err(|_| ConfigureError::Other("failed to create logfire processors".into()))? + })?; + + Ok(( + shutdown_tx, + span_processor, + log_processor, + metrics_processor, + )) +} + #[cfg(test)] mod tests { use std::{ diff --git a/src/test_utils.rs b/src/test_utils.rs index 4e3dea2..a3d0a58 100644 --- a/src/test_utils.rs +++ b/src/test_utils.rs @@ -16,6 +16,7 @@ use opentelemetry::{ logs::{LogRecord, Logger, LoggerProvider as _}, trace::{SpanId, TraceId}, }; +use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest; use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest; use opentelemetry_sdk::{ Resource, @@ -461,3 +462,43 @@ pub fn make_trace_request_deterministic(req: &mut ExportTraceServiceRequest) { } } } + +pub fn make_log_request_deterministic(req: &mut ExportLogsServiceRequest) { + let mut timestamp_remap = TimestampRemapper::new(); + + for resource_log in &mut req.resource_logs { + if let Some(resource) = &mut resource_log.resource { + resource.attributes.sort_by_key(|attr| attr.key.clone()); + } + + for scope_log in &mut resource_log.scope_logs { + if let Some(scope) = &mut scope_log.scope { + scope.attributes.sort_by_key(|attr| attr.key.clone()); + } + + for log_record in &mut scope_log.log_records { + // Remap timestamps + log_record.time_unix_nano = + timestamp_remap.remap_u64_nano_timestamp(log_record.time_unix_nano); + log_record.observed_time_unix_nano = + timestamp_remap.remap_u64_nano_timestamp(log_record.observed_time_unix_nano); + + // Zero out non-deterministic attributes + for attr in &mut log_record.attributes { + if attr.key == "thread.id" { + attr.value = Some(opentelemetry_proto::tonic::common::v1::AnyValue { + value: Some( + opentelemetry_proto::tonic::common::v1::any_value::Value::IntValue( + 0, + ), + ), + }); + } + } + + // Sort attributes by key + log_record.attributes.sort_by_key(|attr| attr.key.clone()); + } + } + } +} diff --git a/tests/test_grpc_sink.rs b/tests/test_grpc_sink.rs index 7126bd9..6cc02eb 100644 --- a/tests/test_grpc_sink.rs +++ b/tests/test_grpc_sink.rs @@ -1,8 +1,14 @@ //! Integration tests for gRPC sink with mock server. #![cfg(feature = "export-grpc")] +use futures::channel::oneshot; use insta::assert_debug_snapshot; -use logfire::{configure, set_local_logfire, span}; +use logfire::{configure, info, span}; +use opentelemetry::Context; +use opentelemetry_proto::tonic::collector::logs::v1::{ + ExportLogsServiceRequest, ExportLogsServiceResponse, + logs_service_server::{LogsService, LogsServiceServer}, +}; use opentelemetry_proto::tonic::collector::trace::v1::{ ExportTraceServiceRequest, ExportTraceServiceResponse, trace_service_server::{TraceService, TraceServiceServer}, @@ -11,7 +17,9 @@ use std::sync::{Arc, Mutex}; use tokio::net::TcpListener; use tonic::{Request, Response, Status, transport::Server}; -use crate::test_utils::{DeterministicIdGenerator, make_trace_request_deterministic}; +use crate::test_utils::{ + DeterministicIdGenerator, make_log_request_deterministic, make_trace_request_deterministic, +}; #[path = "../src/test_utils.rs"] mod test_utils; @@ -28,6 +36,18 @@ impl MockTraceService { } } +/// Mock logs service that captures requests for testing +#[derive(Clone)] +struct MockLogsService { + captured: Arc>>, +} + +impl MockLogsService { + fn new(captured: Arc>>) -> Self { + Self { captured } + } +} + #[tonic::async_trait] impl TraceService for MockTraceService { async fn export( @@ -48,34 +68,89 @@ impl TraceService for MockTraceService { } } +#[tonic::async_trait] +impl LogsService for MockLogsService { + async fn export( + &self, + request: Request, + ) -> Result, Status> { + let logs_request = request.into_inner(); + + // Capture the request for testing + { + let mut captured = self.captured.lock().unwrap(); + captured.push(logs_request); + } + + Ok(Response::new(ExportLogsServiceResponse { + partial_success: None, + })) + } +} + /// Start a mock gRPC server and return its address -async fn start_mock_grpc_server() -> (String, Arc>>) { - let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); - let addr = listener.local_addr().unwrap(); - let addr_str = format!("http://{}", addr); - - let captured = Arc::new(Mutex::new(Vec::new())); - let captured_clone = captured.clone(); - - let trace_service = MockTraceService::new(captured_clone); - - tokio::spawn(async move { - Server::builder() - .add_service(TraceServiceServer::new(trace_service)) - .serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(listener)) - .await - .expect("gRPC server failed to start"); - }); +async fn start_mock_grpc_server() -> ( + oneshot::Sender<()>, + String, + Arc>>, + Arc>>, +) { + let (ready_tx, ready_rx) = oneshot::channel(); - let channel = tonic::transport::Channel::from_shared(addr_str.clone()) - .unwrap() - .connect() - .await - .expect("Failed to connect to mock gRPC server"); + // run the server in a separate thread with context suppression + std::thread::spawn(move || { + let _guard = Context::enter_telemetry_suppressed_scope(); + let (shutdown_tx, shutdown_rx) = oneshot::channel(); + + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("Failed to create tokio runtime"); + + rt.block_on(async move { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + let addr_str = format!("http://{}", addr); - tonic::client::Grpc::new(channel).ready().await.unwrap(); + let captured_traces = Arc::new(Mutex::new(Vec::new())); + let captured_traces_clone = captured_traces.clone(); - (addr_str, captured) + let captured_logs = Arc::new(Mutex::new(Vec::new())); + let captured_logs_clone = captured_logs.clone(); + + let trace_service = MockTraceService::new(captured_traces_clone); + let logs_service = MockLogsService::new(captured_logs_clone); + + tokio::spawn(async move { + Server::builder() + .add_service(TraceServiceServer::new(trace_service)) + .add_service(LogsServiceServer::new(logs_service)) + .serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(listener)) + .await + .expect("gRPC server failed to start"); + }); + + let channel = tonic::transport::Channel::from_shared(addr_str.clone()) + .unwrap() + .connect() + .await + .expect("Failed to connect to mock gRPC server"); + + tonic::client::Grpc::new(channel).ready().await.unwrap(); + + ready_tx + .send((shutdown_tx, addr_str, captured_traces, captured_logs)) + .unwrap(); + + tokio::select! { + _ = shutdown_rx => { + // Shutdown signal received + } + } + }); + }); + + ready_rx.await.unwrap() } /// Test gRPC protobuf export infrastructure @@ -83,7 +158,7 @@ async fn start_mock_grpc_server() -> (String, Arc