Merged
11 changes: 11 additions & 0 deletions codex-rs/core/src/memories/mod.rs
@@ -63,6 +63,17 @@ mod phase_two {
pub(super) const JOB_HEARTBEAT_SECONDS: u64 = 30;
}

mod metrics {
/// Number of phase-1 startup jobs grouped by status.
pub(super) const MEMORY_PHASE_ONE_JOBS: &str = "codex.memory.phase1";
/// Number of raw memories produced by phase-1 startup extraction.
pub(super) const MEMORY_PHASE_ONE_OUTPUT: &str = "codex.memory.phase1.output";
/// Number of phase-2 startup jobs grouped by status.
pub(super) const MEMORY_PHASE_TWO_JOBS: &str = "codex.memory.phase2";
/// Number of stage-1 memories included in each phase-2 consolidation step.
pub(super) const MEMORY_PHASE_TWO_INPUT: &str = "codex.memory.phase2.input";
}

pub fn memory_root(codex_home: &Path) -> PathBuf {
codex_home.join("memories")
}
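
For orientation (this is a reader aid, not code from the diff): the constants above are plain OTel counter names. Every call site added in this PR increments one of them through the session's OtelManager and attaches a single "status" attribute, so each phase gets one counter that can be broken down by outcome. A minimal sketch of that pattern, assuming only the OtelManager::counter(name, value, attributes) call shape visible in the hunks below; the helper name is hypothetical:

use codex_otel::OtelManager;
use crate::memories::metrics;

// Hypothetical helper, not in the PR: one increment of codex.memory.phase2,
// tagged with the outcome ("claimed", "failed_claim", "skipped_running", ...).
fn note_phase2_status(otel_manager: &OtelManager, status: &'static str) {
    otel_manager.counter(
        metrics::MEMORY_PHASE_TWO_JOBS,
        1,
        &[("status", status)],
    );
}

The PR itself inlines these calls at each exit path rather than going through a helper.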
69 changes: 68 additions & 1 deletion codex-rs/core/src/memories/startup/dispatch.rs
@@ -2,6 +2,7 @@ use crate::codex::Session
use crate::config::Config;
use crate::config::Constrained;
use crate::memories::memory_root;
use crate::memories::metrics;
use crate::memories::phase_two;
use crate::memories::prompts::build_consolidation_prompt;
use crate::memories::startup::phase2::spawn_phase2_completion_task;
@@ -34,8 +35,14 @@ pub(super) async fn run_global_memory_consolidation(
session: &Arc<Session>,
config: Arc<Config>,
) -> bool {
let otel_manager = &session.services.otel_manager;
let Some(state_db) = session.services.state_db.as_deref() else {
warn!("state db unavailable; skipping global memory consolidation");
otel_manager.counter(
metrics::MEMORY_PHASE_TWO_JOBS,
1,
&[("status", "skipped_state_db_unavailable")],
);
return false;
};

@@ -46,20 +53,38 @@
Ok(claim) => claim,
Err(err) => {
warn!("state db try_claim_global_phase2_job failed during memories startup: {err}");
otel_manager.counter(
metrics::MEMORY_PHASE_TWO_JOBS,
1,
&[("status", "failed_claim")],
);
return false;
}
};
let (ownership_token, claimed_watermark) = match claim {
codex_state::Phase2JobClaimOutcome::Claimed {
ownership_token,
input_watermark,
} => (ownership_token, input_watermark),
} => {
otel_manager.counter(metrics::MEMORY_PHASE_TWO_JOBS, 1, &[("status", "claimed")]);
(ownership_token, input_watermark)
}
codex_state::Phase2JobClaimOutcome::SkippedNotDirty => {
debug!("memory phase-2 global lock is up-to-date; skipping consolidation");
otel_manager.counter(
metrics::MEMORY_PHASE_TWO_JOBS,
1,
&[("status", "skipped_not_dirty")],
);
return false;
}
codex_state::Phase2JobClaimOutcome::SkippedRunning => {
debug!("memory phase-2 global consolidation already running; skipping");
otel_manager.counter(
metrics::MEMORY_PHASE_TWO_JOBS,
1,
&[("status", "skipped_running")],
);
return false;
}
};
@@ -89,6 +114,11 @@ pub(super) async fn run_global_memory_consolidation(
.set(consolidation_sandbox_policy)
{
warn!("memory phase-2 consolidation sandbox policy was rejected by constraints: {err}");
otel_manager.counter(
metrics::MEMORY_PHASE_TWO_JOBS,
1,
&[("status", "failed_sandbox_policy")],
);
let _ = state_db
.mark_global_phase2_job_failed(
&ownership_token,
Expand All @@ -108,6 +138,11 @@ pub(super) async fn run_global_memory_consolidation(
Ok(memories) => memories,
Err(err) => {
warn!("state db list_stage1_outputs_for_global failed during consolidation: {err}");
otel_manager.counter(
metrics::MEMORY_PHASE_TWO_JOBS,
1,
&[("status", "failed_load_stage1_outputs")],
);
let _ = state_db
.mark_global_phase2_job_failed(
&ownership_token,
@@ -118,9 +153,21 @@
return false;
}
};
if !latest_memories.is_empty() {
otel_manager.counter(
metrics::MEMORY_PHASE_TWO_INPUT,
latest_memories.len() as i64,
&[],
);
}
let completion_watermark = completion_watermark(claimed_watermark, &latest_memories);
if let Err(err) = sync_rollout_summaries_from_memories(&root, &latest_memories).await {
warn!("failed syncing local memory artifacts for global consolidation: {err}");
otel_manager.counter(
metrics::MEMORY_PHASE_TWO_JOBS,
1,
&[("status", "failed_sync_artifacts")],
);
let _ = state_db
.mark_global_phase2_job_failed(
&ownership_token,
@@ -133,6 +180,11 @@

if let Err(err) = rebuild_raw_memories_file_from_memories(&root, &latest_memories).await {
warn!("failed rebuilding raw memories aggregate for global consolidation: {err}");
otel_manager.counter(
metrics::MEMORY_PHASE_TWO_JOBS,
1,
&[("status", "failed_rebuild_raw_memories")],
);
let _ = state_db
.mark_global_phase2_job_failed(
&ownership_token,
@@ -147,6 +199,11 @@
let _ = state_db
.mark_global_phase2_job_succeeded(&ownership_token, completion_watermark)
.await;
otel_manager.counter(
metrics::MEMORY_PHASE_TWO_JOBS,
1,
&[("status", "succeeded_no_input")],
);
return false;
}

@@ -169,6 +226,11 @@
info!(
"memory phase-2 global consolidation agent started: agent_id={consolidation_agent_id}"
);
otel_manager.counter(
metrics::MEMORY_PHASE_TWO_JOBS,
1,
&[("status", "agent_spawned")],
);
spawn_phase2_completion_task(
session.as_ref(),
ownership_token,
@@ -179,6 +241,11 @@
}
Err(err) => {
warn!("failed to spawn global memory consolidation agent: {err}");
otel_manager.counter(
metrics::MEMORY_PHASE_TWO_JOBS,
1,
&[("status", "failed_spawn_agent")],
);
let _ = state_db
.mark_global_phase2_job_failed(
&ownership_token,
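
To keep the new phase-2 outcomes easy to scan (a summary sketch for the reader, not code added by the PR), run_global_memory_consolidation now reports exactly these values on the "status" attribute of codex.memory.phase2, one per exit path shown in the hunks above:

// Reader aid only: every "status" label emitted on MEMORY_PHASE_TWO_JOBS
// by the dispatch code in this file.
const PHASE_TWO_STATUSES: &[&str] = &[
    "skipped_state_db_unavailable",
    "failed_claim",
    "claimed",
    "skipped_not_dirty",
    "skipped_running",
    "failed_sandbox_policy",
    "failed_load_stage1_outputs",
    "failed_sync_artifacts",
    "failed_rebuild_raw_memories",
    "succeeded_no_input",
    "agent_spawned",
    "failed_spawn_agent",
];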
118 changes: 104 additions & 14 deletions codex-rs/core/src/memories/startup/mod.rs
@@ -7,6 +7,7 @@ use crate::codex::TurnContext;
use crate::config::Config;
use crate::error::Result as CodexResult;
use crate::features::Feature;
use crate::memories::metrics;
use crate::memories::phase_one;
use crate::rollout::INTERACTIVE_SESSION_SOURCES;
use codex_otel::OtelManager;
@@ -19,6 +20,13 @@ use std::sync::Arc;
use tracing::info;
use tracing::warn;

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum PhaseOneJobOutcome {
SucceededWithOutput,
SucceededNoOutput,
Failed,
}

pub(super) const PHASE_ONE_THREAD_SCAN_LIMIT: usize = 5_000;

#[derive(Clone)]
@@ -79,8 +87,19 @@ pub(super) async fn run_memories_startup_pipeline(
session: &Arc<Session>,
config: Arc<Config>,
) -> CodexResult<()> {
let otel_manager = &session.services.otel_manager;
let Some(state_db) = session.services.state_db.as_deref() else {
warn!("state db unavailable for memories startup pipeline; skipping");
otel_manager.counter(
metrics::MEMORY_PHASE_ONE_JOBS,
1,
&[("status", "skipped_state_db_unavailable")],
);
otel_manager.counter(
metrics::MEMORY_PHASE_ONE_JOBS,
1,
&[("status", "skipped_state_db_unavailable")],
);
Comment on lines +93 to +102

Contributor: dupe?

Collaborator (Author): Good catch

Collaborator (Author): I already have a follow-up where it's dropped. Dropping it there.

return Ok(());
};

@@ -106,20 +125,32 @@
Ok(claims) => claims,
Err(err) => {
warn!("state db claim_stage1_jobs_for_startup failed during memories startup: {err}");
otel_manager.counter(
metrics::MEMORY_PHASE_ONE_JOBS,
1,
&[("status", "failed_claim")],
);
Vec::new()
}
};

let claimed_count = claimed_candidates.len();
let mut succeeded_count = 0;
if claimed_count == 0 {
otel_manager.counter(
metrics::MEMORY_PHASE_ONE_JOBS,
1,
&[("status", "skipped_no_candidates")],
);
}
let mut phase_one_outcomes = Vec::new();
if claimed_count > 0 {
let turn_context = session.new_default_turn().await;
let stage_one_context = StageOneRequestContext::from_turn_context(
turn_context.as_ref(),
turn_context.resolve_turn_metadata_header().await,
);

succeeded_count = futures::stream::iter(claimed_candidates.into_iter())
phase_one_outcomes = futures::stream::iter(claimed_candidates.into_iter())
.map(|claim| {
let session = Arc::clone(session);
let stage_one_context = stage_one_context.clone();
@@ -145,24 +176,29 @@
)
.await;
}
return false;
return PhaseOneJobOutcome::Failed;
}
};

let Some(state_db) = session.services.state_db.as_deref() else {
return false;
return PhaseOneJobOutcome::Failed;
};

if stage_one_output.raw_memory.is_empty()
&& stage_one_output.rollout_summary.is_empty()
{
return state_db
return if state_db
.mark_stage1_job_succeeded_no_output(thread.id, &claim.ownership_token)
.await
.unwrap_or(false);
.unwrap_or(false)
{
PhaseOneJobOutcome::SucceededNoOutput
} else {
PhaseOneJobOutcome::Failed
};
}

state_db
if state_db
.mark_stage1_job_succeeded(
thread.id,
&claim.ownership_token,
@@ -172,19 +208,73 @@
)
.await
.unwrap_or(false)
{
PhaseOneJobOutcome::SucceededWithOutput
} else {
PhaseOneJobOutcome::Failed
}
}
})
.buffer_unordered(phase_one::CONCURRENCY_LIMIT)
.collect::<Vec<bool>>()
.await
.into_iter()
.filter(|ok| *ok)
.count();
.collect::<Vec<PhaseOneJobOutcome>>()
.await;
}

let succeeded_with_output_count = phase_one_outcomes
.iter()
.filter(|outcome| matches!(outcome, PhaseOneJobOutcome::SucceededWithOutput))
.count();
let succeeded_no_output_count = phase_one_outcomes
.iter()
.filter(|outcome| matches!(outcome, PhaseOneJobOutcome::SucceededNoOutput))
.count();
let failed_count = phase_one_outcomes
.iter()
.filter(|outcome| matches!(outcome, PhaseOneJobOutcome::Failed))
.count();
let succeeded_count = succeeded_with_output_count + succeeded_no_output_count;

if claimed_count > 0 {
otel_manager.counter(
metrics::MEMORY_PHASE_ONE_JOBS,
claimed_count as i64,
&[("status", "claimed")],
);
}
if succeeded_with_output_count > 0 {
otel_manager.counter(
metrics::MEMORY_PHASE_ONE_JOBS,
succeeded_with_output_count as i64,
&[("status", "succeeded")],
);
otel_manager.counter(
metrics::MEMORY_PHASE_ONE_OUTPUT,
succeeded_with_output_count as i64,
&[],
);
}
if succeeded_no_output_count > 0 {
otel_manager.counter(
metrics::MEMORY_PHASE_ONE_JOBS,
succeeded_no_output_count as i64,
&[("status", "succeeded_no_output")],
);
}
if failed_count > 0 {
otel_manager.counter(
metrics::MEMORY_PHASE_ONE_JOBS,
failed_count as i64,
&[("status", "failed")],
);
}

info!(
"memory stage-1 extraction complete: {} job(s) claimed, {} succeeded",
claimed_count, succeeded_count
"memory stage-1 extraction complete: {} job(s) claimed, {} succeeded ({} with output, {} no output), {} failed",
claimed_count,
succeeded_count,
succeeded_with_output_count,
succeeded_no_output_count,
failed_count
);

let consolidation_job_count =
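
As a closing reader aid (hypothetical helper, not part of the PR): the per-outcome tallies above feed codex.memory.phase1 with the labels "succeeded", "succeeded_no_output", and "failed", alongside the "claimed", "skipped_no_candidates", "failed_claim", and "skipped_state_db_unavailable" increments emitted earlier in the function. One way to see the mapping at a glance:

// Hypothetical sketch: the status label each PhaseOneJobOutcome maps to
// when the counts are emitted on MEMORY_PHASE_ONE_JOBS above.
impl PhaseOneJobOutcome {
    fn status_label(self) -> &'static str {
        match self {
            PhaseOneJobOutcome::SucceededWithOutput => "succeeded",
            PhaseOneJobOutcome::SucceededNoOutput => "succeeded_no_output",
            PhaseOneJobOutcome::Failed => "failed",
        }
    }
}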