Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 27 additions & 4 deletions codex-rs/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions codex-rs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ opentelemetry-appender-tracing = "0.30.0"
opentelemetry-otlp = "0.30.0"
opentelemetry-semantic-conventions = "0.30.0"
opentelemetry_sdk = "0.30.0"
tracing-opentelemetry = "0.31.0"
os_info = "3.12.0"
owo-colors = "4.2.0"
path-absolutize = "3.1.1"
Expand Down
1 change: 0 additions & 1 deletion codex-rs/app-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ tokio = { workspace = true, features = [
] }
tracing = { workspace = true, features = ["log"] }
tracing-subscriber = { workspace = true, features = ["env-filter", "fmt"] }
opentelemetry-appender-tracing = { workspace = true }
uuid = { workspace = true, features = ["serde", "v7"] }

[dev-dependencies]
Expand Down
13 changes: 7 additions & 6 deletions codex-rs/app-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
use codex_common::CliConfigOverrides;
use codex_core::config::Config;
use codex_core::config::ConfigOverrides;
use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
use std::io::ErrorKind;
use std::io::Result as IoResult;
use std::path::PathBuf;
Expand Down Expand Up @@ -103,6 +102,7 @@ pub async fn run_main(
// control the log level with `RUST_LOG`.
let stderr_fmt = tracing_subscriber::fmt::layer()
.with_writer(std::io::stderr)
.with_span_events(tracing_subscriber::fmt::format::FmtSpan::FULL)
.with_filter(EnvFilter::from_default_env());

let feedback_layer = tracing_subscriber::fmt::layer()
Expand All @@ -111,14 +111,15 @@ pub async fn run_main(
.with_target(false)
.with_filter(Targets::new().with_default(Level::TRACE));

let otel_logger_layer = otel.as_ref().and_then(|o| o.logger_layer());

let otel_tracing_layer = otel.as_ref().and_then(|o| o.tracing_layer());

let _ = tracing_subscriber::registry()
.with(stderr_fmt)
.with(feedback_layer)
.with(otel.as_ref().map(|provider| {
OpenTelemetryTracingBridge::new(&provider.logger).with_filter(
tracing_subscriber::filter::filter_fn(codex_core::otel_init::codex_export_filter),
)
}))
.with(otel_logger_layer)
.with(otel_tracing_layer)
.try_init();

// Task: process incoming messages.
Expand Down
2 changes: 2 additions & 0 deletions codex-rs/codex-api/src/endpoint/responses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use codex_protocol::protocol::SessionSource;
use http::HeaderMap;
use serde_json::Value;
use std::sync::Arc;
use tracing::instrument;

pub struct ResponsesClient<T: HttpTransport, A: AuthProvider> {
streaming: StreamingClient<T, A>,
Expand Down Expand Up @@ -57,6 +58,7 @@ impl<T: HttpTransport, A: AuthProvider> ResponsesClient<T, A> {
self.stream(request.body, request.headers).await
}

#[instrument(skip_all, err)]
pub async fn stream_prompt(
&self,
model: &str,
Expand Down
6 changes: 6 additions & 0 deletions codex-rs/codex-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,19 @@ bytes = { workspace = true }
eventsource-stream = { workspace = true }
futures = { workspace = true }
http = { workspace = true }
opentelemetry = { workspace = true }
rand = { workspace = true }
reqwest = { workspace = true, features = ["json", "stream"] }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["macros", "rt", "time", "sync"] }
tracing = { workspace = true }
tracing-opentelemetry = { workspace = true }

[lints]
workspace = true

[dev-dependencies]
opentelemetry_sdk = { workspace = true }
tracing-subscriber = { workspace = true }
84 changes: 83 additions & 1 deletion codex-rs/codex-client/src/default_client.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use http::Error as HttpError;
use opentelemetry::global;
use opentelemetry::propagation::Injector;
use reqwest::IntoUrl;
use reqwest::Method;
use reqwest::Response;
Expand All @@ -9,6 +11,8 @@ use serde::Serialize;
use std::collections::HashMap;
use std::fmt::Display;
use std::time::Duration;
use tracing::Span;
use tracing_opentelemetry::OpenTelemetrySpanExt;

#[derive(Clone, Debug)]
pub struct CodexHttpClient {
Expand Down Expand Up @@ -101,7 +105,9 @@ impl CodexRequestBuilder {
}

pub async fn send(self) -> Result<Response, reqwest::Error> {
match self.builder.send().await {
let headers = trace_headers();

match self.builder.headers(headers).send().await {
Ok(response) => {
let request_ids = Self::extract_request_ids(&response);
tracing::debug!(
Expand Down Expand Up @@ -141,3 +147,79 @@ impl CodexRequestBuilder {
.collect()
}
}

struct HeaderMapInjector<'a>(&'a mut HeaderMap);

impl<'a> Injector for HeaderMapInjector<'a> {
fn set(&mut self, key: &str, value: String) {
if let (Ok(name), Ok(val)) = (
HeaderName::from_bytes(key.as_bytes()),
HeaderValue::from_str(&value),
) {
self.0.insert(name, val);
}
}
}

fn trace_headers() -> HeaderMap {
let mut headers = HeaderMap::new();
global::get_text_map_propagator(|prop| {
prop.inject_context(
&Span::current().context(),
&mut HeaderMapInjector(&mut headers),
);
});
headers
}

#[cfg(test)]
mod tests {
use super::*;
use opentelemetry::propagation::Extractor;
use opentelemetry::propagation::TextMapPropagator;
use opentelemetry::trace::TraceContextExt;
use opentelemetry::trace::TracerProvider;
use opentelemetry_sdk::propagation::TraceContextPropagator;
use opentelemetry_sdk::trace::SdkTracerProvider;
use tracing::info_span;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;

#[test]
fn inject_trace_headers_uses_current_span_context() {
global::set_text_map_propagator(TraceContextPropagator::new());

let provider = SdkTracerProvider::builder().build();
let tracer = provider.tracer("test-tracer");
let subscriber =
tracing_subscriber::registry().with(tracing_opentelemetry::layer().with_tracer(tracer));
let _guard = subscriber.set_default();

let span = info_span!("client_request");
let _entered = span.enter();
let span_context = span.context().span().span_context().clone();

let headers = trace_headers();

let extractor = HeaderMapExtractor(&headers);
let extracted = TraceContextPropagator::new().extract(&extractor);
let extracted_span = extracted.span();
let extracted_context = extracted_span.span_context();

assert!(extracted_context.is_valid());
assert_eq!(extracted_context.trace_id(), span_context.trace_id());
assert_eq!(extracted_context.span_id(), span_context.span_id());
}

struct HeaderMapExtractor<'a>(&'a HeaderMap);

impl<'a> Extractor for HeaderMapExtractor<'a> {
fn get(&self, key: &str) -> Option<&str> {
self.0.get(key).and_then(|value| value.to_str().ok())
}

fn keys(&self) -> Vec<&str> {
self.0.keys().map(HeaderName::as_str).collect()
}
}
}
3 changes: 2 additions & 1 deletion codex-rs/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ codex-execpolicy = { workspace = true }
codex-file-search = { workspace = true }
codex-git = { workspace = true }
codex-keyring-store = { workspace = true }
codex-otel = { workspace = true, features = ["otel"] }
codex-otel = { workspace = true }
codex-protocol = { workspace = true }
codex-rmcp-client = { workspace = true }
codex-utils-absolute-path = { workspace = true }
Expand Down Expand Up @@ -132,6 +132,7 @@ pretty_assertions = { workspace = true }
serial_test = { workspace = true }
tempfile = { workspace = true }
tokio-test = { workspace = true }
tracing-subscriber = { workspace = true }
tracing-test = { workspace = true, features = ["no-env-filter"] }
walkdir = { workspace = true }
wiremock = { workspace = true }
Expand Down
Loading
Loading