From 9e8f98a9fbdaba3ea967bd83a47a079aa8b465a7 Mon Sep 17 00:00:00 2001 From: Anton Panasenko Date: Wed, 10 Dec 2025 14:55:35 -0800 Subject: [PATCH] [codex] add otel tracing --- codex-rs/Cargo.lock | 31 +- codex-rs/Cargo.toml | 1 + codex-rs/app-server/Cargo.toml | 1 - codex-rs/app-server/src/lib.rs | 13 +- codex-rs/codex-api/src/endpoint/responses.rs | 2 + codex-rs/codex-client/Cargo.toml | 6 + codex-rs/codex-client/src/default_client.rs | 84 +++- codex-rs/core/Cargo.toml | 3 +- codex-rs/core/src/client.rs | 41 +- codex-rs/core/src/codex.rs | 125 +++-- codex-rs/core/src/config/mod.rs | 2 + codex-rs/core/src/config/types.rs | 7 +- codex-rs/core/src/default_client.rs | 5 +- codex-rs/core/src/mcp_connection_manager.rs | 2 + codex-rs/core/src/otel_init.rs | 6 +- codex-rs/core/src/state/service.rs | 4 +- codex-rs/core/src/stream_events_utils.rs | 20 +- codex-rs/core/src/tasks/regular.rs | 13 +- codex-rs/core/src/tools/context.rs | 2 +- codex-rs/core/src/tools/orchestrator.rs | 6 +- codex-rs/core/src/tools/parallel.rs | 15 + codex-rs/core/src/tools/registry.rs | 2 +- codex-rs/core/src/tools/router.rs | 10 +- .../core/tests/chat_completions_payload.rs | 10 +- codex-rs/core/tests/chat_completions_sse.rs | 10 +- codex-rs/core/tests/responses_headers.rs | 32 +- codex-rs/core/tests/suite/client.rs | 9 +- codex-rs/core/tests/suite/otel.rs | 159 ++++++- codex-rs/exec/Cargo.toml | 1 - codex-rs/exec/src/lib.rs | 20 +- codex-rs/otel/Cargo.toml | 23 +- codex-rs/otel/src/config.rs | 1 + codex-rs/otel/src/lib.rs | 24 +- ...{otel_event_manager.rs => otel_manager.rs} | 106 ++++- codex-rs/otel/src/otel_provider.rs | 433 ++++++++++++++---- codex-rs/tui/Cargo.toml | 1 - codex-rs/tui/src/lib.rs | 27 +- codex-rs/tui2/Cargo.toml | 1 - codex-rs/tui2/src/lib.rs | 25 +- 39 files changed, 963 insertions(+), 320 deletions(-) rename codex-rs/otel/src/{otel_event_manager.rs => otel_manager.rs} (82%) diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index a949ce94e9e..43faba44254 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -840,7 +840,6 @@ dependencies = [ "codex-utils-json-to-toml", "core_test_support", "mcp-types", - "opentelemetry-appender-tracing", "os_info", "pretty_assertions", "serde", @@ -1015,6 +1014,8 @@ dependencies = [ "eventsource-stream", "futures", "http", + "opentelemetry", + "opentelemetry_sdk", "rand 0.9.2", "reqwest", "serde", @@ -1022,6 +1023,8 @@ dependencies = [ "thiserror 2.0.17", "tokio", "tracing", + "tracing-opentelemetry", + "tracing-subscriber", ] [[package]] @@ -1160,6 +1163,7 @@ dependencies = [ "toml 0.9.5", "toml_edit", "tracing", + "tracing-subscriber", "tracing-test", "tree-sitter", "tree-sitter-bash", @@ -1186,7 +1190,6 @@ dependencies = [ "core_test_support", "libc", "mcp-types", - "opentelemetry-appender-tracing", "owo-colors", "predicates", "pretty_assertions", @@ -1415,12 +1418,14 @@ name = "codex-otel" version = "0.0.0" dependencies = [ "chrono", + "codex-api", "codex-app-server-protocol", "codex-protocol", "codex-utils-absolute-path", "eventsource-stream", "http", "opentelemetry", + "opentelemetry-appender-tracing", "opentelemetry-otlp", "opentelemetry-semantic-conventions", "opentelemetry_sdk", @@ -1431,6 +1436,8 @@ dependencies = [ "tokio", "tonic", "tracing", + "tracing-opentelemetry", + "tracing-subscriber", ] [[package]] @@ -1560,7 +1567,6 @@ dependencies = [ "lazy_static", "libc", "mcp-types", - "opentelemetry-appender-tracing", "pathdiff", "pretty_assertions", "pulldown-cmark", @@ -1630,7 +1636,6 @@ dependencies = [ "lazy_static", "libc", "mcp-types", - "opentelemetry-appender-tracing", "pathdiff", "pretty_assertions", "pulldown-cmark", @@ -6897,6 +6902,24 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-opentelemetry" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddcf5959f39507d0d04d6413119c04f33b623f4f951ebcbdddddfad2d0623a9c" +dependencies = [ + "js-sys", + "once_cell", + "opentelemetry", + "opentelemetry_sdk", + "smallvec", + "tracing", + "tracing-core", + "tracing-log", + "tracing-subscriber", + "web-time", +] + [[package]] name = "tracing-subscriber" version = "0.3.20" diff --git a/codex-rs/Cargo.toml b/codex-rs/Cargo.toml index cdf55434fe3..9b8c2b192d9 100644 --- a/codex-rs/Cargo.toml +++ b/codex-rs/Cargo.toml @@ -162,6 +162,7 @@ opentelemetry-appender-tracing = "0.30.0" opentelemetry-otlp = "0.30.0" opentelemetry-semantic-conventions = "0.30.0" opentelemetry_sdk = "0.30.0" +tracing-opentelemetry = "0.31.0" os_info = "3.12.0" owo-colors = "4.2.0" path-absolutize = "3.1.1" diff --git a/codex-rs/app-server/Cargo.toml b/codex-rs/app-server/Cargo.toml index cb3f51bb821..3695ffa1f10 100644 --- a/codex-rs/app-server/Cargo.toml +++ b/codex-rs/app-server/Cargo.toml @@ -43,7 +43,6 @@ tokio = { workspace = true, features = [ ] } tracing = { workspace = true, features = ["log"] } tracing-subscriber = { workspace = true, features = ["env-filter", "fmt"] } -opentelemetry-appender-tracing = { workspace = true } uuid = { workspace = true, features = ["serde", "v7"] } [dev-dependencies] diff --git a/codex-rs/app-server/src/lib.rs b/codex-rs/app-server/src/lib.rs index 2aee5bc93e2..66d137fba47 100644 --- a/codex-rs/app-server/src/lib.rs +++ b/codex-rs/app-server/src/lib.rs @@ -3,7 +3,6 @@ use codex_common::CliConfigOverrides; use codex_core::config::Config; use codex_core::config::ConfigOverrides; -use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge; use std::io::ErrorKind; use std::io::Result as IoResult; use std::path::PathBuf; @@ -103,6 +102,7 @@ pub async fn run_main( // control the log level with `RUST_LOG`. let stderr_fmt = tracing_subscriber::fmt::layer() .with_writer(std::io::stderr) + .with_span_events(tracing_subscriber::fmt::format::FmtSpan::FULL) .with_filter(EnvFilter::from_default_env()); let feedback_layer = tracing_subscriber::fmt::layer() @@ -111,14 +111,15 @@ pub async fn run_main( .with_target(false) .with_filter(Targets::new().with_default(Level::TRACE)); + let otel_logger_layer = otel.as_ref().and_then(|o| o.logger_layer()); + + let otel_tracing_layer = otel.as_ref().and_then(|o| o.tracing_layer()); + let _ = tracing_subscriber::registry() .with(stderr_fmt) .with(feedback_layer) - .with(otel.as_ref().map(|provider| { - OpenTelemetryTracingBridge::new(&provider.logger).with_filter( - tracing_subscriber::filter::filter_fn(codex_core::otel_init::codex_export_filter), - ) - })) + .with(otel_logger_layer) + .with(otel_tracing_layer) .try_init(); // Task: process incoming messages. diff --git a/codex-rs/codex-api/src/endpoint/responses.rs b/codex-rs/codex-api/src/endpoint/responses.rs index d3a314d7633..a300b5a70d3 100644 --- a/codex-rs/codex-api/src/endpoint/responses.rs +++ b/codex-rs/codex-api/src/endpoint/responses.rs @@ -17,6 +17,7 @@ use codex_protocol::protocol::SessionSource; use http::HeaderMap; use serde_json::Value; use std::sync::Arc; +use tracing::instrument; pub struct ResponsesClient { streaming: StreamingClient, @@ -57,6 +58,7 @@ impl ResponsesClient { self.stream(request.body, request.headers).await } + #[instrument(skip_all, err)] pub async fn stream_prompt( &self, model: &str, diff --git a/codex-rs/codex-client/Cargo.toml b/codex-rs/codex-client/Cargo.toml index d1ff23ef36a..2eeb4569372 100644 --- a/codex-rs/codex-client/Cargo.toml +++ b/codex-rs/codex-client/Cargo.toml @@ -10,6 +10,7 @@ bytes = { workspace = true } eventsource-stream = { workspace = true } futures = { workspace = true } http = { workspace = true } +opentelemetry = { workspace = true } rand = { workspace = true } reqwest = { workspace = true, features = ["json", "stream"] } serde = { workspace = true, features = ["derive"] } @@ -17,6 +18,11 @@ serde_json = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true, features = ["macros", "rt", "time", "sync"] } tracing = { workspace = true } +tracing-opentelemetry = { workspace = true } [lints] workspace = true + +[dev-dependencies] +opentelemetry_sdk = { workspace = true } +tracing-subscriber = { workspace = true } diff --git a/codex-rs/codex-client/src/default_client.rs b/codex-rs/codex-client/src/default_client.rs index 8a25846385a..e79f873cbbd 100644 --- a/codex-rs/codex-client/src/default_client.rs +++ b/codex-rs/codex-client/src/default_client.rs @@ -1,4 +1,6 @@ use http::Error as HttpError; +use opentelemetry::global; +use opentelemetry::propagation::Injector; use reqwest::IntoUrl; use reqwest::Method; use reqwest::Response; @@ -9,6 +11,8 @@ use serde::Serialize; use std::collections::HashMap; use std::fmt::Display; use std::time::Duration; +use tracing::Span; +use tracing_opentelemetry::OpenTelemetrySpanExt; #[derive(Clone, Debug)] pub struct CodexHttpClient { @@ -101,7 +105,9 @@ impl CodexRequestBuilder { } pub async fn send(self) -> Result { - match self.builder.send().await { + let headers = trace_headers(); + + match self.builder.headers(headers).send().await { Ok(response) => { let request_ids = Self::extract_request_ids(&response); tracing::debug!( @@ -141,3 +147,79 @@ impl CodexRequestBuilder { .collect() } } + +struct HeaderMapInjector<'a>(&'a mut HeaderMap); + +impl<'a> Injector for HeaderMapInjector<'a> { + fn set(&mut self, key: &str, value: String) { + if let (Ok(name), Ok(val)) = ( + HeaderName::from_bytes(key.as_bytes()), + HeaderValue::from_str(&value), + ) { + self.0.insert(name, val); + } + } +} + +fn trace_headers() -> HeaderMap { + let mut headers = HeaderMap::new(); + global::get_text_map_propagator(|prop| { + prop.inject_context( + &Span::current().context(), + &mut HeaderMapInjector(&mut headers), + ); + }); + headers +} + +#[cfg(test)] +mod tests { + use super::*; + use opentelemetry::propagation::Extractor; + use opentelemetry::propagation::TextMapPropagator; + use opentelemetry::trace::TraceContextExt; + use opentelemetry::trace::TracerProvider; + use opentelemetry_sdk::propagation::TraceContextPropagator; + use opentelemetry_sdk::trace::SdkTracerProvider; + use tracing::info_span; + use tracing_subscriber::layer::SubscriberExt; + use tracing_subscriber::util::SubscriberInitExt; + + #[test] + fn inject_trace_headers_uses_current_span_context() { + global::set_text_map_propagator(TraceContextPropagator::new()); + + let provider = SdkTracerProvider::builder().build(); + let tracer = provider.tracer("test-tracer"); + let subscriber = + tracing_subscriber::registry().with(tracing_opentelemetry::layer().with_tracer(tracer)); + let _guard = subscriber.set_default(); + + let span = info_span!("client_request"); + let _entered = span.enter(); + let span_context = span.context().span().span_context().clone(); + + let headers = trace_headers(); + + let extractor = HeaderMapExtractor(&headers); + let extracted = TraceContextPropagator::new().extract(&extractor); + let extracted_span = extracted.span(); + let extracted_context = extracted_span.span_context(); + + assert!(extracted_context.is_valid()); + assert_eq!(extracted_context.trace_id(), span_context.trace_id()); + assert_eq!(extracted_context.span_id(), span_context.span_id()); + } + + struct HeaderMapExtractor<'a>(&'a HeaderMap); + + impl<'a> Extractor for HeaderMapExtractor<'a> { + fn get(&self, key: &str) -> Option<&str> { + self.0.get(key).and_then(|value| value.to_str().ok()) + } + + fn keys(&self) -> Vec<&str> { + self.0.keys().map(HeaderName::as_str).collect() + } + } +} diff --git a/codex-rs/core/Cargo.toml b/codex-rs/core/Cargo.toml index 4c231e4dda5..0ba8e1fd3b2 100644 --- a/codex-rs/core/Cargo.toml +++ b/codex-rs/core/Cargo.toml @@ -28,7 +28,7 @@ codex-execpolicy = { workspace = true } codex-file-search = { workspace = true } codex-git = { workspace = true } codex-keyring-store = { workspace = true } -codex-otel = { workspace = true, features = ["otel"] } +codex-otel = { workspace = true } codex-protocol = { workspace = true } codex-rmcp-client = { workspace = true } codex-utils-absolute-path = { workspace = true } @@ -132,6 +132,7 @@ pretty_assertions = { workspace = true } serial_test = { workspace = true } tempfile = { workspace = true } tokio-test = { workspace = true } +tracing-subscriber = { workspace = true } tracing-test = { workspace = true, features = ["no-env-filter"] } walkdir = { workspace = true } wiremock = { workspace = true } diff --git a/codex-rs/core/src/client.rs b/codex-rs/core/src/client.rs index 9659b109093..8ab56019c60 100644 --- a/codex-rs/core/src/client.rs +++ b/codex-rs/core/src/client.rs @@ -18,7 +18,7 @@ use codex_api::common::Reasoning; use codex_api::create_text_param_for_request; use codex_api::error::ApiError; use codex_app_server_protocol::AuthMode; -use codex_otel::otel_event_manager::OtelEventManager; +use codex_otel::otel_manager::OtelManager; use codex_protocol::ConversationId; use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig; use codex_protocol::models::ResponseItem; @@ -57,7 +57,7 @@ pub struct ModelClient { config: Arc, auth_manager: Option>, model_family: ModelFamily, - otel_event_manager: OtelEventManager, + otel_manager: OtelManager, provider: ModelProviderInfo, conversation_id: ConversationId, effort: Option, @@ -71,7 +71,7 @@ impl ModelClient { config: Arc, auth_manager: Option>, model_family: ModelFamily, - otel_event_manager: OtelEventManager, + otel_manager: OtelManager, provider: ModelProviderInfo, effort: Option, summary: ReasoningSummaryConfig, @@ -82,7 +82,7 @@ impl ModelClient { config, auth_manager, model_family, - otel_event_manager, + otel_manager, provider, conversation_id, effort, @@ -121,12 +121,12 @@ impl ModelClient { if self.config.show_raw_agent_reasoning { Ok(map_response_stream( api_stream.streaming_mode(), - self.otel_event_manager.clone(), + self.otel_manager.clone(), )) } else { Ok(map_response_stream( api_stream.aggregate(), - self.otel_event_manager.clone(), + self.otel_manager.clone(), )) } } @@ -195,7 +195,7 @@ impl ModelClient { warn!(path, "Streaming from fixture"); let stream = codex_api::stream_from_fixture(path, self.provider.stream_idle_timeout()) .map_err(map_api_error)?; - return Ok(map_response_stream(stream, self.otel_event_manager.clone())); + return Ok(map_response_stream(stream, self.otel_manager.clone())); } let auth_manager = self.auth_manager.clone(); @@ -269,7 +269,7 @@ impl ModelClient { match stream_result { Ok(stream) => { - return Ok(map_response_stream(stream, self.otel_event_manager.clone())); + return Ok(map_response_stream(stream, self.otel_manager.clone())); } Err(ApiError::Transport(TransportError::Http { status, .. })) if status == StatusCode::UNAUTHORIZED => @@ -286,8 +286,8 @@ impl ModelClient { self.provider.clone() } - pub fn get_otel_event_manager(&self) -> OtelEventManager { - self.otel_event_manager.clone() + pub fn get_otel_manager(&self) -> OtelManager { + self.otel_manager.clone() } pub fn get_session_source(&self) -> SessionSource { @@ -371,7 +371,7 @@ impl ModelClient { impl ModelClient { /// Builds request and SSE telemetry for streaming API calls (Chat/Responses). fn build_streaming_telemetry(&self) -> (Arc, Arc) { - let telemetry = Arc::new(ApiTelemetry::new(self.otel_event_manager.clone())); + let telemetry = Arc::new(ApiTelemetry::new(self.otel_manager.clone())); let request_telemetry: Arc = telemetry.clone(); let sse_telemetry: Arc = telemetry; (request_telemetry, sse_telemetry) @@ -379,7 +379,7 @@ impl ModelClient { /// Builds request telemetry for unary API calls (e.g., Compact endpoint). fn build_request_telemetry(&self) -> Arc { - let telemetry = Arc::new(ApiTelemetry::new(self.otel_event_manager.clone())); + let telemetry = Arc::new(ApiTelemetry::new(self.otel_manager.clone())); let request_telemetry: Arc = telemetry; request_telemetry } @@ -396,7 +396,7 @@ fn build_api_prompt(prompt: &Prompt, instructions: String, tools_json: Vec(api_stream: S, otel_event_manager: OtelEventManager) -> ResponseStream +fn map_response_stream(api_stream: S, otel_manager: OtelManager) -> ResponseStream where S: futures::Stream> + Unpin @@ -404,7 +404,6 @@ where + 'static, { let (tx_event, rx_event) = mpsc::channel::>(1600); - let manager = otel_event_manager; tokio::spawn(async move { let mut logged_error = false; @@ -416,7 +415,7 @@ where token_usage, }) => { if let Some(usage) = &token_usage { - manager.sse_event_completed( + otel_manager.sse_event_completed( usage.input_tokens, usage.output_tokens, Some(usage.cached_input_tokens), @@ -443,7 +442,7 @@ where Err(err) => { let mapped = map_api_error(err); if !logged_error { - manager.see_event_completed_failed(&mapped); + otel_manager.see_event_completed_failed(&mapped); logged_error = true; } if tx_event.send(Err(mapped)).await.is_err() { @@ -497,12 +496,12 @@ fn map_unauthorized_status(status: StatusCode) -> CodexErr { } struct ApiTelemetry { - otel_event_manager: OtelEventManager, + otel_manager: OtelManager, } impl ApiTelemetry { - fn new(otel_event_manager: OtelEventManager) -> Self { - Self { otel_event_manager } + fn new(otel_manager: OtelManager) -> Self { + Self { otel_manager } } } @@ -515,7 +514,7 @@ impl RequestTelemetry for ApiTelemetry { duration: Duration, ) { let error_message = error.map(std::string::ToString::to_string); - self.otel_event_manager.record_api_request( + self.otel_manager.record_api_request( attempt, status.map(|s| s.as_u16()), error_message.as_deref(), @@ -533,6 +532,6 @@ impl SseTelemetry for ApiTelemetry { >, duration: Duration, ) { - self.otel_event_manager.log_sse_event(result, duration); + self.otel_manager.log_sse_event(result, duration); } } diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index f8f64ac9e9d..b74338243a0 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -61,9 +61,13 @@ use tokio::sync::Mutex; use tokio::sync::RwLock; use tokio::sync::oneshot; use tokio_util::sync::CancellationToken; +use tracing::Instrument; use tracing::debug; use tracing::error; +use tracing::field; use tracing::info; +use tracing::info_span; +use tracing::instrument; use tracing::warn; use crate::ModelProviderInfo; @@ -141,7 +145,7 @@ use crate::user_notification::UserNotification; use crate::util::backoff; use codex_async_utils::OrCancelExt; use codex_execpolicy::Policy as ExecPolicy; -use codex_otel::otel_event_manager::OtelEventManager; +use codex_otel::otel_manager::OtelManager; use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig; use codex_protocol::models::ContentItem; use codex_protocol::models::ResponseInputItem; @@ -479,7 +483,7 @@ impl Session { #[allow(clippy::too_many_arguments)] fn make_turn_context( auth_manager: Option>, - otel_event_manager: &OtelEventManager, + otel_manager: &OtelManager, provider: ModelProviderInfo, session_configuration: &SessionConfiguration, per_turn_config: Config, @@ -487,7 +491,7 @@ impl Session { conversation_id: ConversationId, sub_id: String, ) -> TurnContext { - let otel_event_manager = otel_event_manager.clone().with_model( + let otel_manager = otel_manager.clone().with_model( session_configuration.model.as_str(), model_family.get_model_slug(), ); @@ -497,7 +501,7 @@ impl Session { per_turn_config.clone(), auth_manager, model_family.clone(), - otel_event_manager, + otel_manager, provider, session_configuration.model_reasoning_effort, session_configuration.model_reasoning_summary, @@ -616,7 +620,7 @@ impl Session { maybe_push_chat_wire_api_deprecation(&config, &mut post_session_configured_events); // todo(aibrahim): why are we passing model here while it can change? - let otel_event_manager = OtelEventManager::new( + let otel_manager = OtelManager::new( conversation_id, session_configuration.model.as_str(), session_configuration.model.as_str(), @@ -625,9 +629,10 @@ impl Session { auth_manager.auth().map(|a| a.mode), config.otel.log_user_prompt, terminal::user_agent(), + session_configuration.session_source.clone(), ); - otel_event_manager.conversation_starts( + otel_manager.conversation_starts( config.model_provider.name.as_str(), config.model_reasoning_effort, config.model_reasoning_summary, @@ -658,7 +663,7 @@ impl Session { user_shell: Arc::new(default_shell), show_raw_agent_reasoning: config.show_raw_agent_reasoning, auth_manager: Arc::clone(&auth_manager), - otel_event_manager, + otel_manager, models_manager: Arc::clone(&models_manager), tool_approvals: Mutex::new(ApprovalStore::default()), skills: skills.clone(), @@ -787,15 +792,15 @@ impl Session { "resuming session with different model: previous={prev}, current={curr}" ); self.send_event( - &turn_context, - EventMsg::Warning(WarningEvent { - message: format!( - "This session was recorded with model `{prev}` but is resuming with `{curr}`. \ + &turn_context, + EventMsg::Warning(WarningEvent { + message: format!( + "This session was recorded with model `{prev}` but is resuming with `{curr}`. \ Consider switching back to `{prev}` as it may affect Codex performance." - ), - }), - ) - .await; + ), + }), + ) + .await; } } @@ -868,7 +873,7 @@ impl Session { .await; let mut turn_context: TurnContext = Self::make_turn_context( Some(Arc::clone(&self.services.auth_manager)), - &self.services.otel_event_manager, + &self.services.otel_manager, session_configuration.provider.clone(), &session_configuration, per_turn_config, @@ -1694,7 +1699,7 @@ mod handlers { let current_context = sess.new_turn_with_sub_id(sub_id, updates).await; current_context .client - .get_otel_event_manager() + .get_otel_manager() .user_prompt(&items); // Attempt to inject input into current task @@ -2003,20 +2008,17 @@ async fn spawn_review_thread( per_turn_config.model_reasoning_summary = ReasoningSummaryConfig::Detailed; per_turn_config.features = review_features.clone(); - let otel_event_manager = parent_turn_context - .client - .get_otel_event_manager() - .with_model( - config.review_model.as_str(), - review_model_family.slug.as_str(), - ); + let otel_manager = parent_turn_context.client.get_otel_manager().with_model( + config.review_model.as_str(), + review_model_family.slug.as_str(), + ); let per_turn_config = Arc::new(per_turn_config); let client = ModelClient::new( per_turn_config.clone(), auth_manager, model_family.clone(), - otel_event_manager, + otel_manager, provider, per_turn_config.model_reasoning_effort, per_turn_config.model_reasoning_summary, @@ -2238,6 +2240,14 @@ pub(crate) async fn run_task( last_agent_message } +#[instrument( + skip_all, + fields( + turn_id = %turn_context.sub_id, + model = %turn_context.client.get_model(), + cwd = %turn_context.cwd.display() + ) +)] async fn run_turn( sess: Arc, turn_context: Arc, @@ -2370,6 +2380,13 @@ async fn drain_in_flight( } #[allow(clippy::too_many_arguments)] +#[instrument( + skip_all, + fields( + turn_id = %turn_context.sub_id, + model = %turn_context.client.get_model() + ) +)] async fn try_run_turn( router: Arc, sess: Arc, @@ -2392,6 +2409,7 @@ async fn try_run_turn( .client .clone() .stream(prompt) + .instrument(info_span!("stream_request")) .or_cancel(&cancellation_token) .await??; @@ -2406,8 +2424,22 @@ async fn try_run_turn( let mut needs_follow_up = false; let mut last_agent_message: Option = None; let mut active_item: Option = None; + let receiving_span = info_span!("receiving_stream"); let outcome: CodexResult = loop { - let event = match stream.next().or_cancel(&cancellation_token).await { + let handle_responses = info_span!( + parent: &receiving_span, + "handle_responses", + otel.name = field::Empty, + tool_name = field::Empty, + from = field::Empty, + ); + + let event = match stream + .next() + .instrument(info_span!(parent: &handle_responses, "receiving")) + .or_cancel(&cancellation_token) + .await + { Ok(event) => event, Err(codex_async_utils::CancelErr::Cancelled) => break Err(CodexErr::TurnAborted), }; @@ -2422,6 +2454,10 @@ async fn try_run_turn( } }; + sess.services + .otel_manager + .record_responses(&handle_responses, &event); + match event { ResponseEvent::Created => {} ResponseEvent::OutputItemDone(item) => { @@ -2433,8 +2469,9 @@ async fn try_run_turn( cancellation_token: cancellation_token.child_token(), }; - let output_result = - handle_output_item_done(&mut ctx, item, previously_active_item).await?; + let output_result = handle_output_item_done(&mut ctx, item, previously_active_item) + .instrument(handle_responses) + .await?; if let Some(tool_future) = output_result.tool_future { in_flight.push_back(tool_future); } @@ -2913,12 +2950,13 @@ mod tests { }) } - fn otel_event_manager( + fn otel_manager( conversation_id: ConversationId, config: &Config, model_family: &ModelFamily, - ) -> OtelEventManager { - OtelEventManager::new( + session_source: SessionSource, + ) -> OtelManager { + OtelManager::new( conversation_id, ModelsManager::get_model_offline(config.model.as_deref()).as_str(), model_family.slug.as_str(), @@ -2927,6 +2965,7 @@ mod tests { Some(AuthMode::ChatGPT), false, "test".to_string(), + session_source, ) } @@ -2966,8 +3005,12 @@ mod tests { session_configuration.model.as_str(), &per_turn_config, ); - let otel_event_manager = - otel_event_manager(conversation_id, config.as_ref(), &model_family); + let otel_manager = otel_manager( + conversation_id, + config.as_ref(), + &model_family, + session_configuration.session_source.clone(), + ); let state = SessionState::new(session_configuration.clone()); @@ -2980,7 +3023,7 @@ mod tests { user_shell: Arc::new(default_user_shell()), show_raw_agent_reasoning: config.show_raw_agent_reasoning, auth_manager: auth_manager.clone(), - otel_event_manager: otel_event_manager.clone(), + otel_manager: otel_manager.clone(), models_manager, tool_approvals: Mutex::new(ApprovalStore::default()), skills: None, @@ -2988,7 +3031,7 @@ mod tests { let turn_context = Session::make_turn_context( Some(Arc::clone(&auth_manager)), - &otel_event_manager, + &otel_manager, session_configuration.provider.clone(), &session_configuration, per_turn_config, @@ -3052,8 +3095,12 @@ mod tests { session_configuration.model.as_str(), &per_turn_config, ); - let otel_event_manager = - otel_event_manager(conversation_id, config.as_ref(), &model_family); + let otel_manager = otel_manager( + conversation_id, + config.as_ref(), + &model_family, + session_configuration.session_source.clone(), + ); let state = SessionState::new(session_configuration.clone()); @@ -3066,7 +3113,7 @@ mod tests { user_shell: Arc::new(default_user_shell()), show_raw_agent_reasoning: config.show_raw_agent_reasoning, auth_manager: Arc::clone(&auth_manager), - otel_event_manager: otel_event_manager.clone(), + otel_manager: otel_manager.clone(), models_manager, tool_approvals: Mutex::new(ApprovalStore::default()), skills: None, @@ -3074,7 +3121,7 @@ mod tests { let turn_context = Arc::new(Session::make_turn_context( Some(Arc::clone(&auth_manager)), - &otel_event_manager, + &otel_manager, session_configuration.provider.clone(), &session_configuration, per_turn_config, diff --git a/codex-rs/core/src/config/mod.rs b/codex-rs/core/src/config/mod.rs index df46a5fdd55..eb1839b1ce5 100644 --- a/codex-rs/core/src/config/mod.rs +++ b/codex-rs/core/src/config/mod.rs @@ -1200,10 +1200,12 @@ impl Config { .environment .unwrap_or(DEFAULT_OTEL_ENVIRONMENT.to_string()); let exporter = t.exporter.unwrap_or(OtelExporterKind::None); + let trace_exporter = t.trace_exporter.unwrap_or_else(|| exporter.clone()); OtelConfig { log_user_prompt, environment, exporter, + trace_exporter, } }, }; diff --git a/codex-rs/core/src/config/types.rs b/codex-rs/core/src/config/types.rs index b30dfb0a383..9243e9878aa 100644 --- a/codex-rs/core/src/config/types.rs +++ b/codex-rs/core/src/config/types.rs @@ -323,8 +323,11 @@ pub struct OtelConfigToml { /// Mark traces with environment (dev, staging, prod, test). Defaults to dev. pub environment: Option, - /// Exporter to use. Defaults to `otlp-file`. + /// Optional log exporter pub exporter: Option, + + /// Optional trace exporter + pub trace_exporter: Option, } /// Effective OTEL settings after defaults are applied. @@ -333,6 +336,7 @@ pub struct OtelConfig { pub log_user_prompt: bool, pub environment: String, pub exporter: OtelExporterKind, + pub trace_exporter: OtelExporterKind, } impl Default for OtelConfig { @@ -341,6 +345,7 @@ impl Default for OtelConfig { log_user_prompt: false, environment: DEFAULT_OTEL_ENVIRONMENT.to_owned(), exporter: OtelExporterKind::None, + trace_exporter: OtelExporterKind::None, } } } diff --git a/codex-rs/core/src/default_client.rs b/codex-rs/core/src/default_client.rs index 7ae2f8c35ac..2ea512b935e 100644 --- a/codex-rs/core/src/default_client.rs +++ b/codex-rs/core/src/default_client.rs @@ -1,12 +1,11 @@ use crate::spawn::CODEX_SANDBOX_ENV_VAR; +use codex_client::CodexHttpClient; +pub use codex_client::CodexRequestBuilder; use reqwest::header::HeaderValue; use std::sync::LazyLock; use std::sync::Mutex; use std::sync::OnceLock; -use codex_client::CodexHttpClient; -pub use codex_client::CodexRequestBuilder; - /// Set this to add a suffix to the User-Agent string. /// /// It is not ideal that we're using a global singleton for this. diff --git a/codex-rs/core/src/mcp_connection_manager.rs b/codex-rs/core/src/mcp_connection_manager.rs index 4eddb1cd80a..460d598a945 100644 --- a/codex-rs/core/src/mcp_connection_manager.rs +++ b/codex-rs/core/src/mcp_connection_manager.rs @@ -58,6 +58,7 @@ use tokio::sync::Mutex; use tokio::sync::oneshot; use tokio::task::JoinSet; use tokio_util::sync::CancellationToken; +use tracing::instrument; use tracing::warn; use crate::codex::INITIAL_SUBMIT_ID; @@ -397,6 +398,7 @@ impl McpConnectionManager { /// Returns a single map that contains all tools. Each key is the /// fully-qualified name for the tool. + #[instrument(skip_all)] pub async fn list_all_tools(&self) -> HashMap { let mut tools = HashMap::new(); for managed_client in self.clients.values() { diff --git a/codex-rs/core/src/otel_init.rs b/codex-rs/core/src/otel_init.rs index 5900c9b4a67..ece5a6bf500 100644 --- a/codex-rs/core/src/otel_init.rs +++ b/codex-rs/core/src/otel_init.rs @@ -16,7 +16,7 @@ pub fn build_provider( config: &Config, service_version: &str, ) -> Result, Box> { - let exporter = match &config.otel.exporter { + let to_otel_exporter = |kind: &Kind| match kind { Kind::None => OtelExporter::None, Kind::OtlpHttp { endpoint, @@ -61,12 +61,16 @@ pub fn build_provider( }, }; + let exporter = to_otel_exporter(&config.otel.exporter); + let trace_exporter = to_otel_exporter(&config.otel.trace_exporter); + OtelProvider::from(&OtelSettings { service_name: originator().value.to_owned(), service_version: service_version.to_string(), codex_home: config.codex_home.clone(), environment: config.otel.environment.to_string(), exporter, + trace_exporter, }) } diff --git a/codex-rs/core/src/state/service.rs b/codex-rs/core/src/state/service.rs index 0270f3411c8..dbc61502927 100644 --- a/codex-rs/core/src/state/service.rs +++ b/codex-rs/core/src/state/service.rs @@ -8,7 +8,7 @@ use crate::skills::SkillLoadOutcome; use crate::tools::sandboxing::ApprovalStore; use crate::unified_exec::UnifiedExecSessionManager; use crate::user_notification::UserNotifier; -use codex_otel::otel_event_manager::OtelEventManager; +use codex_otel::otel_manager::OtelManager; use tokio::sync::Mutex; use tokio::sync::RwLock; use tokio_util::sync::CancellationToken; @@ -23,7 +23,7 @@ pub(crate) struct SessionServices { pub(crate) show_raw_agent_reasoning: bool, pub(crate) auth_manager: Arc, pub(crate) models_manager: Arc, - pub(crate) otel_event_manager: OtelEventManager, + pub(crate) otel_manager: OtelManager, pub(crate) tool_approvals: Mutex, pub(crate) skills: Option, } diff --git a/codex-rs/core/src/stream_events_utils.rs b/codex-rs/core/src/stream_events_utils.rs index 1cb74bc250f..e1c9a652525 100644 --- a/codex-rs/core/src/stream_events_utils.rs +++ b/codex-rs/core/src/stream_events_utils.rs @@ -16,7 +16,9 @@ use codex_protocol::models::FunctionCallOutputPayload; use codex_protocol::models::ResponseInputItem; use codex_protocol::models::ResponseItem; use futures::Future; +use tracing::Instrument; use tracing::debug; +use tracing::instrument; /// Handle a completed output item from the model stream, recording it and /// queuing any tool execution futures. This records items immediately so @@ -38,6 +40,7 @@ pub(crate) struct HandleOutputCtx { pub cancellation_token: CancellationToken, } +#[instrument(skip_all)] pub(crate) async fn handle_output_item_done( ctx: &mut HandleOutputCtx, item: ResponseItem, @@ -58,12 +61,15 @@ pub(crate) async fn handle_output_item_done( let cancellation_token = ctx.cancellation_token.child_token(); let tool_runtime = ctx.tool_runtime.clone(); - let tool_future: InFlightFuture<'static> = Box::pin(async move { - let response_input = tool_runtime - .handle_tool_call(call, cancellation_token) - .await?; - Ok(response_input) - }); + let tool_future: InFlightFuture<'static> = Box::pin( + async move { + let response_input = tool_runtime + .handle_tool_call(call, cancellation_token) + .await?; + Ok(response_input) + } + .in_current_span(), + ); output.needs_follow_up = true; output.tool_future = Some(tool_future); @@ -94,7 +100,7 @@ pub(crate) async fn handle_output_item_done( let msg = "LocalShellCall without call_id or id"; ctx.turn_context .client - .get_otel_event_manager() + .get_otel_manager() .log_tool_failed("local_shell", msg); tracing::error!(msg); diff --git a/codex-rs/core/src/tasks/regular.rs b/codex-rs/core/src/tasks/regular.rs index 416dba3f7fa..2ee598f21c0 100644 --- a/codex-rs/core/src/tasks/regular.rs +++ b/codex-rs/core/src/tasks/regular.rs @@ -1,12 +1,13 @@ use std::sync::Arc; -use async_trait::async_trait; -use tokio_util::sync::CancellationToken; - use crate::codex::TurnContext; use crate::codex::run_task; use crate::state::TaskKind; +use async_trait::async_trait; use codex_protocol::user_input::UserInput; +use tokio_util::sync::CancellationToken; +use tracing::Instrument; +use tracing::info_span; use super::SessionTask; use super::SessionTaskContext; @@ -28,6 +29,10 @@ impl SessionTask for RegularTask { cancellation_token: CancellationToken, ) -> Option { let sess = session.clone_session(); - run_task(sess, ctx, input, cancellation_token).await + let run_task_span = + info_span!(parent: sess.services.otel_manager.current_span(), "run_task"); + run_task(sess, ctx, input, cancellation_token) + .instrument(run_task_span) + .await } } diff --git a/codex-rs/core/src/tools/context.rs b/codex-rs/core/src/tools/context.rs index 18b0d11eead..b226fb11e3a 100644 --- a/codex-rs/core/src/tools/context.rs +++ b/codex-rs/core/src/tools/context.rs @@ -26,7 +26,7 @@ pub struct ToolInvocation { pub payload: ToolPayload, } -#[derive(Clone)] +#[derive(Clone, Debug)] pub enum ToolPayload { Function { arguments: String, diff --git a/codex-rs/core/src/tools/orchestrator.rs b/codex-rs/core/src/tools/orchestrator.rs index 003c727610b..7853617238e 100644 --- a/codex-rs/core/src/tools/orchestrator.rs +++ b/codex-rs/core/src/tools/orchestrator.rs @@ -42,11 +42,11 @@ impl ToolOrchestrator { where T: ToolRuntime, { - let otel = turn_ctx.client.get_otel_event_manager(); + let otel = turn_ctx.client.get_otel_manager(); let otel_tn = &tool_ctx.tool_name; let otel_ci = &tool_ctx.call_id; - let otel_user = codex_otel::otel_event_manager::ToolDecisionSource::User; - let otel_cfg = codex_otel::otel_event_manager::ToolDecisionSource::Config; + let otel_user = codex_otel::otel_manager::ToolDecisionSource::User; + let otel_cfg = codex_otel::otel_manager::ToolDecisionSource::Config; // 1) Approval let mut already_approved = false; diff --git a/codex-rs/core/src/tools/parallel.rs b/codex-rs/core/src/tools/parallel.rs index 971ea934d8a..feef518e57f 100644 --- a/codex-rs/core/src/tools/parallel.rs +++ b/codex-rs/core/src/tools/parallel.rs @@ -5,6 +5,9 @@ use tokio::sync::RwLock; use tokio_util::either::Either; use tokio_util::sync::CancellationToken; use tokio_util::task::AbortOnDropHandle; +use tracing::Instrument; +use tracing::info_span; +use tracing::instrument; use crate::codex::Session; use crate::codex::TurnContext; @@ -42,6 +45,7 @@ impl ToolCallRuntime { } } + #[instrument(skip_all, fields(call = ?call))] pub(crate) fn handle_tool_call( &self, call: ToolCall, @@ -56,11 +60,20 @@ impl ToolCallRuntime { let lock = Arc::clone(&self.parallel_execution); let started = Instant::now(); + let dispatch_span = info_span!( + "dispatch_tool_call", + otel.name = call.tool_name.as_str(), + tool_name = call.tool_name.as_str(), + call_id = call.call_id.as_str(), + aborted = false, + ); + let handle: AbortOnDropHandle> = AbortOnDropHandle::new(tokio::spawn(async move { tokio::select! { _ = cancellation_token.cancelled() => { let secs = started.elapsed().as_secs_f32().max(0.1); + dispatch_span.record("aborted", true); Ok(Self::aborted_response(&call, secs)) }, res = async { @@ -72,6 +85,7 @@ impl ToolCallRuntime { router .dispatch_tool_call(session, turn, tracker, call.clone()) + .instrument(dispatch_span.clone()) .await } => res, } @@ -87,6 +101,7 @@ impl ToolCallRuntime { ))), } } + .in_current_span() } } diff --git a/codex-rs/core/src/tools/registry.rs b/codex-rs/core/src/tools/registry.rs index 9b33e84b76b..aa54421770b 100644 --- a/codex-rs/core/src/tools/registry.rs +++ b/codex-rs/core/src/tools/registry.rs @@ -64,7 +64,7 @@ impl ToolRegistry { ) -> Result { let tool_name = invocation.tool_name.clone(); let call_id_owned = invocation.call_id.clone(); - let otel = invocation.turn.client.get_otel_event_manager(); + let otel = invocation.turn.client.get_otel_manager(); let payload_for_response = invocation.payload.clone(); let log_payload = payload_for_response.log_payload(); diff --git a/codex-rs/core/src/tools/router.rs b/codex-rs/core/src/tools/router.rs index b6675bcd5d1..66e6026b3a7 100644 --- a/codex-rs/core/src/tools/router.rs +++ b/codex-rs/core/src/tools/router.rs @@ -1,6 +1,3 @@ -use std::collections::HashMap; -use std::sync::Arc; - use crate::client_common::tools::ToolSpec; use crate::codex::Session; use crate::codex::TurnContext; @@ -17,8 +14,11 @@ use codex_protocol::models::LocalShellAction; use codex_protocol::models::ResponseInputItem; use codex_protocol::models::ResponseItem; use codex_protocol::models::ShellToolCallParams; +use std::collections::HashMap; +use std::sync::Arc; +use tracing::instrument; -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct ToolCall { pub tool_name: String, pub call_id: String, @@ -55,6 +55,7 @@ impl ToolRouter { .any(|config| config.spec.name() == tool_name) } + #[instrument(skip_all, err)] pub async fn build_tool_call( session: &Session, item: ResponseItem, @@ -130,6 +131,7 @@ impl ToolRouter { } } + #[instrument(skip_all, err)] pub async fn dispatch_tool_call( &self, session: Arc, diff --git a/codex-rs/core/tests/chat_completions_payload.rs b/codex-rs/core/tests/chat_completions_payload.rs index 6bfad437833..3e53fa85cf9 100644 --- a/codex-rs/core/tests/chat_completions_payload.rs +++ b/codex-rs/core/tests/chat_completions_payload.rs @@ -13,9 +13,10 @@ use codex_core::Prompt; use codex_core::ResponseItem; use codex_core::WireApi; use codex_core::openai_models::models_manager::ModelsManager; -use codex_otel::otel_event_manager::OtelEventManager; +use codex_otel::otel_manager::OtelManager; use codex_protocol::ConversationId; use codex_protocol::models::ReasoningItemContent; +use codex_protocol::protocol::SessionSource; use core_test_support::load_default_config_for_test; use core_test_support::skip_if_no_network; use futures::StreamExt; @@ -75,7 +76,7 @@ async fn run_request(input: Vec) -> Value { let conversation_id = ConversationId::new(); let model = ModelsManager::get_model_offline(config.model.as_deref()); let model_family = ModelsManager::construct_model_family_offline(model.as_str(), &config); - let otel_event_manager = OtelEventManager::new( + let otel_manager = OtelManager::new( conversation_id, model.as_str(), model_family.slug.as_str(), @@ -84,18 +85,19 @@ async fn run_request(input: Vec) -> Value { Some(AuthMode::ApiKey), false, "test".to_string(), + SessionSource::Exec, ); let client = ModelClient::new( Arc::clone(&config), None, model_family, - otel_event_manager, + otel_manager, provider, effort, summary, conversation_id, - codex_protocol::protocol::SessionSource::Exec, + SessionSource::Exec, ); let mut prompt = Prompt::default(); diff --git a/codex-rs/core/tests/chat_completions_sse.rs b/codex-rs/core/tests/chat_completions_sse.rs index 9124d59d13c..969fa47b86c 100644 --- a/codex-rs/core/tests/chat_completions_sse.rs +++ b/codex-rs/core/tests/chat_completions_sse.rs @@ -12,9 +12,10 @@ use codex_core::ResponseEvent; use codex_core::ResponseItem; use codex_core::WireApi; use codex_core::openai_models::models_manager::ModelsManager; -use codex_otel::otel_event_manager::OtelEventManager; +use codex_otel::otel_manager::OtelManager; use codex_protocol::ConversationId; use codex_protocol::models::ReasoningItemContent; +use codex_protocol::protocol::SessionSource; use core_test_support::load_default_config_for_test; use core_test_support::skip_if_no_network; use futures::StreamExt; @@ -76,7 +77,7 @@ async fn run_stream_with_bytes(sse_body: &[u8]) -> Vec { let auth_mode = auth_manager.get_auth_mode(); let model = ModelsManager::get_model_offline(config.model.as_deref()); let model_family = ModelsManager::construct_model_family_offline(model.as_str(), &config); - let otel_event_manager = OtelEventManager::new( + let otel_manager = OtelManager::new( conversation_id, model.as_str(), model_family.slug.as_str(), @@ -85,18 +86,19 @@ async fn run_stream_with_bytes(sse_body: &[u8]) -> Vec { auth_mode, false, "test".to_string(), + SessionSource::Exec, ); let client = ModelClient::new( Arc::clone(&config), None, model_family, - otel_event_manager, + otel_manager, provider, effort, summary, conversation_id, - codex_protocol::protocol::SessionSource::Exec, + SessionSource::Exec, ); let mut prompt = Prompt::default(); diff --git a/codex-rs/core/tests/responses_headers.rs b/codex-rs/core/tests/responses_headers.rs index 02880502052..382c8875ce0 100644 --- a/codex-rs/core/tests/responses_headers.rs +++ b/codex-rs/core/tests/responses_headers.rs @@ -11,11 +11,12 @@ use codex_core::ResponseEvent; use codex_core::ResponseItem; use codex_core::WireApi; use codex_core::openai_models::models_manager::ModelsManager; -use codex_otel::otel_event_manager::OtelEventManager; +use codex_otel::otel_manager::OtelManager; use codex_protocol::ConversationId; use codex_protocol::config_types::ReasoningSummary; use codex_protocol::openai_models::ReasoningSummaryFormat; use codex_protocol::protocol::SessionSource; +use codex_protocol::protocol::SubAgentSource; use core_test_support::load_default_config_for_test; use core_test_support::responses; use futures::StreamExt; @@ -67,8 +68,9 @@ async fn responses_stream_includes_subagent_header_on_review() { let conversation_id = ConversationId::new(); let auth_mode = AuthMode::ChatGPT; + let session_source = SessionSource::SubAgent(SubAgentSource::Review); let model_family = ModelsManager::construct_model_family_offline(model.as_str(), &config); - let otel_event_manager = OtelEventManager::new( + let otel_manager = OtelManager::new( conversation_id, model.as_str(), model_family.slug.as_str(), @@ -77,18 +79,19 @@ async fn responses_stream_includes_subagent_header_on_review() { Some(auth_mode), false, "test".to_string(), + session_source.clone(), ); let client = ModelClient::new( Arc::clone(&config), None, model_family, - otel_event_manager, + otel_manager, provider, effort, summary, conversation_id, - SessionSource::SubAgent(codex_protocol::protocol::SubAgentSource::Review), + session_source, ); let mut prompt = Prompt::default(); @@ -159,9 +162,10 @@ async fn responses_stream_includes_subagent_header_on_other() { let conversation_id = ConversationId::new(); let auth_mode = AuthMode::ChatGPT; + let session_source = SessionSource::SubAgent(SubAgentSource::Other("my-task".to_string())); let model_family = ModelsManager::construct_model_family_offline(model.as_str(), &config); - let otel_event_manager = OtelEventManager::new( + let otel_manager = OtelManager::new( conversation_id, model.as_str(), model_family.slug.as_str(), @@ -170,20 +174,19 @@ async fn responses_stream_includes_subagent_header_on_other() { Some(auth_mode), false, "test".to_string(), + session_source.clone(), ); let client = ModelClient::new( Arc::clone(&config), None, model_family, - otel_event_manager, + otel_manager, provider, effort, summary, conversation_id, - SessionSource::SubAgent(codex_protocol::protocol::SubAgentSource::Other( - "my-task".to_string(), - )), + session_source, ); let mut prompt = Prompt::default(); @@ -253,8 +256,10 @@ async fn responses_respects_model_family_overrides_from_config() { let conversation_id = ConversationId::new(); let auth_mode = AuthManager::from_auth_for_testing(CodexAuth::from_api_key("Test API Key")).get_auth_mode(); + let session_source = + SessionSource::SubAgent(SubAgentSource::Other("override-check".to_string())); let model_family = ModelsManager::construct_model_family_offline(model.as_str(), &config); - let otel_event_manager = OtelEventManager::new( + let otel_manager = OtelManager::new( conversation_id, model.as_str(), model_family.slug.as_str(), @@ -263,20 +268,19 @@ async fn responses_respects_model_family_overrides_from_config() { auth_mode, false, "test".to_string(), + session_source.clone(), ); let client = ModelClient::new( Arc::clone(&config), None, model_family, - otel_event_manager, + otel_manager, provider, effort, summary, conversation_id, - SessionSource::SubAgent(codex_protocol::protocol::SubAgentSource::Other( - "override-check".to_string(), - )), + session_source, ); let mut prompt = Prompt::default(); diff --git a/codex-rs/core/tests/suite/client.rs b/codex-rs/core/tests/suite/client.rs index 4d47c9f214f..cdde1616f28 100644 --- a/codex-rs/core/tests/suite/client.rs +++ b/codex-rs/core/tests/suite/client.rs @@ -20,7 +20,7 @@ use codex_core::openai_models::models_manager::ModelsManager; use codex_core::protocol::EventMsg; use codex_core::protocol::Op; use codex_core::protocol::SessionSource; -use codex_otel::otel_event_manager::OtelEventManager; +use codex_otel::otel_manager::OtelManager; use codex_protocol::ConversationId; use codex_protocol::config_types::ReasoningSummary; use codex_protocol::config_types::Verbosity; @@ -1122,7 +1122,7 @@ async fn azure_responses_request_includes_store_and_reasoning_ids() { let model_family = ModelsManager::construct_model_family_offline(model.as_str(), &config); let conversation_id = ConversationId::new(); let auth_manager = AuthManager::from_auth_for_testing(CodexAuth::from_api_key("Test API Key")); - let otel_event_manager = OtelEventManager::new( + let otel_manager = OtelManager::new( conversation_id, model.as_str(), model_family.slug.as_str(), @@ -1131,18 +1131,19 @@ async fn azure_responses_request_includes_store_and_reasoning_ids() { auth_manager.get_auth_mode(), false, "test".to_string(), + SessionSource::Exec, ); let client = ModelClient::new( Arc::clone(&config), None, model_family, - otel_event_manager, + otel_manager, provider, effort, summary, conversation_id, - codex_protocol::protocol::SessionSource::Exec, + SessionSource::Exec, ); let mut prompt = Prompt::default(); diff --git a/codex-rs/core/tests/suite/otel.rs b/codex-rs/core/tests/suite/otel.rs index 8665d3a8ea2..65e96a4fec8 100644 --- a/codex-rs/core/tests/suite/otel.rs +++ b/codex-rs/core/tests/suite/otel.rs @@ -9,15 +9,26 @@ use core_test_support::responses::ev_assistant_message; use core_test_support::responses::ev_completed; use core_test_support::responses::ev_custom_tool_call; use core_test_support::responses::ev_function_call; +use core_test_support::responses::ev_local_shell_call; +use core_test_support::responses::ev_message_item_added; +use core_test_support::responses::ev_output_text_delta; +use core_test_support::responses::ev_reasoning_item; +use core_test_support::responses::ev_reasoning_summary_text_delta; +use core_test_support::responses::ev_reasoning_text_delta; +use core_test_support::responses::ev_response_created; +use core_test_support::responses::mount_response_once; use core_test_support::responses::mount_sse_once; use core_test_support::responses::sse; +use core_test_support::responses::sse_response; use core_test_support::responses::start_mock_server; use core_test_support::test_codex::TestCodex; use core_test_support::test_codex::test_codex; use core_test_support::wait_for_event; +use std::sync::Mutex; use tracing_test::traced_test; -use core_test_support::responses::ev_local_shell_call; +use tracing_subscriber::fmt::format::FmtSpan; +use tracing_test::internal::MockWriter; #[tokio::test] #[traced_test] @@ -437,6 +448,152 @@ async fn process_sse_emits_completed_telemetry() { }); } +#[tokio::test] +async fn handle_responses_span_records_response_kind_and_tool_name() { + let buffer: &'static Mutex> = Box::leak(Box::new(Mutex::new(Vec::new()))); + let subscriber = tracing_subscriber::fmt() + .with_level(true) + .with_ansi(false) + .with_span_events(FmtSpan::FULL) + .with_writer(MockWriter::new(buffer)) + .finish(); + let _guard = tracing::subscriber::set_default(subscriber); + + let server = start_mock_server().await; + + mount_sse_once( + &server, + sse(vec![ + ev_function_call("function-call", "nonexistent", "{\"value\":1}"), + ev_completed("done"), + ]), + ) + .await; + mount_sse_once( + &server, + sse(vec![ + ev_assistant_message("msg-1", "tool handled"), + ev_completed("done"), + ]), + ) + .await; + + let TestCodex { codex, .. } = test_codex() + .with_config(|config| { + config.features.disable(Feature::GhostCommit); + }) + .build(&server) + .await + .unwrap(); + + codex + .submit(Op::UserInput { + items: vec![UserInput::Text { + text: "hello".into(), + }], + }) + .await + .unwrap(); + + wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await; + + let logs = String::from_utf8(buffer.lock().unwrap().clone()).unwrap(); + + assert!( + logs.contains("handle_responses{otel.name=\"function_call\"") + && logs.contains("tool_name=\"nonexistent\"") + && logs.contains("from=\"output_item_done\""), + "missing handle_responses span with function call metadata\nlogs:\n{logs}" + ); + assert!( + logs.contains("handle_responses{otel.name=\"completed\""), + "missing handle_responses span for completion\nlogs:\n{logs}" + ); +} + +#[tokio::test(flavor = "current_thread")] +async fn record_responses_sets_span_fields_for_response_events() { + let buffer: &'static Mutex> = Box::leak(Box::new(Mutex::new(Vec::new()))); + let subscriber = tracing_subscriber::fmt() + .with_level(true) + .with_ansi(false) + .with_span_events(FmtSpan::FULL) + .with_writer(MockWriter::new(buffer)) + .finish(); + let _guard = tracing::subscriber::set_default(subscriber); + + let server = start_mock_server().await; + + let sse_body = sse(vec![ + ev_response_created("resp-1"), + ev_function_call("call-1", "fn", "{\"value\":1}"), + ev_custom_tool_call("custom-1", "custom_tool", "{\"key\":\"value\"}"), + ev_message_item_added("msg-added", "hi there"), + ev_output_text_delta("delta"), + ev_reasoning_summary_text_delta("summary-delta"), + ev_reasoning_text_delta("raw-delta"), + ev_function_call("call-1", "fn", "{\"key\":\"value\"}"), + ev_custom_tool_call("custom-1", "custom_tool", "{\"key\":\"value\"}"), + ev_assistant_message("msg-1", "agent"), + ev_reasoning_item("reasoning-1", &["summary"], &[]), + ev_completed("resp-1"), + ]); + + mount_response_once(&server, sse_response(sse_body)).await; + + let TestCodex { codex, .. } = test_codex() + .with_config(|config| { + config.features.disable(Feature::GhostCommit); + }) + .build(&server) + .await + .unwrap(); + + codex + .submit(Op::UserInput { + items: vec![UserInput::Text { + text: "hello".into(), + }], + }) + .await + .unwrap(); + + wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await; + + let logs = String::from_utf8(buffer.lock().unwrap().clone()).unwrap(); + + let expected = [ + ("created", None::<&str>, None::<&str>), + ("rate_limits", None, None), + ("function_call", Some("output_item_added"), Some("fn")), + ("message_from_assistant", Some("output_item_done"), None), + ("reasoning", Some("output_item_done"), None), + ("text_delta", None, None), + ("reasoning_summary_delta", None, None), + ("reasoning_content_delta", None, None), + ("completed", None, None), + ]; + + for (name, from, tool_name) in expected { + assert!( + logs.contains(&format!("handle_responses{{otel.name=\"{name}\"")), + "missing otel.name={name}\nlogs:\n{logs}" + ); + if let Some(from) = from { + assert!( + logs.contains(&format!("from=\"{from}\"")), + "missing from={from} for {name}\nlogs:\n{logs}" + ); + } + if let Some(tool_name) = tool_name { + assert!( + logs.contains(&format!("tool_name=\"{tool_name}\"")), + "missing tool_name={tool_name} for {name}\nlogs:\n{logs}" + ); + } + } +} + #[tokio::test] #[traced_test] async fn handle_response_item_records_tool_result_for_custom_tool_call() { diff --git a/codex-rs/exec/Cargo.toml b/codex-rs/exec/Cargo.toml index a6e5302de12..c2052154217 100644 --- a/codex-rs/exec/Cargo.toml +++ b/codex-rs/exec/Cargo.toml @@ -28,7 +28,6 @@ codex-core = { workspace = true } codex-protocol = { workspace = true } codex-utils-absolute-path = { workspace = true } mcp-types = { workspace = true } -opentelemetry-appender-tracing = { workspace = true } owo-colors = { workspace = true } serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } diff --git a/codex-rs/exec/src/lib.rs b/codex-rs/exec/src/lib.rs index 7d7d4c301fb..ee0ae45d4e0 100644 --- a/codex-rs/exec/src/lib.rs +++ b/codex-rs/exec/src/lib.rs @@ -39,7 +39,6 @@ use codex_protocol::config_types::SandboxMode; use codex_protocol::user_input::UserInput; use event_processor_with_human_output::EventProcessorWithHumanOutput; use event_processor_with_jsonl_output::EventProcessorWithJsonOutput; -use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge; use serde_json::Value; use std::io::IsTerminal; use std::io::Read; @@ -221,18 +220,15 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option) -> any } }; - if let Some(provider) = otel.as_ref() { - let otel_layer = OpenTelemetryTracingBridge::new(&provider.logger).with_filter( - tracing_subscriber::filter::filter_fn(codex_core::otel_init::codex_export_filter), - ); + let otel_logger_layer = otel.as_ref().and_then(|o| o.logger_layer()); - let _ = tracing_subscriber::registry() - .with(fmt_layer) - .with(otel_layer) - .try_init(); - } else { - let _ = tracing_subscriber::registry().with(fmt_layer).try_init(); - } + let otel_tracing_layer = otel.as_ref().and_then(|o| o.tracing_layer()); + + let _ = tracing_subscriber::registry() + .with(fmt_layer) + .with(otel_tracing_layer) + .with(otel_logger_layer) + .try_init(); let mut event_processor: Box = match json_mode { true => Box::new(EventProcessorWithJsonOutput::new(last_message_file.clone())), diff --git a/codex-rs/otel/Cargo.toml b/codex-rs/otel/Cargo.toml index af8b72346d6..8c99326a4c7 100644 --- a/codex-rs/otel/Cargo.toml +++ b/codex-rs/otel/Cargo.toml @@ -12,43 +12,46 @@ path = "src/lib.rs" [lints] workspace = true -[features] -# Compile-time gate for OTLP support; disabled by default. -# Downstream crates can enable via `features = ["otel"]`. -default = [] -otel = ["opentelemetry", "opentelemetry_sdk", "opentelemetry-otlp", "tonic"] - [dependencies] chrono = { workspace = true } codex-app-server-protocol = { workspace = true } codex-utils-absolute-path = { workspace = true } +codex-api = { workspace = true } codex-protocol = { workspace = true } eventsource-stream = { workspace = true } -opentelemetry = { workspace = true, features = ["logs"], optional = true } +opentelemetry = { workspace = true, features = ["logs", "trace"] } +opentelemetry-appender-tracing = { workspace = true } opentelemetry-otlp = { workspace = true, features = [ "grpc-tonic", "http-proto", "http-json", "logs", + "trace", "reqwest-blocking-client", "reqwest-rustls", "tls", "tls-roots", -], optional = true } +]} opentelemetry-semantic-conventions = { workspace = true } opentelemetry_sdk = { workspace = true, features = [ "logs", "rt-tokio", -], optional = true } + "trace", +]} http = { workspace = true } reqwest = { workspace = true, features = ["blocking", "rustls-tls"] } serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } strum_macros = { workspace = true } tokio = { workspace = true } -tonic = { workspace = true, optional = true, features = [ +tonic = { workspace = true, features = [ "transport", "tls-native-roots", "tls-ring", ] } tracing = { workspace = true } +tracing-opentelemetry = { workspace = true } +tracing-subscriber = { workspace = true } + +[dev-dependencies] +opentelemetry_sdk = { workspace = true, features = ["testing"] } diff --git a/codex-rs/otel/src/config.rs b/codex-rs/otel/src/config.rs index 652a1c97b2b..935c0379fbe 100644 --- a/codex-rs/otel/src/config.rs +++ b/codex-rs/otel/src/config.rs @@ -10,6 +10,7 @@ pub struct OtelSettings { pub service_version: String, pub codex_home: PathBuf, pub exporter: OtelExporter, + pub trace_exporter: OtelExporter, } #[derive(Clone, Debug)] diff --git a/codex-rs/otel/src/lib.rs b/codex-rs/otel/src/lib.rs index 3fdb6f7413a..5211c8e89ba 100644 --- a/codex-rs/otel/src/lib.rs +++ b/codex-rs/otel/src/lib.rs @@ -1,26 +1,4 @@ pub mod config; -pub mod otel_event_manager; -#[cfg(feature = "otel")] +pub mod otel_manager; pub mod otel_provider; - -#[cfg(not(feature = "otel"))] -mod imp { - use reqwest::header::HeaderMap; - use tracing::Span; - - pub struct OtelProvider; - - impl OtelProvider { - pub fn from(_settings: &crate::config::OtelSettings) -> Option { - None - } - - pub fn headers(_span: &Span) -> HeaderMap { - HeaderMap::new() - } - } -} - -#[cfg(not(feature = "otel"))] -pub use imp::OtelProvider; diff --git a/codex-rs/otel/src/otel_event_manager.rs b/codex-rs/otel/src/otel_manager.rs similarity index 82% rename from codex-rs/otel/src/otel_event_manager.rs rename to codex-rs/otel/src/otel_manager.rs index 54e3fe3dc18..268897f8ae7 100644 --- a/codex-rs/otel/src/otel_event_manager.rs +++ b/codex-rs/otel/src/otel_manager.rs @@ -1,5 +1,7 @@ +use crate::otel_provider::traceparent_context_from_env; use chrono::SecondsFormat; use chrono::Utc; +use codex_api::ResponseEvent; use codex_app_server_protocol::AuthMode; use codex_protocol::ConversationId; use codex_protocol::config_types::ReasoningSummary; @@ -8,6 +10,7 @@ use codex_protocol::openai_models::ReasoningEffort; use codex_protocol::protocol::AskForApproval; use codex_protocol::protocol::ReviewDecision; use codex_protocol::protocol::SandboxPolicy; +use codex_protocol::protocol::SessionSource; use codex_protocol::user_input::UserInput; use eventsource_stream::Event as StreamEvent; use eventsource_stream::EventStreamError as StreamError; @@ -16,10 +19,14 @@ use reqwest::Response; use serde::Serialize; use std::borrow::Cow; use std::fmt::Display; +use std::future::Future; use std::time::Duration; use std::time::Instant; use strum_macros::Display; use tokio::time::error::Elapsed; +use tracing::Span; +use tracing::info_span; +use tracing_opentelemetry::OpenTelemetrySpanExt; #[derive(Debug, Clone, Serialize, Display)] #[serde(rename_all = "snake_case")] @@ -42,11 +49,12 @@ pub struct OtelEventMetadata { } #[derive(Debug, Clone)] -pub struct OtelEventManager { +pub struct OtelManager { metadata: OtelEventMetadata, + session_span: Span, } -impl OtelEventManager { +impl OtelManager { #[allow(clippy::too_many_arguments)] pub fn new( conversation_id: ConversationId, @@ -57,7 +65,14 @@ impl OtelEventManager { auth_mode: Option, log_user_prompts: bool, terminal_type: String, - ) -> OtelEventManager { + session_source: SessionSource, + ) -> OtelManager { + let session_span = info_span!("new_session", conversation_id = %conversation_id, session_source = %session_source); + + if let Some(context) = traceparent_context_from_env() { + session_span.set_parent(context); + } + Self { metadata: OtelEventMetadata { conversation_id, @@ -70,6 +85,7 @@ impl OtelEventManager { app_version: env!("CARGO_PKG_VERSION"), terminal_type, }, + session_span, } } @@ -80,6 +96,30 @@ impl OtelEventManager { manager } + pub fn current_span(&self) -> &Span { + &self.session_span + } + + pub fn record_responses(&self, handle_responses_span: &Span, event: &ResponseEvent) { + handle_responses_span.record("otel.name", OtelManager::responses_type(event)); + + match event { + ResponseEvent::OutputItemDone(item) => { + handle_responses_span.record("from", "output_item_done"); + if let ResponseItem::FunctionCall { name, .. } = &item { + handle_responses_span.record("tool_name", name.as_str()); + } + } + ResponseEvent::OutputItemAdded(item) => { + handle_responses_span.record("from", "output_item_added"); + if let ResponseItem::FunctionCall { name, .. } = &item { + handle_responses_span.record("tool_name", name.as_str()); + } + } + _ => {} + } + } + #[allow(clippy::too_many_arguments)] pub fn conversation_starts( &self, @@ -394,27 +434,13 @@ impl OtelEventManager { Err(error) => (Cow::Owned(error.to_string()), false), }; - let success_str = if success { "true" } else { "false" }; - - tracing::event!( - tracing::Level::INFO, - event.name = "codex.tool_result", - event.timestamp = %timestamp(), - conversation.id = %self.metadata.conversation_id, - app.version = %self.metadata.app_version, - auth_mode = self.metadata.auth_mode, - user.account_id= self.metadata.account_id, - user.email = self.metadata.account_email, - terminal.type = %self.metadata.terminal_type, - model = %self.metadata.model, - slug = %self.metadata.slug, - tool_name = %tool_name, - call_id = %call_id, - arguments = %arguments, - duration_ms = %duration.as_millis(), - success = %success_str, - // `output` is truncated by the tool layer before reaching telemetry. - output = %output, + self.tool_result( + tool_name, + call_id, + arguments, + duration, + success, + output.as_ref(), ); result @@ -471,6 +497,38 @@ impl OtelEventManager { output = %output, ); } + + fn responses_type(event: &ResponseEvent) -> String { + match event { + ResponseEvent::Created => "created".into(), + ResponseEvent::OutputItemDone(item) => OtelManager::responses_item_type(item), + ResponseEvent::OutputItemAdded(item) => OtelManager::responses_item_type(item), + ResponseEvent::Completed { .. } => "completed".into(), + ResponseEvent::OutputTextDelta(_) => "text_delta".into(), + ResponseEvent::ReasoningSummaryDelta { .. } => "reasoning_summary_delta".into(), + ResponseEvent::ReasoningContentDelta { .. } => "reasoning_content_delta".into(), + ResponseEvent::ReasoningSummaryPartAdded { .. } => { + "reasoning_summary_part_added".into() + } + ResponseEvent::RateLimits(_) => "rate_limits".into(), + } + } + + fn responses_item_type(item: &ResponseItem) -> String { + match item { + ResponseItem::Message { role, .. } => format!("message_from_{role}"), + ResponseItem::Reasoning { .. } => "reasoning".into(), + ResponseItem::LocalShellCall { .. } => "local_shell_call".into(), + ResponseItem::FunctionCall { .. } => "function_call".into(), + ResponseItem::FunctionCallOutput { .. } => "function_call_output".into(), + ResponseItem::CustomToolCall { .. } => "custom_tool_call".into(), + ResponseItem::CustomToolCallOutput { .. } => "custom_tool_call_output".into(), + ResponseItem::WebSearchCall { .. } => "web_search_call".into(), + ResponseItem::GhostSnapshot { .. } => "ghost_snapshot".into(), + ResponseItem::Compaction { .. } => "compaction".into(), + ResponseItem::Other => "other".into(), + } + } } fn timestamp() -> String { diff --git a/codex-rs/otel/src/otel_provider.rs b/codex-rs/otel/src/otel_provider.rs index 92b1feaa188..8e2826f834d 100644 --- a/codex-rs/otel/src/otel_provider.rs +++ b/codex-rs/otel/src/otel_provider.rs @@ -4,142 +4,366 @@ use crate::config::OtelSettings; use crate::config::OtelTlsConfig; use codex_utils_absolute_path::AbsolutePathBuf; use http::Uri; +use opentelemetry::Context; use opentelemetry::KeyValue; +use opentelemetry::context::ContextGuard; +use opentelemetry::global; +use opentelemetry::propagation::TextMapPropagator; +use opentelemetry::trace::TraceContextExt; +use opentelemetry::trace::TracerProvider as _; +use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge; use opentelemetry_otlp::LogExporter; use opentelemetry_otlp::OTEL_EXPORTER_OTLP_LOGS_TIMEOUT; use opentelemetry_otlp::OTEL_EXPORTER_OTLP_TIMEOUT; use opentelemetry_otlp::OTEL_EXPORTER_OTLP_TIMEOUT_DEFAULT; +use opentelemetry_otlp::OTEL_EXPORTER_OTLP_TRACES_TIMEOUT; use opentelemetry_otlp::Protocol; +use opentelemetry_otlp::SpanExporter; use opentelemetry_otlp::WithExportConfig; use opentelemetry_otlp::WithHttpConfig; use opentelemetry_otlp::WithTonicConfig; use opentelemetry_sdk::Resource; use opentelemetry_sdk::logs::SdkLoggerProvider; +use opentelemetry_sdk::propagation::TraceContextPropagator; +use opentelemetry_sdk::trace::BatchSpanProcessor; +use opentelemetry_sdk::trace::SdkTracerProvider; +use opentelemetry_sdk::trace::Tracer; use opentelemetry_semantic_conventions as semconv; use reqwest::Certificate as ReqwestCertificate; use reqwest::Identity as ReqwestIdentity; use reqwest::header::HeaderMap; use reqwest::header::HeaderName; use reqwest::header::HeaderValue; +use std::cell::RefCell; +use std::collections::HashMap; use std::env; use std::error::Error; use std::fs; use std::io::ErrorKind; use std::io::{self}; use std::path::PathBuf; +use std::sync::OnceLock; use std::time::Duration; use tonic::metadata::MetadataMap; use tonic::transport::Certificate as TonicCertificate; use tonic::transport::ClientTlsConfig; use tonic::transport::Identity as TonicIdentity; use tracing::debug; +use tracing::level_filters::LevelFilter; +use tracing::warn; +use tracing_subscriber::Layer; +use tracing_subscriber::registry::LookupSpan; const ENV_ATTRIBUTE: &str = "env"; +const TRACEPARENT_ENV_VAR: &str = "TRACEPARENT"; +const TRACESTATE_ENV_VAR: &str = "TRACESTATE"; +static TRACEPARENT_CONTEXT: OnceLock> = OnceLock::new(); + +thread_local! { + static TRACEPARENT_GUARD: RefCell> = const { RefCell::new(None) }; +} pub struct OtelProvider { - pub logger: SdkLoggerProvider, + pub logger: Option, + pub tracer_provider: Option, + pub tracer: Option, } impl OtelProvider { pub fn shutdown(&self) { - let _ = self.logger.shutdown(); + if let Some(logger) = &self.logger { + let _ = logger.shutdown(); + } + if let Some(tracer_provider) = &self.tracer_provider { + let _ = tracer_provider.shutdown(); + } } pub fn from(settings: &OtelSettings) -> Result, Box> { - let resource = Resource::builder() - .with_service_name(settings.service_name.clone()) - .with_attributes(vec![ - KeyValue::new( - semconv::attribute::SERVICE_VERSION, - settings.service_version.clone(), - ), - KeyValue::new(ENV_ATTRIBUTE, settings.environment.clone()), - ]) - .build(); - - let mut builder = SdkLoggerProvider::builder().with_resource(resource); - - match &settings.exporter { - OtelExporter::None => { - debug!("No exporter enabled in OTLP settings."); - return Ok(None); - } - OtelExporter::OtlpGrpc { - endpoint, - headers, - tls, - } => { - debug!("Using OTLP Grpc exporter: {endpoint}"); - - let mut header_map = HeaderMap::new(); - for (key, value) in headers { - if let Ok(name) = HeaderName::from_bytes(key.as_bytes()) - && let Ok(val) = HeaderValue::from_str(value) - { - header_map.insert(name, val); - } - } - - let base_tls_config = ClientTlsConfig::new() - .with_enabled_roots() - .assume_http2(true); - - let tls_config = match tls.as_ref() { - Some(tls) => build_grpc_tls_config(endpoint, base_tls_config, tls)?, - None => base_tls_config, - }; - - let exporter = LogExporter::builder() - .with_tonic() - .with_endpoint(endpoint) - .with_metadata(MetadataMap::from_headers(header_map)) - .with_tls_config(tls_config) - .build()?; - - builder = builder.with_batch_exporter(exporter); - } - OtelExporter::OtlpHttp { - endpoint, - headers, - protocol, - tls, - } => { - debug!("Using OTLP Http exporter: {endpoint}"); - - let protocol = match protocol { - OtelHttpProtocol::Binary => Protocol::HttpBinary, - OtelHttpProtocol::Json => Protocol::HttpJson, - }; - - let mut exporter_builder = LogExporter::builder() - .with_http() - .with_endpoint(endpoint) - .with_protocol(protocol) - .with_headers(headers.clone()); - - if let Some(tls) = tls.as_ref() { - let client = build_http_client(tls)?; - exporter_builder = exporter_builder.with_http_client(client); - } - - let exporter = exporter_builder.build()?; - - builder = builder.with_batch_exporter(exporter); - } + let log_enabled = !matches!(settings.exporter, OtelExporter::None); + let trace_enabled = !matches!(settings.trace_exporter, OtelExporter::None); + + if !log_enabled && !trace_enabled { + debug!("No exporter enabled in OTLP settings."); + return Ok(None); + } + + let resource = make_resource(settings); + let logger = log_enabled + .then(|| build_logger(&resource, &settings.exporter)) + .transpose()?; + + let tracer_provider = trace_enabled + .then(|| build_tracer_provider(&resource, &settings.trace_exporter)) + .transpose()?; + + let tracer = tracer_provider + .as_ref() + .map(|provider| provider.tracer(settings.service_name.clone())); + + if let Some(provider) = tracer_provider.clone() { + let _ = global::set_tracer_provider(provider); + global::set_text_map_propagator(TraceContextPropagator::new()); + } + if tracer.is_some() { + attach_traceparent_context(); } Ok(Some(Self { - logger: builder.build(), + logger, + tracer_provider, + tracer, })) } + + pub fn logger_layer(&self) -> Option + Send + Sync> + where + S: tracing::Subscriber + for<'span> LookupSpan<'span> + Send + Sync, + { + self.logger.as_ref().map(|logger| { + OpenTelemetryTracingBridge::new(logger).with_filter( + tracing_subscriber::filter::filter_fn(OtelProvider::codex_export_filter), + ) + }) + } + + pub fn tracing_layer(&self) -> Option + Send + Sync> + where + S: tracing::Subscriber + for<'span> LookupSpan<'span> + Send + Sync, + { + self.tracer.as_ref().map(|tracer| { + tracing_opentelemetry::layer() + .with_tracer(tracer.clone()) + .with_filter(LevelFilter::INFO) + }) + } + + pub fn codex_export_filter(meta: &tracing::Metadata<'_>) -> bool { + meta.target().starts_with("codex_otel") + } } impl Drop for OtelProvider { fn drop(&mut self) { - let _ = self.logger.shutdown(); + if let Some(logger) = &self.logger { + let _ = logger.shutdown(); + } + if let Some(tracer_provider) = &self.tracer_provider { + let _ = tracer_provider.shutdown(); + } + } +} + +pub(crate) fn traceparent_context_from_env() -> Option { + TRACEPARENT_CONTEXT + .get_or_init(load_traceparent_context) + .clone() +} + +fn attach_traceparent_context() { + TRACEPARENT_GUARD.with(|guard| { + let mut guard = guard.borrow_mut(); + if guard.is_some() { + return; + } + if let Some(context) = traceparent_context_from_env() { + *guard = Some(context.attach()); + } + }); +} + +fn load_traceparent_context() -> Option { + let traceparent = env::var(TRACEPARENT_ENV_VAR).ok()?; + let tracestate = env::var(TRACESTATE_ENV_VAR).ok(); + + match extract_traceparent_context(traceparent, tracestate) { + Some(context) => { + debug!("TRACEPARENT detected; continuing trace from parent context"); + Some(context) + } + None => { + warn!("TRACEPARENT is set but invalid; ignoring trace context"); + None + } } } +fn extract_traceparent_context(traceparent: String, tracestate: Option) -> Option { + let mut headers = HashMap::new(); + headers.insert("traceparent".to_string(), traceparent); + if let Some(tracestate) = tracestate { + headers.insert("tracestate".to_string(), tracestate); + } + + let context = TraceContextPropagator::new().extract(&headers); + let span = context.span(); + let span_context = span.span_context(); + if !span_context.is_valid() { + return None; + } + Some(context) +} + +fn make_resource(settings: &OtelSettings) -> Resource { + Resource::builder() + .with_service_name(settings.service_name.clone()) + .with_attributes(vec![ + KeyValue::new( + semconv::attribute::SERVICE_VERSION, + settings.service_version.clone(), + ), + KeyValue::new(ENV_ATTRIBUTE, settings.environment.clone()), + ]) + .build() +} + +fn build_logger( + resource: &Resource, + exporter: &OtelExporter, +) -> Result> { + let mut builder = SdkLoggerProvider::builder().with_resource(resource.clone()); + + match exporter { + OtelExporter::None => return Ok(builder.build()), + OtelExporter::OtlpGrpc { + endpoint, + headers, + tls, + } => { + debug!("Using OTLP Grpc exporter: {endpoint}"); + + let header_map = build_header_map(headers); + + let base_tls_config = ClientTlsConfig::new() + .with_enabled_roots() + .assume_http2(true); + + let tls_config = match tls.as_ref() { + Some(tls) => build_grpc_tls_config(endpoint, base_tls_config, tls)?, + None => base_tls_config, + }; + + let exporter = LogExporter::builder() + .with_tonic() + .with_endpoint(endpoint) + .with_metadata(MetadataMap::from_headers(header_map)) + .with_tls_config(tls_config) + .build()?; + + builder = builder.with_batch_exporter(exporter); + } + OtelExporter::OtlpHttp { + endpoint, + headers, + protocol, + tls, + } => { + debug!("Using OTLP Http exporter: {endpoint}"); + + let protocol = match protocol { + OtelHttpProtocol::Binary => Protocol::HttpBinary, + OtelHttpProtocol::Json => Protocol::HttpJson, + }; + + let mut exporter_builder = LogExporter::builder() + .with_http() + .with_endpoint(endpoint) + .with_protocol(protocol) + .with_headers(headers.clone()); + + if let Some(tls) = tls.as_ref() { + let client = build_http_client(tls, OTEL_EXPORTER_OTLP_LOGS_TIMEOUT)?; + exporter_builder = exporter_builder.with_http_client(client); + } + + let exporter = exporter_builder.build()?; + + builder = builder.with_batch_exporter(exporter); + } + } + + Ok(builder.build()) +} + +fn build_tracer_provider( + resource: &Resource, + exporter: &OtelExporter, +) -> Result> { + let span_exporter = match exporter { + OtelExporter::None => return Ok(SdkTracerProvider::builder().build()), + OtelExporter::OtlpGrpc { + endpoint, + headers, + tls, + } => { + debug!("Using OTLP Grpc exporter for traces: {endpoint}"); + + let header_map = build_header_map(headers); + + let base_tls_config = ClientTlsConfig::new() + .with_enabled_roots() + .assume_http2(true); + + let tls_config = match tls.as_ref() { + Some(tls) => build_grpc_tls_config(endpoint, base_tls_config, tls)?, + None => base_tls_config, + }; + + SpanExporter::builder() + .with_tonic() + .with_endpoint(endpoint) + .with_metadata(MetadataMap::from_headers(header_map)) + .with_tls_config(tls_config) + .build()? + } + OtelExporter::OtlpHttp { + endpoint, + headers, + protocol, + tls, + } => { + debug!("Using OTLP Http exporter for traces: {endpoint}"); + + let protocol = match protocol { + OtelHttpProtocol::Binary => Protocol::HttpBinary, + OtelHttpProtocol::Json => Protocol::HttpJson, + }; + + let mut exporter_builder = SpanExporter::builder() + .with_http() + .with_endpoint(endpoint) + .with_protocol(protocol) + .with_headers(headers.clone()); + + if let Some(tls) = tls.as_ref() { + let client = build_http_client(tls, OTEL_EXPORTER_OTLP_TRACES_TIMEOUT)?; + exporter_builder = exporter_builder.with_http_client(client); + } + + exporter_builder.build()? + } + }; + + let processor = BatchSpanProcessor::builder(span_exporter).build(); + + Ok(SdkTracerProvider::builder() + .with_resource(resource.clone()) + .with_span_processor(processor) + .build()) +} + +fn build_header_map(headers: &HashMap) -> HeaderMap { + let mut header_map = HeaderMap::new(); + for (key, value) in headers { + if let Ok(name) = HeaderName::from_bytes(key.as_bytes()) + && let Ok(val) = HeaderValue::from_str(value) + { + header_map.insert(name, val); + } + } + header_map +} + fn build_grpc_tls_config( endpoint: &str, tls_config: ClientTlsConfig, @@ -182,17 +406,21 @@ fn build_grpc_tls_config( /// `opentelemetry_sdk` `BatchLogProcessor` spawns a dedicated OS thread that uses /// `futures_executor::block_on()` rather than tokio. When the async reqwest client's /// timeout calls `tokio::time::sleep()`, it panics with "no reactor running". -fn build_http_client(tls: &OtelTlsConfig) -> Result> { +fn build_http_client( + tls: &OtelTlsConfig, + timeout_var: &str, +) -> Result> { // Wrap in block_in_place because reqwest::blocking::Client creates its own // internal tokio runtime, which would panic if built directly from an async context. - tokio::task::block_in_place(|| build_http_client_inner(tls)) + tokio::task::block_in_place(|| build_http_client_inner(tls, timeout_var)) } fn build_http_client_inner( tls: &OtelTlsConfig, + timeout_var: &str, ) -> Result> { - let mut builder = reqwest::blocking::Client::builder() - .timeout(resolve_otlp_timeout(OTEL_EXPORTER_OTLP_LOGS_TIMEOUT)); + let mut builder = + reqwest::blocking::Client::builder().timeout(resolve_otlp_timeout(timeout_var)); if let Some(path) = tls.ca_certificate.as_ref() { let (pem, location) = read_bytes(path)?; @@ -267,3 +495,32 @@ fn read_bytes(path: &AbsolutePathBuf) -> Result<(Vec, PathBuf), Box) -> Box { Box::new(io::Error::new(ErrorKind::InvalidData, message.into())) } + +#[cfg(test)] +mod tests { + use super::*; + use opentelemetry::trace::SpanId; + use opentelemetry::trace::TraceContextExt; + use opentelemetry::trace::TraceId; + + #[test] + fn parses_valid_traceparent() { + let trace_id = "00000000000000000000000000000001"; + let span_id = "0000000000000002"; + let context = extract_traceparent_context(format!("00-{trace_id}-{span_id}-01"), None) + .expect("trace context"); + let span = context.span(); + let span_context = span.span_context(); + assert_eq!( + span_context.trace_id(), + TraceId::from_hex(trace_id).unwrap() + ); + assert_eq!(span_context.span_id(), SpanId::from_hex(span_id).unwrap()); + assert!(span_context.is_remote()); + } + + #[test] + fn invalid_traceparent_returns_none() { + assert!(extract_traceparent_context("not-a-traceparent".to_string(), None).is_none()); + } +} diff --git a/codex-rs/tui/Cargo.toml b/codex-rs/tui/Cargo.toml index c440f09aeb3..1d9f3115915 100644 --- a/codex-rs/tui/Cargo.toml +++ b/codex-rs/tui/Cargo.toml @@ -52,7 +52,6 @@ image = { workspace = true, features = ["jpeg", "png"] } itertools = { workspace = true } lazy_static = { workspace = true } mcp-types = { workspace = true } -opentelemetry-appender-tracing = { workspace = true } pathdiff = { workspace = true } pulldown-cmark = { workspace = true } rand = { workspace = true } diff --git a/codex-rs/tui/src/lib.rs b/codex-rs/tui/src/lib.rs index 36883b8664f..772eb19ee4a 100644 --- a/codex-rs/tui/src/lib.rs +++ b/codex-rs/tui/src/lib.rs @@ -23,7 +23,6 @@ use codex_core::find_conversation_path_by_id_str; use codex_core::get_platform_sandbox; use codex_core::protocol::AskForApproval; use codex_protocol::config_types::SandboxMode; -use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge; use std::fs::OpenOptions; use std::path::PathBuf; use tracing::error; @@ -271,7 +270,7 @@ pub async fn run_main( .with_writer(non_blocking) .with_target(false) .with_ansi(false) - .with_span_events(tracing_subscriber::fmt::format::FmtSpan::CLOSE) + .with_span_events(tracing_subscriber::fmt::format::FmtSpan::FULL) .with_filter(env_filter()); let feedback = codex_feedback::CodexFeedback::new(); @@ -309,22 +308,16 @@ pub async fn run_main( } }; - if let Some(provider) = otel.as_ref() { - let otel_layer = OpenTelemetryTracingBridge::new(&provider.logger).with_filter( - tracing_subscriber::filter::filter_fn(codex_core::otel_init::codex_export_filter), - ); + let otel_logger_layer = otel.as_ref().and_then(|o| o.logger_layer()); - let _ = tracing_subscriber::registry() - .with(file_layer) - .with(feedback_layer) - .with(otel_layer) - .try_init(); - } else { - let _ = tracing_subscriber::registry() - .with(file_layer) - .with(feedback_layer) - .try_init(); - }; + let otel_tracing_layer = otel.as_ref().and_then(|o| o.tracing_layer()); + + let _ = tracing_subscriber::registry() + .with(file_layer) + .with(feedback_layer) + .with(otel_logger_layer) + .with(otel_tracing_layer) + .try_init(); run_ratatui_app( cli, diff --git a/codex-rs/tui2/Cargo.toml b/codex-rs/tui2/Cargo.toml index 7ac1fb859c9..384703bea62 100644 --- a/codex-rs/tui2/Cargo.toml +++ b/codex-rs/tui2/Cargo.toml @@ -52,7 +52,6 @@ image = { workspace = true, features = ["jpeg", "png"] } itertools = { workspace = true } lazy_static = { workspace = true } mcp-types = { workspace = true } -opentelemetry-appender-tracing = { workspace = true } pathdiff = { workspace = true } pulldown-cmark = { workspace = true } rand = { workspace = true } diff --git a/codex-rs/tui2/src/lib.rs b/codex-rs/tui2/src/lib.rs index d9793a07a04..67a6a08ae56 100644 --- a/codex-rs/tui2/src/lib.rs +++ b/codex-rs/tui2/src/lib.rs @@ -23,7 +23,6 @@ use codex_core::find_conversation_path_by_id_str; use codex_core::get_platform_sandbox; use codex_core::protocol::AskForApproval; use codex_protocol::config_types::SandboxMode; -use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge; use std::fs::OpenOptions; use std::path::PathBuf; use tracing::error; @@ -307,22 +306,16 @@ pub async fn run_main( } }; - if let Some(provider) = otel.as_ref() { - let otel_layer = OpenTelemetryTracingBridge::new(&provider.logger).with_filter( - tracing_subscriber::filter::filter_fn(codex_core::otel_init::codex_export_filter), - ); + let otel_logger_layer = otel.as_ref().and_then(|o| o.logger_layer()); - let _ = tracing_subscriber::registry() - .with(file_layer) - .with(feedback_layer) - .with(otel_layer) - .try_init(); - } else { - let _ = tracing_subscriber::registry() - .with(file_layer) - .with(feedback_layer) - .try_init(); - }; + let otel_tracing_layer = otel.as_ref().and_then(|o| o.tracing_layer()); + + let _ = tracing_subscriber::registry() + .with(file_layer) + .with(feedback_layer) + .with(otel_tracing_layer) + .with(otel_logger_layer) + .try_init(); run_ratatui_app( cli,