Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions codex-rs/core/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ use crate::transport_manager::TransportManager;
pub const WEB_SEARCH_ELIGIBLE_HEADER: &str = "x-oai-web-search-eligible";
pub const X_CODEX_TURN_STATE_HEADER: &str = "x-codex-turn-state";
pub const X_CODEX_TURN_METADATA_HEADER: &str = "x-codex-turn-metadata";
pub const X_RESPONSESAPI_INCLUDE_TIMING_METRICS_HEADER: &str =
"x-responsesapi-include-timing-metrics";

#[derive(Debug, Default)]
struct TurnMetadataCache {
Expand Down Expand Up @@ -489,6 +491,12 @@ impl ModelClientSession {
if needs_new {
let mut headers = options.extra_headers.clone();
headers.extend(build_conversation_headers(options.conversation_id.clone()));
if self.state.config.features.enabled(Feature::RuntimeMetrics) {
headers.insert(
X_RESPONSESAPI_INCLUDE_TIMING_METRICS_HEADER,
HeaderValue::from_static("true"),
);
}
let websocket_telemetry = self.build_websocket_telemetry();
let new_conn: ApiWebSocketConnection =
ApiWebSocketResponsesClient::new(api_provider, api_auth)
Expand Down
1 change: 1 addition & 0 deletions codex-rs/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ pub mod token_data;
mod truncate;
mod unified_exec;
pub mod windows_sandbox;
pub use client::X_RESPONSESAPI_INCLUDE_TIMING_METRICS_HEADER;
pub use model_provider_info::DEFAULT_LMSTUDIO_PORT;
pub use model_provider_info::DEFAULT_OLLAMA_PORT;
pub use model_provider_info::LMSTUDIO_OSS_PROVIDER_ID;
Expand Down
77 changes: 77 additions & 0 deletions codex-rs/core/tests/suite/client_websockets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use codex_core::ResponseEvent;
use codex_core::ResponseItem;
use codex_core::TransportManager;
use codex_core::WireApi;
use codex_core::X_RESPONSESAPI_INCLUDE_TIMING_METRICS_HEADER;
use codex_core::features::Feature;
use codex_core::models_manager::manager::ModelsManager;
use codex_core::protocol::SessionSource;
Expand Down Expand Up @@ -102,6 +103,72 @@ async fn responses_websocket_emits_websocket_telemetry_events() {
server.shutdown().await;
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn responses_websocket_includes_timing_metrics_header_when_runtime_metrics_enabled() {
skip_if_no_network!();

let server = start_websocket_server(vec![vec![vec![
ev_response_created("resp-1"),
serde_json::json!({
"type": "responsesapi.websocket_timing",
"timing_metrics": {
"responses_duration_excl_engine_and_client_tool_time_ms": 120,
"engine_service_total_ms": 450
}
}),
ev_completed("resp-1"),
]]])
.await;

let harness = websocket_harness_with_runtime_metrics(&server, true).await;
harness.otel_manager.reset_runtime_metrics();
let mut session = harness.client.new_session(None);
let prompt = prompt_with_input(vec![message_item("hello")]);

stream_until_complete(&mut session, &prompt).await;
tokio::time::sleep(Duration::from_millis(10)).await;

let handshake = server.single_handshake();
assert_eq!(
handshake.header(X_RESPONSESAPI_INCLUDE_TIMING_METRICS_HEADER),
Some("true".to_string())
);

let summary = harness
.otel_manager
.runtime_metrics_summary()
.expect("runtime metrics summary");
assert_eq!(summary.responses_api_overhead_ms, 120);
assert_eq!(summary.responses_api_inference_time_ms, 450);

server.shutdown().await;
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn responses_websocket_omits_timing_metrics_header_when_runtime_metrics_disabled() {
skip_if_no_network!();

let server = start_websocket_server(vec![vec![vec![
ev_response_created("resp-1"),
ev_completed("resp-1"),
]]])
.await;

let harness = websocket_harness_with_runtime_metrics(&server, false).await;
let mut session = harness.client.new_session(None);
let prompt = prompt_with_input(vec![message_item("hello")]);

stream_until_complete(&mut session, &prompt).await;

let handshake = server.single_handshake();
assert_eq!(
handshake.header(X_RESPONSESAPI_INCLUDE_TIMING_METRICS_HEADER),
None
);

server.shutdown().await;
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn responses_websocket_emits_reasoning_included_event() {
skip_if_no_network!();
Expand Down Expand Up @@ -241,11 +308,21 @@ fn websocket_provider(server: &WebSocketTestServer) -> ModelProviderInfo {
}

async fn websocket_harness(server: &WebSocketTestServer) -> WebsocketTestHarness {
websocket_harness_with_runtime_metrics(server, false).await
}

async fn websocket_harness_with_runtime_metrics(
server: &WebSocketTestServer,
runtime_metrics_enabled: bool,
) -> WebsocketTestHarness {
let provider = websocket_provider(server);
let codex_home = TempDir::new().unwrap();
let mut config = load_default_config_for_test(&codex_home).await;
config.model = Some(MODEL.to_string());
config.features.enable(Feature::ResponsesWebsockets);
if runtime_metrics_enabled {
config.features.enable(Feature::RuntimeMetrics);
}
let config = Arc::new(config);
let model_info = ModelsManager::construct_model_info_offline(MODEL, &config);
let conversation_id = ThreadId::new();
Expand Down
4 changes: 4 additions & 0 deletions codex-rs/otel/src/metrics/names.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,7 @@ pub(crate) const WEBSOCKET_REQUEST_COUNT_METRIC: &str = "codex.websocket.request
pub(crate) const WEBSOCKET_REQUEST_DURATION_METRIC: &str = "codex.websocket.request.duration_ms";
pub(crate) const WEBSOCKET_EVENT_COUNT_METRIC: &str = "codex.websocket.event";
pub(crate) const WEBSOCKET_EVENT_DURATION_METRIC: &str = "codex.websocket.event.duration_ms";
pub(crate) const RESPONSES_API_OVERHEAD_DURATION_METRIC: &str =
"codex.responses_api_overhead.duration_ms";
pub(crate) const RESPONSES_API_INFERENCE_TIME_DURATION_METRIC: &str =
"codex.responses_api_inference_time.duration_ms";
12 changes: 12 additions & 0 deletions codex-rs/otel/src/metrics/runtime_metrics.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use crate::metrics::names::API_CALL_COUNT_METRIC;
use crate::metrics::names::API_CALL_DURATION_METRIC;
use crate::metrics::names::RESPONSES_API_INFERENCE_TIME_DURATION_METRIC;
use crate::metrics::names::RESPONSES_API_OVERHEAD_DURATION_METRIC;
use crate::metrics::names::SSE_EVENT_COUNT_METRIC;
use crate::metrics::names::SSE_EVENT_DURATION_METRIC;
use crate::metrics::names::TOOL_CALL_COUNT_METRIC;
Expand Down Expand Up @@ -32,6 +34,8 @@ pub struct RuntimeMetricsSummary {
pub streaming_events: RuntimeMetricTotals,
pub websocket_calls: RuntimeMetricTotals,
pub websocket_events: RuntimeMetricTotals,
pub responses_api_overhead_ms: u64,
pub responses_api_inference_time_ms: u64,
}

impl RuntimeMetricsSummary {
Expand All @@ -41,6 +45,8 @@ impl RuntimeMetricsSummary {
&& self.streaming_events.is_empty()
&& self.websocket_calls.is_empty()
&& self.websocket_events.is_empty()
&& self.responses_api_overhead_ms == 0
&& self.responses_api_inference_time_ms == 0
}

pub(crate) fn from_snapshot(snapshot: &ResourceMetrics) -> Self {
Expand All @@ -64,12 +70,18 @@ impl RuntimeMetricsSummary {
count: sum_counter(snapshot, WEBSOCKET_EVENT_COUNT_METRIC),
duration_ms: sum_histogram_ms(snapshot, WEBSOCKET_EVENT_DURATION_METRIC),
};
let responses_api_overhead_ms =
sum_histogram_ms(snapshot, RESPONSES_API_OVERHEAD_DURATION_METRIC);
let responses_api_inference_time_ms =
sum_histogram_ms(snapshot, RESPONSES_API_INFERENCE_TIME_DURATION_METRIC);
Self {
tool_calls,
api_calls,
streaming_events,
websocket_calls,
websocket_events,
responses_api_overhead_ms,
responses_api_inference_time_ms,
}
}
}
Expand Down
38 changes: 38 additions & 0 deletions codex-rs/otel/src/traces/otel_manager.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use crate::metrics::names::API_CALL_COUNT_METRIC;
use crate::metrics::names::API_CALL_DURATION_METRIC;
use crate::metrics::names::RESPONSES_API_INFERENCE_TIME_DURATION_METRIC;
use crate::metrics::names::RESPONSES_API_OVERHEAD_DURATION_METRIC;
use crate::metrics::names::SSE_EVENT_COUNT_METRIC;
use crate::metrics::names::SSE_EVENT_DURATION_METRIC;
use crate::metrics::names::TOOL_CALL_COUNT_METRIC;
Expand Down Expand Up @@ -42,6 +44,10 @@ pub use crate::ToolDecisionSource;

const SSE_UNKNOWN_KIND: &str = "unknown";
const WEBSOCKET_UNKNOWN_KIND: &str = "unknown";
const RESPONSES_WEBSOCKET_TIMING_KIND: &str = "responsesapi.websocket_timing";
const RESPONSES_WEBSOCKET_TIMING_METRICS_FIELD: &str = "timing_metrics";
const RESPONSES_API_OVERHEAD_FIELD: &str = "responses_duration_excl_engine_and_client_tool_time_ms";
const RESPONSES_API_INFERENCE_FIELD: &str = "engine_service_total_ms";

impl OtelManager {
#[allow(clippy::too_many_arguments)]
Expand Down Expand Up @@ -252,6 +258,9 @@ impl OtelManager {
.get("type")
.and_then(|value| value.as_str())
.map(std::string::ToString::to_string);
if kind.as_deref() == Some(RESPONSES_WEBSOCKET_TIMING_KIND) {
self.record_responses_websocket_timing_metrics(&value);
}
if kind.as_deref() == Some("response.failed") {
success = false;
error_message = value
Expand Down Expand Up @@ -651,6 +660,22 @@ impl OtelManager {
);
}

fn record_responses_websocket_timing_metrics(&self, value: &serde_json::Value) {
let timing_metrics = value.get(RESPONSES_WEBSOCKET_TIMING_METRICS_FIELD);

let overhead_value =
timing_metrics.and_then(|value| value.get(RESPONSES_API_OVERHEAD_FIELD));
if let Some(duration) = duration_from_ms_value(overhead_value) {
self.record_duration(RESPONSES_API_OVERHEAD_DURATION_METRIC, duration, &[]);
}

let inference_value =
timing_metrics.and_then(|value| value.get(RESPONSES_API_INFERENCE_FIELD));
if let Some(duration) = duration_from_ms_value(inference_value) {
self.record_duration(RESPONSES_API_INFERENCE_TIME_DURATION_METRIC, duration, &[]);
}
}

fn responses_type(event: &ResponseEvent) -> String {
match event {
ResponseEvent::Created => "created".into(),
Expand Down Expand Up @@ -689,3 +714,16 @@ impl OtelManager {
fn timestamp() -> String {
Utc::now().to_rfc3339_opts(SecondsFormat::Millis, true)
}

fn duration_from_ms_value(value: Option<&serde_json::Value>) -> Option<Duration> {
let value = value?;
let ms = value
.as_f64()
.or_else(|| value.as_i64().map(|v| v as f64))
.or_else(|| value.as_u64().map(|v| v as f64))?;
if !ms.is_finite() || ms < 0.0 {
return None;
}
let clamped = ms.min(u64::MAX as f64);
Some(Duration::from_millis(clamped.round() as u64))
}
14 changes: 12 additions & 2 deletions codex-rs/otel/tests/suite/runtime_summary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,14 @@ fn runtime_metrics_summary_collects_tool_api_and_streaming_metrics() -> Result<(
r#"{"type":"response.created"}"#.into(),
))));
manager.record_websocket_event(&ws_response, Duration::from_millis(80));
let ws_timing_response: std::result::Result<
Option<std::result::Result<Message, tokio_tungstenite::tungstenite::Error>>,
codex_api::ApiError,
> = Ok(Some(Ok(Message::Text(
r#"{"type":"responsesapi.websocket_timing","timing_metrics":{"responses_duration_excl_engine_and_client_tool_time_ms":124,"engine_service_total_ms":457}}"#
.into(),
))));
manager.record_websocket_event(&ws_timing_response, Duration::from_millis(20));

let summary = manager
.runtime_metrics_summary()
Expand All @@ -84,9 +92,11 @@ fn runtime_metrics_summary_collects_tool_api_and_streaming_metrics() -> Result<(
duration_ms: 400,
},
websocket_events: RuntimeMetricTotals {
count: 1,
duration_ms: 80,
count: 2,
duration_ms: 100,
},
responses_api_overhead_ms: 124,
responses_api_inference_time_ms: 457,
};
assert_eq!(summary, expected);

Expand Down
14 changes: 13 additions & 1 deletion codex-rs/tui/src/history_cell.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2213,6 +2213,14 @@ fn runtime_metrics_label(summary: RuntimeMetricsSummary) -> Option<String> {
summary.websocket_events.count
));
}
if summary.responses_api_overhead_ms > 0 {
let duration = format_duration_ms(summary.responses_api_overhead_ms);
parts.push(format!("Responses API overhead: {duration}"));
}
if summary.responses_api_inference_time_ms > 0 {
let duration = format_duration_ms(summary.responses_api_inference_time_ms);
parts.push(format!("Responses API inference: {duration}"));
}
if parts.is_empty() {
None
} else {
Expand Down Expand Up @@ -2381,9 +2389,11 @@ mod tests {
count: 4,
duration_ms: 1_200,
},
responses_api_overhead_ms: 650,
responses_api_inference_time_ms: 1_940,
};
let cell = FinalMessageSeparator::new(Some(12), Some(summary));
let rendered = render_lines(&cell.display_lines(200));
let rendered = render_lines(&cell.display_lines(300));

assert_eq!(rendered.len(), 1);
assert!(!rendered[0].contains("Worked for"));
Expand All @@ -2392,6 +2402,8 @@ mod tests {
assert!(rendered[0].contains("WebSocket: 1 events send (700ms)"));
assert!(rendered[0].contains("Streams: 6 events (900ms)"));
assert!(rendered[0].contains("4 events received (1.2s)"));
assert!(rendered[0].contains("Responses API overhead: 650ms"));
assert!(rendered[0].contains("Responses API inference: 1.9s"));
}

#[test]
Expand Down
Loading