Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions codex-rs/core/src/state_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::SessionSource;
use codex_state::DB_METRIC_COMPARE_ERROR;
pub use codex_state::LogEntry;
use codex_state::STATE_DB_FILENAME;
use codex_state::ThreadMetadataBuilder;
use serde_json::Value;
use std::path::Path;
Expand All @@ -32,7 +31,7 @@ pub(crate) async fn init_if_enabled(
config: &Config,
otel: Option<&OtelManager>,
) -> Option<StateDbHandle> {
let state_path = config.codex_home.join(STATE_DB_FILENAME);
let state_path = codex_state::state_db_path(config.codex_home.as_path());
if !config.features.enabled(Feature::Sqlite) {
return None;
}
Expand Down Expand Up @@ -74,7 +73,7 @@ pub(crate) async fn init_if_enabled(

/// Get the DB if the feature is enabled and the DB exists.
pub async fn get_state_db(config: &Config, otel: Option<&OtelManager>) -> Option<StateDbHandle> {
let state_path = config.codex_home.join(STATE_DB_FILENAME);
let state_path = codex_state::state_db_path(config.codex_home.as_path());
if !config.features.enabled(Feature::Sqlite)
|| !tokio::fs::try_exists(&state_path).await.unwrap_or(false)
{
Expand All @@ -93,7 +92,7 @@ pub async fn get_state_db(config: &Config, otel: Option<&OtelManager>) -> Option
///
/// This is used for parity checks during the SQLite migration phase.
pub async fn open_if_present(codex_home: &Path, default_provider: &str) -> Option<StateDbHandle> {
let db_path = codex_home.join(STATE_DB_FILENAME);
let db_path = codex_state::state_db_path(codex_home);
if !tokio::fs::try_exists(&db_path).await.unwrap_or(false) {
return None;
}
Expand Down
7 changes: 3 additions & 4 deletions codex-rs/core/tests/suite/sqlite_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use codex_protocol::protocol::SessionMeta;
use codex_protocol::protocol::SessionMetaLine;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::UserMessageEvent;
use codex_state::STATE_DB_FILENAME;
use core_test_support::load_sse_fixture_with_id;
use core_test_support::responses;
use core_test_support::responses::ev_completed;
Expand Down Expand Up @@ -39,7 +38,7 @@ async fn new_thread_is_recorded_in_state_db() -> Result<()> {

let thread_id = test.session_configured.session_id;
let rollout_path = test.codex.rollout_path().expect("rollout path");
let db_path = test.config.codex_home.join(STATE_DB_FILENAME);
let db_path = codex_state::state_db_path(test.config.codex_home.as_path());

for _ in 0..100 {
if tokio::fs::try_exists(&db_path).await.unwrap_or(false) {
Expand Down Expand Up @@ -149,7 +148,7 @@ async fn backfill_scans_existing_rollouts() -> Result<()> {

let test = builder.build(&server).await?;

let db_path = test.config.codex_home.join(STATE_DB_FILENAME);
let db_path = codex_state::state_db_path(test.config.codex_home.as_path());
let rollout_path = test.config.codex_home.join(&rollout_rel_path);
let default_provider = test.config.model_provider_id.clone();

Expand Down Expand Up @@ -205,7 +204,7 @@ async fn user_messages_persist_in_state_db() -> Result<()> {
});
let test = builder.build(&server).await?;

let db_path = test.config.codex_home.join(STATE_DB_FILENAME);
let db_path = codex_state::state_db_path(test.config.codex_home.as_path());
for _ in 0..100 {
if tokio::fs::try_exists(&db_path).await.unwrap_or(false) {
break;
Expand Down
5 changes: 2 additions & 3 deletions codex-rs/state/src/bin/logs_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,13 @@ use chrono::DateTime;
use clap::Parser;
use codex_state::LogQuery;
use codex_state::LogRow;
use codex_state::STATE_DB_FILENAME;
use codex_state::StateRuntime;
use dirs::home_dir;
use owo_colors::OwoColorize;

#[derive(Debug, Parser)]
#[command(name = "codex-state-logs")]
#[command(about = "Tail Codex logs from state.sqlite with simple filters")]
#[command(about = "Tail Codex logs from the state SQLite DB with simple filters")]
struct Args {
/// Path to CODEX_HOME. Defaults to $CODEX_HOME or ~/.codex.
#[arg(long, env = "CODEX_HOME")]
Expand Down Expand Up @@ -104,7 +103,7 @@ fn resolve_db_path(args: &Args) -> anyhow::Result<PathBuf> {
}

let codex_home = args.codex_home.clone().unwrap_or_else(default_codex_home);
Ok(codex_home.join(STATE_DB_FILENAME))
Ok(codex_state::state_db_path(codex_home.as_path()))
}

fn default_codex_home() -> PathBuf {
Expand Down
3 changes: 3 additions & 0 deletions codex-rs/state/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ pub use model::ThreadMetadata;
pub use model::ThreadMetadataBuilder;
pub use model::ThreadsPage;
pub use runtime::STATE_DB_FILENAME;
pub use runtime::STATE_DB_VERSION;
pub use runtime::state_db_filename;
pub use runtime::state_db_path;

/// Errors encountered during DB operations. Tags: [stage]
pub const DB_ERROR_METRIC: &str = "codex.db.error";
Expand Down
171 changes: 169 additions & 2 deletions codex-rs/state/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ use std::sync::Arc;
use std::time::Duration;
use tracing::warn;

pub const STATE_DB_FILENAME: &str = "state.sqlite";
pub const STATE_DB_FILENAME: &str = "state";
pub const STATE_DB_VERSION: u32 = 2;

const METRIC_DB_INIT: &str = "codex.db.init";

Expand All @@ -56,7 +57,8 @@ impl StateRuntime {
otel: Option<OtelManager>,
) -> anyhow::Result<Arc<Self>> {
tokio::fs::create_dir_all(&codex_home).await?;
let state_path = codex_home.join(STATE_DB_FILENAME);
remove_legacy_state_files(&codex_home).await;
let state_path = state_db_path(codex_home.as_path());
let existed = tokio::fs::try_exists(&state_path).await.unwrap_or(false);
let pool = match open_sqlite(&state_path).await {
Ok(db) => Arc::new(db),
Expand Down Expand Up @@ -624,6 +626,77 @@ async fn open_sqlite(path: &Path) -> anyhow::Result<SqlitePool> {
Ok(pool)
}

pub fn state_db_filename() -> String {
format!("{STATE_DB_FILENAME}_{STATE_DB_VERSION}.sqlite")
}

pub fn state_db_path(codex_home: &Path) -> PathBuf {
codex_home.join(state_db_filename())
}

async fn remove_legacy_state_files(codex_home: &Path) {
let current_name = state_db_filename();
let mut entries = match tokio::fs::read_dir(codex_home).await {
Ok(entries) => entries,
Err(err) => {
warn!(
"failed to read codex_home for state db cleanup {}: {err}",
codex_home.display()
);
return;
}
};
while let Ok(Some(entry)) = entries.next_entry().await {
if !entry
.file_type()
.await
.map(|file_type| file_type.is_file())
.unwrap_or(false)
{
continue;
}
let file_name = entry.file_name();
let file_name = file_name.to_string_lossy();
if !should_remove_state_file(file_name.as_ref(), current_name.as_str()) {
continue;
}

let legacy_path = entry.path();
if let Err(err) = tokio::fs::remove_file(&legacy_path).await {
warn!(
"failed to remove legacy state db file {}: {err}",
legacy_path.display()
);
}
}
}

fn should_remove_state_file(file_name: &str, current_name: &str) -> bool {
let mut base_name = file_name;
for suffix in ["-wal", "-shm", "-journal"] {
if let Some(stripped) = file_name.strip_suffix(suffix) {
base_name = stripped;
break;
}
}
if base_name == current_name {
return false;
}
let unversioned_name = format!("{STATE_DB_FILENAME}.sqlite");
if base_name == unversioned_name {
return true;
}

let Some(version_with_extension) = base_name.strip_prefix(&format!("{STATE_DB_FILENAME}_"))
else {
return false;
};
let Some(version_suffix) = version_with_extension.strip_suffix(".sqlite") else {
return false;
};
!version_suffix.is_empty() && version_suffix.chars().all(|ch| ch.is_ascii_digit())
}

fn push_thread_filters<'a>(
builder: &mut QueryBuilder<'a, Sqlite>,
archived_only: bool,
Expand Down Expand Up @@ -692,3 +765,97 @@ fn push_thread_order_and_limit(
builder.push(" LIMIT ");
builder.push_bind(limit as i64);
}

#[cfg(test)]
mod tests {
use super::STATE_DB_FILENAME;
use super::STATE_DB_VERSION;
use super::StateRuntime;
use super::state_db_filename;
use pretty_assertions::assert_eq;
use std::path::PathBuf;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;

fn unique_temp_dir() -> PathBuf {
let nanos = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map_or(0, |duration| duration.as_nanos());
std::env::temp_dir().join(format!("codex-state-runtime-test-{nanos}"))
}

#[tokio::test]
async fn init_removes_legacy_state_db_files() {
let codex_home = unique_temp_dir();
tokio::fs::create_dir_all(&codex_home)
.await
.expect("create codex_home");

let current_name = state_db_filename();
let previous_version = STATE_DB_VERSION.saturating_sub(1);
let unversioned_name = format!("{STATE_DB_FILENAME}.sqlite");
for suffix in ["", "-wal", "-shm", "-journal"] {
let path = codex_home.join(format!("{unversioned_name}{suffix}"));
tokio::fs::write(path, b"legacy")
.await
.expect("write legacy");
let old_version_path = codex_home.join(format!(
"{STATE_DB_FILENAME}_{previous_version}.sqlite{suffix}"
));
tokio::fs::write(old_version_path, b"old_version")
.await
.expect("write old version");
}
let unrelated_path = codex_home.join("state.sqlite_backup");
tokio::fs::write(&unrelated_path, b"keep")
.await
.expect("write unrelated");
let numeric_path = codex_home.join("123");
tokio::fs::write(&numeric_path, b"keep")
.await
.expect("write numeric");

let _runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None)
.await
.expect("initialize runtime");

for suffix in ["", "-wal", "-shm", "-journal"] {
let legacy_path = codex_home.join(format!("{unversioned_name}{suffix}"));
assert_eq!(
tokio::fs::try_exists(&legacy_path)
.await
.expect("check legacy path"),
false
);
let old_version_path = codex_home.join(format!(
"{STATE_DB_FILENAME}_{previous_version}.sqlite{suffix}"
));
assert_eq!(
tokio::fs::try_exists(&old_version_path)
.await
.expect("check old version path"),
false
);
}
assert_eq!(
tokio::fs::try_exists(codex_home.join(current_name))
.await
.expect("check new db path"),
true
);
assert_eq!(
tokio::fs::try_exists(&unrelated_path)
.await
.expect("check unrelated path"),
true
);
assert_eq!(
tokio::fs::try_exists(&numeric_path)
.await
.expect("check numeric path"),
true
);

let _ = tokio::fs::remove_dir_all(codex_home).await;
}
}
Loading