Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 65 additions & 0 deletions codex-rs/core/src/rollout/recorder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1115,6 +1115,71 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn list_threads_db_enabled_drops_missing_rollout_paths() -> std::io::Result<()> {
let home = TempDir::new().expect("temp dir");
let mut config = ConfigBuilder::default()
.codex_home(home.path().to_path_buf())
.build()
.await?;
config.features.enable(Feature::Sqlite);

let uuid = Uuid::from_u128(9010);
let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id");
let stale_path = home.path().join(format!(
"sessions/2099/01/01/rollout-2099-01-01T00-00-00-{uuid}.jsonl"
));

let runtime = codex_state::StateRuntime::init(
home.path().to_path_buf(),
config.model_provider_id.clone(),
None,
)
.await
.expect("state db should initialize");
runtime
.mark_backfill_complete(None)
.await
.expect("backfill should be complete");
let created_at = chrono::Utc
.with_ymd_and_hms(2025, 1, 3, 13, 0, 0)
.single()
.expect("valid datetime");
let mut builder = codex_state::ThreadMetadataBuilder::new(
thread_id,
stale_path,
created_at,
SessionSource::Cli,
);
builder.model_provider = Some(config.model_provider_id.clone());
builder.cwd = home.path().to_path_buf();
let mut metadata = builder.build(config.model_provider_id.as_str());
metadata.first_user_message = Some("Hello from user".to_string());
runtime
.upsert_thread(&metadata)
.await
.expect("state db upsert should succeed");

let default_provider = config.model_provider_id.clone();
let page = RolloutRecorder::list_threads(
&config,
10,
None,
ThreadSortKey::CreatedAt,
&[],
None,
default_provider.as_str(),
)
.await?;
assert_eq!(page.items.len(), 0);
let stored_path = runtime
.find_rollout_path_by_id(thread_id, Some(false))
.await
.expect("state db lookup should succeed");
assert_eq!(stored_path, None);
Ok(())
}

#[tokio::test]
async fn list_threads_db_enabled_repairs_stale_rollout_paths() -> std::io::Result<()> {
let home = TempDir::new().expect("temp dir");
Expand Down
22 changes: 21 additions & 1 deletion codex-rs/core/src/state_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,27 @@ pub async fn list_threads_db(
)
.await
{
Ok(page) => Some(page),
Ok(mut page) => {
let mut valid_items = Vec::with_capacity(page.items.len());
for item in page.items {
if tokio::fs::try_exists(&item.rollout_path)
.await
.unwrap_or(false)
{
valid_items.push(item);
} else {
warn!(
"state db list_threads returned stale rollout path for thread {}: {}",
item.id,
item.rollout_path.display()
);
record_discrepancy("list_threads_db", "stale_db_path_dropped");
let _ = ctx.delete_thread(item.id).await;
}
}
page.items = valid_items;
Some(page)
}
Err(err) => {
warn!("state db list_threads failed: {err}");
None
Expand Down
9 changes: 9 additions & 0 deletions codex-rs/state/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,15 @@ ON CONFLICT(thread_id, position) DO NOTHING
self.upsert_thread(&metadata).await
}

/// Delete a thread metadata row by id.
pub async fn delete_thread(&self, thread_id: ThreadId) -> anyhow::Result<u64> {
let result = sqlx::query("DELETE FROM threads WHERE id = ?")
.bind(thread_id.to_string())
.execute(self.pool.as_ref())
.await?;
Ok(result.rows_affected())
}

async fn ensure_backfill_state_row(&self) -> anyhow::Result<()> {
sqlx::query(
r#"
Expand Down
Loading