diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs
index e86d840b949..e1c982a99db 100644
--- a/codex-rs/app-server/src/codex_message_processor.rs
+++ b/codex-rs/app-server/src/codex_message_processor.rs
@@ -168,7 +168,7 @@ use codex_core::read_head_for_summary;
 use codex_core::read_session_meta_line;
 use codex_core::rollout_date_parts;
 use codex_core::sandboxing::SandboxPermissions;
-use codex_core::state_db::{self};
+use codex_core::state_db::get_state_db;
 use codex_core::windows_sandbox::WindowsSandboxLevelExt;
 use codex_feedback::CodexFeedback;
 use codex_login::ServerOptions as LoginServerOptions;
@@ -1702,7 +1702,7 @@ impl CodexMessageProcessor {
         let rollout_path_display = archived_path.display().to_string();
         let fallback_provider = self.config.model_provider_id.clone();

-        let state_db_ctx = state_db::init_if_enabled(&self.config, None).await;
+        let state_db_ctx = get_state_db(&self.config, None).await;
         let archived_folder = self
             .config
             .codex_home
@@ -3571,7 +3571,7 @@ impl CodexMessageProcessor {
         }

         if state_db_ctx.is_none() {
-            state_db_ctx = state_db::init_if_enabled(&self.config, None).await;
+            state_db_ctx = get_state_db(&self.config, None).await;
         }

         // Move the rollout file to archived.
diff --git a/codex-rs/core/src/rollout/metadata.rs b/codex-rs/core/src/rollout/metadata.rs
index 32d13ebde23..d08e77e2417 100644
--- a/codex-rs/core/src/rollout/metadata.rs
+++ b/codex-rs/core/src/rollout/metadata.rs
@@ -15,12 +15,14 @@ use codex_protocol::protocol::SessionMetaLine;
 use codex_protocol::protocol::SessionSource;
 use codex_state::BackfillStats;
 use codex_state::DB_ERROR_METRIC;
+use codex_state::DB_METRIC_BACKFILL;
 use codex_state::ExtractionOutcome;
 use codex_state::ThreadMetadataBuilder;
 use codex_state::apply_rollout_item;
 use std::cmp::Reverse;
 use std::path::Path;
 use std::path::PathBuf;
+use tracing::info;
 use tracing::warn;

 const ROLLOUT_PREFIX: &str = "rollout-";
@@ -125,7 +127,7 @@ pub(crate) async fn backfill_sessions(
     runtime: &codex_state::StateRuntime,
     config: &Config,
     otel: Option<&OtelManager>,
-) -> BackfillStats {
+) {
     let sessions_root = config.codex_home.join(rollout::SESSIONS_SUBDIR);
     let archived_root = config.codex_home.join(rollout::ARCHIVED_SESSIONS_SUBDIR);
     let mut rollout_paths: Vec<(PathBuf, bool)> = Vec::new();
@@ -191,7 +193,23 @@ pub(crate) async fn backfill_sessions(
             }
         }
     }
-    stats
+
+    info!(
+        "state db backfill scanned={}, upserted={}, failed={}",
+        stats.scanned, stats.upserted, stats.failed
+    );
+    if let Some(otel) = otel {
+        otel.counter(
+            DB_METRIC_BACKFILL,
+            stats.upserted as i64,
+            &[("status", "upserted")],
+        );
+        otel.counter(
+            DB_METRIC_BACKFILL,
+            stats.failed as i64,
+            &[("status", "failed")],
+        );
+    }
 }

 async fn file_modified_time_utc(path: &Path) -> Option<DateTime<Utc>> {
diff --git a/codex-rs/core/src/state_db.rs b/codex-rs/core/src/state_db.rs
index 74c9ed7383c..7990a017ef0 100644
--- a/codex-rs/core/src/state_db.rs
+++ b/codex-rs/core/src/state_db.rs
@@ -11,7 +11,6 @@ use codex_otel::OtelManager;
 use codex_protocol::ThreadId;
 use codex_protocol::protocol::RolloutItem;
 use codex_protocol::protocol::SessionSource;
-use codex_state::DB_METRIC_BACKFILL;
 pub use codex_state::LogEntry;
 use codex_state::STATE_DB_FILENAME;
 use codex_state::ThreadMetadataBuilder;
@@ -19,15 +18,18 @@ use serde_json::Value;
 use std::path::Path;
 use std::path::PathBuf;
 use std::sync::Arc;
-use tracing::info;
 use tracing::warn;
 use uuid::Uuid;

 /// Core-facing handle to the optional SQLite-backed state runtime.
 pub type StateDbHandle = Arc<codex_state::StateRuntime>;

-/// Initialize the state runtime when the `sqlite` feature flag is enabled.
-pub async fn init_if_enabled(config: &Config, otel: Option<&OtelManager>) -> Option<StateDbHandle> {
+/// Initialize the state runtime when the `sqlite` feature flag is enabled. Only to be used
+/// inside `core`; initialization should not happen anywhere else.
+pub(crate) async fn init_if_enabled(
+    config: &Config,
+    otel: Option<&OtelManager>,
+) -> Option<StateDbHandle> {
     let state_path = config.codex_home.join(STATE_DB_FILENAME);
     if !config.features.enabled(Feature::Sqlite) {
         // We delete the file on best effort basis to maintain retro-compatibility in the future.
@@ -59,27 +61,38 @@ pub async fn init_if_enabled(config: &Config, otel: Option<&OtelManager>) -> Opt
         }
     };
     if !existed {
-        let stats = metadata::backfill_sessions(runtime.as_ref(), config, otel).await;
-        info!(
-            "state db backfill scanned={}, upserted={}, failed={}",
-            stats.scanned, stats.upserted, stats.failed
-        );
-        if let Some(otel) = otel {
-            otel.counter(
-                DB_METRIC_BACKFILL,
-                stats.upserted as i64,
-                &[("status", "upserted")],
-            );
-            otel.counter(
-                DB_METRIC_BACKFILL,
-                stats.failed as i64,
-                &[("status", "failed")],
-            );
-        }
+        let runtime_for_backfill = Arc::clone(&runtime);
+        let config_for_backfill = config.clone();
+        let otel_for_backfill = otel.cloned();
+        tokio::task::spawn(async move {
+            metadata::backfill_sessions(
+                runtime_for_backfill.as_ref(),
+                &config_for_backfill,
+                otel_for_backfill.as_ref(),
+            )
+            .await;
+        });
     }
     Some(runtime)
 }

+/// Get the DB if the feature is enabled and the DB exists.
+pub async fn get_state_db(config: &Config, otel: Option<&OtelManager>) -> Option<StateDbHandle> {
+    let state_path = config.codex_home.join(STATE_DB_FILENAME);
+    if !config.features.enabled(Feature::Sqlite)
+        || !tokio::fs::try_exists(&state_path).await.unwrap_or(false)
+    {
+        return None;
+    }
+    codex_state::StateRuntime::init(
+        config.codex_home.clone(),
+        config.model_provider_id.clone(),
+        otel.cloned(),
+    )
+    .await
+    .ok()
+}
+
 /// Open the state runtime when the SQLite file exists, without feature gating.
 ///
 /// This is used for parity checks during the SQLite migration phase.
diff --git a/codex-rs/tui/src/lib.rs b/codex-rs/tui/src/lib.rs
index a48b8061294..c9e69b7125f 100644
--- a/codex-rs/tui/src/lib.rs
+++ b/codex-rs/tui/src/lib.rs
@@ -352,7 +352,7 @@ pub async fn run_main(
     let otel_tracing_layer = otel.as_ref().and_then(|o| o.tracing_layer());

-    let log_db_layer = codex_core::state_db::init_if_enabled(&config, None)
+    let log_db_layer = codex_core::state_db::get_state_db(&config, None)
         .await
         .map(|db| log_db::start(db).with_filter(env_filter()));
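
Caller-side usage, sketched for review (not part of the patch): with `init_if_enabled` now `pub(crate)`, code outside `core` is expected to obtain the optional state DB handle through `get_state_db`, as the app-server and TUI hunks above do. The `codex_core::config::Config` path and the `lookup_state_db` wrapper below are assumptions for illustration only.

    use codex_core::config::Config;
    use codex_core::state_db::{StateDbHandle, get_state_db};

    // Returns `None` when the `sqlite` feature is disabled or the DB file has not
    // been created yet; only core's `init_if_enabled` creates the file and spawns
    // the background backfill task.
    async fn lookup_state_db(config: &Config) -> Option<StateDbHandle> {
        get_state_db(config, None).await
    }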