diff --git a/codex-rs/core/src/state_db.rs b/codex-rs/core/src/state_db.rs index 7a0894a327c..1cfb8f15660 100644 --- a/codex-rs/core/src/state_db.rs +++ b/codex-rs/core/src/state_db.rs @@ -14,7 +14,6 @@ use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::SessionSource; use codex_state::DB_METRIC_COMPARE_ERROR; pub use codex_state::LogEntry; -use codex_state::STATE_DB_FILENAME; use codex_state::ThreadMetadataBuilder; use serde_json::Value; use std::path::Path; @@ -32,7 +31,7 @@ pub(crate) async fn init_if_enabled( config: &Config, otel: Option<&OtelManager>, ) -> Option { - let state_path = config.codex_home.join(STATE_DB_FILENAME); + let state_path = codex_state::state_db_path(config.codex_home.as_path()); if !config.features.enabled(Feature::Sqlite) { return None; } @@ -74,7 +73,7 @@ pub(crate) async fn init_if_enabled( /// Get the DB if the feature is enabled and the DB exists. pub async fn get_state_db(config: &Config, otel: Option<&OtelManager>) -> Option { - let state_path = config.codex_home.join(STATE_DB_FILENAME); + let state_path = codex_state::state_db_path(config.codex_home.as_path()); if !config.features.enabled(Feature::Sqlite) || !tokio::fs::try_exists(&state_path).await.unwrap_or(false) { @@ -93,7 +92,7 @@ pub async fn get_state_db(config: &Config, otel: Option<&OtelManager>) -> Option /// /// This is used for parity checks during the SQLite migration phase. pub async fn open_if_present(codex_home: &Path, default_provider: &str) -> Option { - let db_path = codex_home.join(STATE_DB_FILENAME); + let db_path = codex_state::state_db_path(codex_home); if !tokio::fs::try_exists(&db_path).await.unwrap_or(false) { return None; } diff --git a/codex-rs/core/tests/suite/sqlite_state.rs b/codex-rs/core/tests/suite/sqlite_state.rs index 218da34825c..ac71c14d36a 100644 --- a/codex-rs/core/tests/suite/sqlite_state.rs +++ b/codex-rs/core/tests/suite/sqlite_state.rs @@ -9,7 +9,6 @@ use codex_protocol::protocol::SessionMeta; use codex_protocol::protocol::SessionMetaLine; use codex_protocol::protocol::SessionSource; use codex_protocol::protocol::UserMessageEvent; -use codex_state::STATE_DB_FILENAME; use core_test_support::load_sse_fixture_with_id; use core_test_support::responses; use core_test_support::responses::ev_completed; @@ -39,7 +38,7 @@ async fn new_thread_is_recorded_in_state_db() -> Result<()> { let thread_id = test.session_configured.session_id; let rollout_path = test.codex.rollout_path().expect("rollout path"); - let db_path = test.config.codex_home.join(STATE_DB_FILENAME); + let db_path = codex_state::state_db_path(test.config.codex_home.as_path()); for _ in 0..100 { if tokio::fs::try_exists(&db_path).await.unwrap_or(false) { @@ -149,7 +148,7 @@ async fn backfill_scans_existing_rollouts() -> Result<()> { let test = builder.build(&server).await?; - let db_path = test.config.codex_home.join(STATE_DB_FILENAME); + let db_path = codex_state::state_db_path(test.config.codex_home.as_path()); let rollout_path = test.config.codex_home.join(&rollout_rel_path); let default_provider = test.config.model_provider_id.clone(); @@ -205,7 +204,7 @@ async fn user_messages_persist_in_state_db() -> Result<()> { }); let test = builder.build(&server).await?; - let db_path = test.config.codex_home.join(STATE_DB_FILENAME); + let db_path = codex_state::state_db_path(test.config.codex_home.as_path()); for _ in 0..100 { if tokio::fs::try_exists(&db_path).await.unwrap_or(false) { break; diff --git a/codex-rs/state/src/bin/logs_client.rs b/codex-rs/state/src/bin/logs_client.rs index d1329a1a898..796a968cb89 100644 --- a/codex-rs/state/src/bin/logs_client.rs +++ b/codex-rs/state/src/bin/logs_client.rs @@ -6,14 +6,13 @@ use chrono::DateTime; use clap::Parser; use codex_state::LogQuery; use codex_state::LogRow; -use codex_state::STATE_DB_FILENAME; use codex_state::StateRuntime; use dirs::home_dir; use owo_colors::OwoColorize; #[derive(Debug, Parser)] #[command(name = "codex-state-logs")] -#[command(about = "Tail Codex logs from state.sqlite with simple filters")] +#[command(about = "Tail Codex logs from the state SQLite DB with simple filters")] struct Args { /// Path to CODEX_HOME. Defaults to $CODEX_HOME or ~/.codex. #[arg(long, env = "CODEX_HOME")] @@ -104,7 +103,7 @@ fn resolve_db_path(args: &Args) -> anyhow::Result { } let codex_home = args.codex_home.clone().unwrap_or_else(default_codex_home); - Ok(codex_home.join(STATE_DB_FILENAME)) + Ok(codex_state::state_db_path(codex_home.as_path())) } fn default_codex_home() -> PathBuf { diff --git a/codex-rs/state/src/lib.rs b/codex-rs/state/src/lib.rs index c08c76a1cf3..2d37ecee966 100644 --- a/codex-rs/state/src/lib.rs +++ b/codex-rs/state/src/lib.rs @@ -29,6 +29,9 @@ pub use model::ThreadMetadata; pub use model::ThreadMetadataBuilder; pub use model::ThreadsPage; pub use runtime::STATE_DB_FILENAME; +pub use runtime::STATE_DB_VERSION; +pub use runtime::state_db_filename; +pub use runtime::state_db_path; /// Errors encountered during DB operations. Tags: [stage] pub const DB_ERROR_METRIC: &str = "codex.db.error"; diff --git a/codex-rs/state/src/runtime.rs b/codex-rs/state/src/runtime.rs index c64b7d56c6c..f4c2d76e8a4 100644 --- a/codex-rs/state/src/runtime.rs +++ b/codex-rs/state/src/runtime.rs @@ -35,7 +35,8 @@ use std::sync::Arc; use std::time::Duration; use tracing::warn; -pub const STATE_DB_FILENAME: &str = "state.sqlite"; +pub const STATE_DB_FILENAME: &str = "state"; +pub const STATE_DB_VERSION: u32 = 2; const METRIC_DB_INIT: &str = "codex.db.init"; @@ -56,7 +57,8 @@ impl StateRuntime { otel: Option, ) -> anyhow::Result> { tokio::fs::create_dir_all(&codex_home).await?; - let state_path = codex_home.join(STATE_DB_FILENAME); + remove_legacy_state_files(&codex_home).await; + let state_path = state_db_path(codex_home.as_path()); let existed = tokio::fs::try_exists(&state_path).await.unwrap_or(false); let pool = match open_sqlite(&state_path).await { Ok(db) => Arc::new(db), @@ -624,6 +626,77 @@ async fn open_sqlite(path: &Path) -> anyhow::Result { Ok(pool) } +pub fn state_db_filename() -> String { + format!("{STATE_DB_FILENAME}_{STATE_DB_VERSION}.sqlite") +} + +pub fn state_db_path(codex_home: &Path) -> PathBuf { + codex_home.join(state_db_filename()) +} + +async fn remove_legacy_state_files(codex_home: &Path) { + let current_name = state_db_filename(); + let mut entries = match tokio::fs::read_dir(codex_home).await { + Ok(entries) => entries, + Err(err) => { + warn!( + "failed to read codex_home for state db cleanup {}: {err}", + codex_home.display() + ); + return; + } + }; + while let Ok(Some(entry)) = entries.next_entry().await { + if !entry + .file_type() + .await + .map(|file_type| file_type.is_file()) + .unwrap_or(false) + { + continue; + } + let file_name = entry.file_name(); + let file_name = file_name.to_string_lossy(); + if !should_remove_state_file(file_name.as_ref(), current_name.as_str()) { + continue; + } + + let legacy_path = entry.path(); + if let Err(err) = tokio::fs::remove_file(&legacy_path).await { + warn!( + "failed to remove legacy state db file {}: {err}", + legacy_path.display() + ); + } + } +} + +fn should_remove_state_file(file_name: &str, current_name: &str) -> bool { + let mut base_name = file_name; + for suffix in ["-wal", "-shm", "-journal"] { + if let Some(stripped) = file_name.strip_suffix(suffix) { + base_name = stripped; + break; + } + } + if base_name == current_name { + return false; + } + let unversioned_name = format!("{STATE_DB_FILENAME}.sqlite"); + if base_name == unversioned_name { + return true; + } + + let Some(version_with_extension) = base_name.strip_prefix(&format!("{STATE_DB_FILENAME}_")) + else { + return false; + }; + let Some(version_suffix) = version_with_extension.strip_suffix(".sqlite") else { + return false; + }; + !version_suffix.is_empty() && version_suffix.chars().all(|ch| ch.is_ascii_digit()) +} + fn push_thread_filters<'a>( builder: &mut QueryBuilder<'a, Sqlite>, archived_only: bool, @@ -692,3 +765,97 @@ fn push_thread_order_and_limit( builder.push(" LIMIT "); builder.push_bind(limit as i64); } + +#[cfg(test)] +mod tests { + use super::STATE_DB_FILENAME; + use super::STATE_DB_VERSION; + use super::StateRuntime; + use super::state_db_filename; + use pretty_assertions::assert_eq; + use std::path::PathBuf; + use std::time::SystemTime; + use std::time::UNIX_EPOCH; + + fn unique_temp_dir() -> PathBuf { + let nanos = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map_or(0, |duration| duration.as_nanos()); + std::env::temp_dir().join(format!("codex-state-runtime-test-{nanos}")) + } + + #[tokio::test] + async fn init_removes_legacy_state_db_files() { + let codex_home = unique_temp_dir(); + tokio::fs::create_dir_all(&codex_home) + .await + .expect("create codex_home"); + + let current_name = state_db_filename(); + let previous_version = STATE_DB_VERSION.saturating_sub(1); + let unversioned_name = format!("{STATE_DB_FILENAME}.sqlite"); + for suffix in ["", "-wal", "-shm", "-journal"] { + let path = codex_home.join(format!("{unversioned_name}{suffix}")); + tokio::fs::write(path, b"legacy") + .await + .expect("write legacy"); + let old_version_path = codex_home.join(format!( + "{STATE_DB_FILENAME}_{previous_version}.sqlite{suffix}" + )); + tokio::fs::write(old_version_path, b"old_version") + .await + .expect("write old version"); + } + let unrelated_path = codex_home.join("state.sqlite_backup"); + tokio::fs::write(&unrelated_path, b"keep") + .await + .expect("write unrelated"); + let numeric_path = codex_home.join("123"); + tokio::fs::write(&numeric_path, b"keep") + .await + .expect("write numeric"); + + let _runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + .await + .expect("initialize runtime"); + + for suffix in ["", "-wal", "-shm", "-journal"] { + let legacy_path = codex_home.join(format!("{unversioned_name}{suffix}")); + assert_eq!( + tokio::fs::try_exists(&legacy_path) + .await + .expect("check legacy path"), + false + ); + let old_version_path = codex_home.join(format!( + "{STATE_DB_FILENAME}_{previous_version}.sqlite{suffix}" + )); + assert_eq!( + tokio::fs::try_exists(&old_version_path) + .await + .expect("check old version path"), + false + ); + } + assert_eq!( + tokio::fs::try_exists(codex_home.join(current_name)) + .await + .expect("check new db path"), + true + ); + assert_eq!( + tokio::fs::try_exists(&unrelated_path) + .await + .expect("check unrelated path"), + true + ); + assert_eq!( + tokio::fs::try_exists(&numeric_path) + .await + .expect("check numeric path"), + true + ); + + let _ = tokio::fs::remove_dir_all(codex_home).await; + } +}