Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions codex-rs/core/src/rollout/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use codex_protocol::protocol::SessionSource;
use codex_state::BackfillStats;
use codex_state::DB_ERROR_METRIC;
use codex_state::DB_METRIC_BACKFILL;
use codex_state::DB_METRIC_BACKFILL_DURATION_MS;
use codex_state::ExtractionOutcome;
use codex_state::ThreadMetadataBuilder;
use codex_state::apply_rollout_item;
Expand Down Expand Up @@ -128,6 +129,7 @@ pub(crate) async fn backfill_sessions(
config: &Config,
otel: Option<&OtelManager>,
) {
let timer = otel.and_then(|otel| otel.start_timer(DB_METRIC_BACKFILL_DURATION_MS, &[]).ok());
let sessions_root = config.codex_home.join(rollout::SESSIONS_SUBDIR);
let archived_root = config.codex_home.join(rollout::ARCHIVED_SESSIONS_SUBDIR);
let mut rollout_paths: Vec<(PathBuf, bool)> = Vec::new();
Expand Down Expand Up @@ -210,6 +212,16 @@ pub(crate) async fn backfill_sessions(
&[("status", "failed")],
);
}
if let Some(timer) = timer.as_ref() {
let status = if stats.failed == 0 {
"success"
} else if stats.upserted == 0 {
"failed"
} else {
"partial_failure"
};
let _ = timer.record(&[("status", status)]);
}
}

async fn file_modified_time_utc(path: &Path) -> Option<DateTime<Utc>> {
Expand Down
25 changes: 24 additions & 1 deletion codex-rs/otel/src/metrics/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,16 @@ use tracing::debug;

const ENV_ATTRIBUTE: &str = "env";
const METER_NAME: &str = "codex";
const DURATION_UNIT: &str = "ms";
const DURATION_DESCRIPTION: &str = "Duration in milliseconds.";

#[derive(Debug)]
struct MetricsClientInner {
meter_provider: SdkMeterProvider,
meter: Meter,
counters: Mutex<HashMap<String, Counter<u64>>>,
histograms: Mutex<HashMap<String, Histogram<f64>>>,
duration_histograms: Mutex<HashMap<String, Histogram<f64>>>,
default_tags: BTreeMap<String, String>,
}

Expand Down Expand Up @@ -81,6 +84,25 @@ impl MetricsClientInner {
Ok(())
}

fn duration_histogram(&self, name: &str, value: i64, tags: &[(&str, &str)]) -> Result<()> {
validate_metric_name(name)?;
let attributes = self.attributes(tags)?;

let mut histograms = self
.duration_histograms
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
let histogram = histograms.entry(name.to_string()).or_insert_with(|| {
self.meter
.f64_histogram(name.to_string())
.with_unit(DURATION_UNIT)
.with_description(DURATION_DESCRIPTION)
.build()
});
histogram.record(value as f64, &attributes);
Ok(())
}

fn attributes(&self, tags: &[(&str, &str)]) -> Result<Vec<KeyValue>> {
if tags.is_empty() {
return Ok(self
Expand Down Expand Up @@ -150,6 +172,7 @@ impl MetricsClient {
meter,
counters: Mutex::new(HashMap::new()),
histograms: Mutex::new(HashMap::new()),
duration_histograms: Mutex::new(HashMap::new()),
default_tags: config.default_tags,
})))
}
Expand All @@ -171,7 +194,7 @@ impl MetricsClient {
duration: Duration,
tags: &[(&str, &str)],
) -> Result<()> {
self.histogram(
self.0.duration_histogram(
name,
duration.as_millis().min(i64::MAX as u128) as i64,
tags,
Expand Down
11 changes: 10 additions & 1 deletion codex-rs/otel/tests/suite/timing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,17 @@ fn record_duration_records_histogram() -> Result<()> {
)?;
metrics.shutdown()?;

let resource_metrics = latest_metrics(&exporter);
let (bounds, bucket_counts, sum, count) =
histogram_data(&latest_metrics(&exporter), "codex.request_latency");
histogram_data(&resource_metrics, "codex.request_latency");
assert!(!bounds.is_empty());
assert_eq!(bucket_counts.iter().sum::<u64>(), 1);
assert_eq!(sum, 15.0);
assert_eq!(count, 1);
let metric = crate::harness::find_metric(&resource_metrics, "codex.request_latency")
.unwrap_or_else(|| panic!("metric codex.request_latency missing"));
assert_eq!(metric.unit(), "ms");
assert_eq!(metric.description(), "Duration in milliseconds.");

Ok(())
}
Expand All @@ -46,6 +51,10 @@ fn timer_result_records_success() -> Result<()> {
assert!(!bounds.is_empty());
assert_eq!(count, 1);
assert_eq!(bucket_counts.iter().sum::<u64>(), 1);
let metric = crate::harness::find_metric(&resource_metrics, "codex.request_latency")
.unwrap_or_else(|| panic!("metric codex.request_latency missing"));
assert_eq!(metric.unit(), "ms");
assert_eq!(metric.description(), "Duration in milliseconds.");
let attrs = attributes_to_map(
match crate::harness::find_metric(&resource_metrics, "codex.request_latency").and_then(
|metric| match metric.data() {
Expand Down
2 changes: 2 additions & 0 deletions codex-rs/state/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,7 @@ pub use runtime::STATE_DB_FILENAME;
pub const DB_ERROR_METRIC: &str = "codex.db.error";
/// Metrics on backfill process during first init of the db. Tags: [status]
pub const DB_METRIC_BACKFILL: &str = "codex.db.backfill";
/// Metrics on backfill duration during first init of the db. Tags: [status]
pub const DB_METRIC_BACKFILL_DURATION_MS: &str = "codex.db.backfill.duration_ms";
/// Metrics on errors during comparison between DB and rollout file. Tags: [stage]
pub const DB_METRIC_COMPARE_ERROR: &str = "codex.db.compare_error";
Loading