diff --git a/codex-rs/core/src/memories/mod.rs b/codex-rs/core/src/memories/mod.rs index 219fb2ac351..e9eda2add5e 100644 --- a/codex-rs/core/src/memories/mod.rs +++ b/codex-rs/core/src/memories/mod.rs @@ -63,6 +63,17 @@ mod phase_two { pub(super) const JOB_HEARTBEAT_SECONDS: u64 = 30; } +mod metrics { + /// Number of phase-1 startup jobs grouped by status. + pub(super) const MEMORY_PHASE_ONE_JOBS: &str = "codex.memory.phase1"; + /// Number of raw memories produced by phase-1 startup extraction. + pub(super) const MEMORY_PHASE_ONE_OUTPUT: &str = "codex.memory.phase1.output"; + /// Number of phase-2 startup jobs grouped by status. + pub(super) const MEMORY_PHASE_TWO_JOBS: &str = "codex.memory.phase2"; + /// Number of stage-1 memories included in each phase-2 consolidation step. + pub(super) const MEMORY_PHASE_TWO_INPUT: &str = "codex.memory.phase2.input"; +} + pub fn memory_root(codex_home: &Path) -> PathBuf { codex_home.join("memories") } diff --git a/codex-rs/core/src/memories/startup/dispatch.rs b/codex-rs/core/src/memories/startup/dispatch.rs index 9a25faa3ec7..534ce0b0c40 100644 --- a/codex-rs/core/src/memories/startup/dispatch.rs +++ b/codex-rs/core/src/memories/startup/dispatch.rs @@ -2,6 +2,7 @@ use crate::codex::Session; use crate::config::Config; use crate::config::Constrained; use crate::memories::memory_root; +use crate::memories::metrics; use crate::memories::phase_two; use crate::memories::prompts::build_consolidation_prompt; use crate::memories::startup::phase2::spawn_phase2_completion_task; @@ -34,8 +35,14 @@ pub(super) async fn run_global_memory_consolidation( session: &Arc<Session>, config: Arc<Config>, ) -> bool { + let otel_manager = &session.services.otel_manager; let Some(state_db) = session.services.state_db.as_deref() else { warn!("state db unavailable; skipping global memory consolidation"); + otel_manager.counter( + metrics::MEMORY_PHASE_TWO_JOBS, + 1, + &[("status", "skipped_state_db_unavailable")], + ); return false; }; @@ -46,6 +53,11 @@ pub(super) async 
fn run_global_memory_consolidation( Ok(claim) => claim, Err(err) => { warn!("state db try_claim_global_phase2_job failed during memories startup: {err}"); + otel_manager.counter( + metrics::MEMORY_PHASE_TWO_JOBS, + 1, + &[("status", "failed_claim")], + ); return false; } }; @@ -53,13 +65,26 @@ pub(super) async fn run_global_memory_consolidation( codex_state::Phase2JobClaimOutcome::Claimed { ownership_token, input_watermark, - } => (ownership_token, input_watermark), + } => { + otel_manager.counter(metrics::MEMORY_PHASE_TWO_JOBS, 1, &[("status", "claimed")]); + (ownership_token, input_watermark) + } codex_state::Phase2JobClaimOutcome::SkippedNotDirty => { debug!("memory phase-2 global lock is up-to-date; skipping consolidation"); + otel_manager.counter( + metrics::MEMORY_PHASE_TWO_JOBS, + 1, + &[("status", "skipped_not_dirty")], + ); return false; } codex_state::Phase2JobClaimOutcome::SkippedRunning => { debug!("memory phase-2 global consolidation already running; skipping"); + otel_manager.counter( + metrics::MEMORY_PHASE_TWO_JOBS, + 1, + &[("status", "skipped_running")], + ); return false; } }; @@ -89,6 +114,11 @@ pub(super) async fn run_global_memory_consolidation( .set(consolidation_sandbox_policy) { warn!("memory phase-2 consolidation sandbox policy was rejected by constraints: {err}"); + otel_manager.counter( + metrics::MEMORY_PHASE_TWO_JOBS, + 1, + &[("status", "failed_sandbox_policy")], + ); let _ = state_db .mark_global_phase2_job_failed( &ownership_token, @@ -108,6 +138,11 @@ pub(super) async fn run_global_memory_consolidation( Ok(memories) => memories, Err(err) => { warn!("state db list_stage1_outputs_for_global failed during consolidation: {err}"); + otel_manager.counter( + metrics::MEMORY_PHASE_TWO_JOBS, + 1, + &[("status", "failed_load_stage1_outputs")], + ); let _ = state_db .mark_global_phase2_job_failed( &ownership_token, @@ -118,9 +153,21 @@ pub(super) async fn run_global_memory_consolidation( return false; } }; + if !latest_memories.is_empty() { + 
otel_manager.counter( + metrics::MEMORY_PHASE_TWO_INPUT, + latest_memories.len() as i64, + &[], + ); + } let completion_watermark = completion_watermark(claimed_watermark, &latest_memories); if let Err(err) = sync_rollout_summaries_from_memories(&root, &latest_memories).await { warn!("failed syncing local memory artifacts for global consolidation: {err}"); + otel_manager.counter( + metrics::MEMORY_PHASE_TWO_JOBS, + 1, + &[("status", "failed_sync_artifacts")], + ); let _ = state_db .mark_global_phase2_job_failed( &ownership_token, @@ -133,6 +180,11 @@ pub(super) async fn run_global_memory_consolidation( if let Err(err) = rebuild_raw_memories_file_from_memories(&root, &latest_memories).await { warn!("failed rebuilding raw memories aggregate for global consolidation: {err}"); + otel_manager.counter( + metrics::MEMORY_PHASE_TWO_JOBS, + 1, + &[("status", "failed_rebuild_raw_memories")], + ); let _ = state_db .mark_global_phase2_job_failed( &ownership_token, @@ -147,6 +199,11 @@ pub(super) async fn run_global_memory_consolidation( let _ = state_db .mark_global_phase2_job_succeeded(&ownership_token, completion_watermark) .await; + otel_manager.counter( + metrics::MEMORY_PHASE_TWO_JOBS, + 1, + &[("status", "succeeded_no_input")], + ); return false; } @@ -169,6 +226,11 @@ pub(super) async fn run_global_memory_consolidation( info!( "memory phase-2 global consolidation agent started: agent_id={consolidation_agent_id}" ); + otel_manager.counter( + metrics::MEMORY_PHASE_TWO_JOBS, + 1, + &[("status", "agent_spawned")], + ); spawn_phase2_completion_task( session.as_ref(), ownership_token, @@ -179,6 +241,11 @@ pub(super) async fn run_global_memory_consolidation( } Err(err) => { warn!("failed to spawn global memory consolidation agent: {err}"); + otel_manager.counter( + metrics::MEMORY_PHASE_TWO_JOBS, + 1, + &[("status", "failed_spawn_agent")], + ); let _ = state_db .mark_global_phase2_job_failed( &ownership_token, diff --git a/codex-rs/core/src/memories/startup/mod.rs 
b/codex-rs/core/src/memories/startup/mod.rs index 1422df7ac64..e4a0a7a281a 100644 --- a/codex-rs/core/src/memories/startup/mod.rs +++ b/codex-rs/core/src/memories/startup/mod.rs @@ -7,6 +7,7 @@ use crate::codex::TurnContext; use crate::config::Config; use crate::error::Result as CodexResult; use crate::features::Feature; +use crate::memories::metrics; use crate::memories::phase_one; use crate::rollout::INTERACTIVE_SESSION_SOURCES; use codex_otel::OtelManager; @@ -19,6 +20,13 @@ use std::sync::Arc; use tracing::info; use tracing::warn; +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +enum PhaseOneJobOutcome { + SucceededWithOutput, + SucceededNoOutput, + Failed, +} + pub(super) const PHASE_ONE_THREAD_SCAN_LIMIT: usize = 5_000; #[derive(Clone)] @@ -79,8 +87,14 @@ pub(super) async fn run_memories_startup_pipeline( session: &Arc<Session>, config: Arc<Config>, ) -> CodexResult<()> { + let otel_manager = &session.services.otel_manager; let Some(state_db) = session.services.state_db.as_deref() else { warn!("state db unavailable for memories startup pipeline; skipping"); + otel_manager.counter( + metrics::MEMORY_PHASE_ONE_JOBS, + 1, + &[("status", "skipped_state_db_unavailable")], + ); return Ok(()); }; @@ -106,12 +120,24 @@ pub(super) async fn run_memories_startup_pipeline( Ok(claims) => claims, Err(err) => { warn!("state db claim_stage1_jobs_for_startup failed during memories startup: {err}"); + otel_manager.counter( + metrics::MEMORY_PHASE_ONE_JOBS, + 1, + &[("status", "failed_claim")], + ); Vec::new() } }; let claimed_count = claimed_candidates.len(); - let mut succeeded_count = 0; + if claimed_count == 0 { + otel_manager.counter( + metrics::MEMORY_PHASE_ONE_JOBS, + 1, + &[("status", "skipped_no_candidates")], + ); + } + let mut phase_one_outcomes = Vec::new(); if claimed_count > 0 { let turn_context = session.new_default_turn().await; let stage_one_context = 
StageOneRequestContext::from_turn_context( @@ -119,7 +145,7 @@ pub(super) async fn run_memories_startup_pipeline( turn_context.resolve_turn_metadata_header().await, ); - succeeded_count = futures::stream::iter(claimed_candidates.into_iter()) + phase_one_outcomes = futures::stream::iter(claimed_candidates.into_iter()) .map(|claim| { let session = Arc::clone(session); let stage_one_context = stage_one_context.clone(); @@ -145,24 +171,29 @@ pub(super) async fn run_memories_startup_pipeline( ) .await; } - return false; + return PhaseOneJobOutcome::Failed; } }; let Some(state_db) = session.services.state_db.as_deref() else { - return false; + return PhaseOneJobOutcome::Failed; }; if stage_one_output.raw_memory.is_empty() && stage_one_output.rollout_summary.is_empty() { - return state_db + return if state_db .mark_stage1_job_succeeded_no_output(thread.id, &claim.ownership_token) .await - .unwrap_or(false); + .unwrap_or(false) + { + PhaseOneJobOutcome::SucceededNoOutput + } else { + PhaseOneJobOutcome::Failed + }; } - state_db + if state_db .mark_stage1_job_succeeded( thread.id, &claim.ownership_token, @@ -172,19 +203,73 @@ pub(super) async fn run_memories_startup_pipeline( ) .await .unwrap_or(false) + { + PhaseOneJobOutcome::SucceededWithOutput + } else { + PhaseOneJobOutcome::Failed + } } }) .buffer_unordered(phase_one::CONCURRENCY_LIMIT) - .collect::<Vec<_>>() - .await - .into_iter() - .filter(|ok| *ok) - .count(); + .collect::<Vec<_>>() + .await; + } + + let succeeded_with_output_count = phase_one_outcomes + .iter() + .filter(|outcome| matches!(outcome, PhaseOneJobOutcome::SucceededWithOutput)) + .count(); + let succeeded_no_output_count = phase_one_outcomes + .iter() + .filter(|outcome| matches!(outcome, PhaseOneJobOutcome::SucceededNoOutput)) + .count(); + let failed_count = phase_one_outcomes + .iter() + .filter(|outcome| matches!(outcome, PhaseOneJobOutcome::Failed)) + .count(); + let succeeded_count = succeeded_with_output_count + succeeded_no_output_count; + + if 
claimed_count > 0 { + otel_manager.counter( + metrics::MEMORY_PHASE_ONE_JOBS, + claimed_count as i64, + &[("status", "claimed")], + ); + } + if succeeded_with_output_count > 0 { + otel_manager.counter( + metrics::MEMORY_PHASE_ONE_JOBS, + succeeded_with_output_count as i64, + &[("status", "succeeded")], + ); + otel_manager.counter( + metrics::MEMORY_PHASE_ONE_OUTPUT, + succeeded_with_output_count as i64, + &[], + ); + } + if succeeded_no_output_count > 0 { + otel_manager.counter( + metrics::MEMORY_PHASE_ONE_JOBS, + succeeded_no_output_count as i64, + &[("status", "succeeded_no_output")], + ); + } + if failed_count > 0 { + otel_manager.counter( + metrics::MEMORY_PHASE_ONE_JOBS, + failed_count as i64, + &[("status", "failed")], + ); } info!( - "memory stage-1 extraction complete: {} job(s) claimed, {} succeeded", - claimed_count, succeeded_count + "memory stage-1 extraction complete: {} job(s) claimed, {} succeeded ({} with output, {} no output), {} failed", + claimed_count, + succeeded_count, + succeeded_with_output_count, + succeeded_no_output_count, + failed_count ); let consolidation_job_count = diff --git a/codex-rs/core/src/memories/startup/phase2.rs b/codex-rs/core/src/memories/startup/phase2.rs index 2e839384b10..d79226cd168 100644 --- a/codex-rs/core/src/memories/startup/phase2.rs +++ b/codex-rs/core/src/memories/startup/phase2.rs @@ -1,6 +1,7 @@ use crate::agent::AgentStatus; use crate::agent::status::is_final as is_final_agent_status; use crate::codex::Session; +use crate::memories::metrics; use crate::memories::phase_two; use codex_protocol::ThreadId; use std::sync::Arc; @@ -18,6 +19,7 @@ pub(super) fn spawn_phase2_completion_task( ) { let state_db = session.services.state_db.clone(); let agent_control = session.services.agent_control.clone(); + let otel_manager = session.services.otel_manager.clone(); tokio::spawn(async move { let Some(state_db) = state_db else { @@ -30,6 +32,11 @@ pub(super) fn spawn_phase2_completion_task( warn!( "failed to subscribe to 
global memory consolidation agent {consolidation_agent_id}: {err}" ); + otel_manager.counter( + metrics::MEMORY_PHASE_TWO_JOBS, + 1, + &[("status", "failed_subscribe_status")], + ); mark_phase2_failed_with_recovery( state_db.as_ref(), &ownership_token, @@ -49,8 +56,22 @@ pub(super) fn spawn_phase2_completion_task( ) .await; if matches!(final_status, AgentStatus::Shutdown | AgentStatus::NotFound) { + otel_manager.counter( + metrics::MEMORY_PHASE_TWO_JOBS, + 1, + &[("status", "failed_agent_unavailable")], + ); return; } + if is_phase2_success(&final_status) { + otel_manager.counter( + metrics::MEMORY_PHASE_TWO_JOBS, + 1, + &[("status", "succeeded")], + ); + } else { + otel_manager.counter(metrics::MEMORY_PHASE_TWO_JOBS, 1, &[("status", "failed")]); + } tokio::spawn(async move { if let Err(err) = agent_control.shutdown_agent(consolidation_agent_id).await {