Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions codex-rs/app-server/src/codex_message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ use codex_core::read_head_for_summary;
use codex_core::read_session_meta_line;
use codex_core::rollout_date_parts;
use codex_core::sandboxing::SandboxPermissions;
use codex_core::state_db::{self};
use codex_core::state_db::get_state_db;
use codex_core::windows_sandbox::WindowsSandboxLevelExt;
use codex_feedback::CodexFeedback;
use codex_login::ServerOptions as LoginServerOptions;
Expand Down Expand Up @@ -1702,7 +1702,7 @@ impl CodexMessageProcessor {

let rollout_path_display = archived_path.display().to_string();
let fallback_provider = self.config.model_provider_id.clone();
let state_db_ctx = state_db::init_if_enabled(&self.config, None).await;
let state_db_ctx = get_state_db(&self.config, None).await;
let archived_folder = self
.config
.codex_home
Expand Down Expand Up @@ -3571,7 +3571,7 @@ impl CodexMessageProcessor {
}

if state_db_ctx.is_none() {
state_db_ctx = state_db::init_if_enabled(&self.config, None).await;
state_db_ctx = get_state_db(&self.config, None).await;
}

// Move the rollout file to archived.
Expand Down
22 changes: 20 additions & 2 deletions codex-rs/core/src/rollout/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@ use codex_protocol::protocol::SessionMetaLine;
use codex_protocol::protocol::SessionSource;
use codex_state::BackfillStats;
use codex_state::DB_ERROR_METRIC;
use codex_state::DB_METRIC_BACKFILL;
use codex_state::ExtractionOutcome;
use codex_state::ThreadMetadataBuilder;
use codex_state::apply_rollout_item;
use std::cmp::Reverse;
use std::path::Path;
use std::path::PathBuf;
use tracing::info;
use tracing::warn;

const ROLLOUT_PREFIX: &str = "rollout-";
Expand Down Expand Up @@ -125,7 +127,7 @@ pub(crate) async fn backfill_sessions(
runtime: &codex_state::StateRuntime,
config: &Config,
otel: Option<&OtelManager>,
) -> BackfillStats {
) {
let sessions_root = config.codex_home.join(rollout::SESSIONS_SUBDIR);
let archived_root = config.codex_home.join(rollout::ARCHIVED_SESSIONS_SUBDIR);
let mut rollout_paths: Vec<(PathBuf, bool)> = Vec::new();
Expand Down Expand Up @@ -191,7 +193,23 @@ pub(crate) async fn backfill_sessions(
}
}
}
stats

info!(
"state db backfill scanned={}, upserted={}, failed={}",
stats.scanned, stats.upserted, stats.failed
);
if let Some(otel) = otel {
otel.counter(
DB_METRIC_BACKFILL,
stats.upserted as i64,
&[("status", "upserted")],
);
otel.counter(
DB_METRIC_BACKFILL,
stats.failed as i64,
&[("status", "failed")],
);
}
}

async fn file_modified_time_utc(path: &Path) -> Option<DateTime<Utc>> {
Expand Down
55 changes: 34 additions & 21 deletions codex-rs/core/src/state_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,25 @@ use codex_otel::OtelManager;
use codex_protocol::ThreadId;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::SessionSource;
use codex_state::DB_METRIC_BACKFILL;
pub use codex_state::LogEntry;
use codex_state::STATE_DB_FILENAME;
use codex_state::ThreadMetadataBuilder;
use serde_json::Value;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use tracing::info;
use tracing::warn;
use uuid::Uuid;

/// Core-facing handle to the optional SQLite-backed state runtime.
pub type StateDbHandle = Arc<codex_state::StateRuntime>;

/// Initialize the state runtime when the `sqlite` feature flag is enabled.
pub async fn init_if_enabled(config: &Config, otel: Option<&OtelManager>) -> Option<StateDbHandle> {
/// Initialize the state runtime when the `sqlite` feature flag is enabled. To only be used
/// inside `core`. The initialization should not be done anywhere else.
pub(crate) async fn init_if_enabled(
config: &Config,
otel: Option<&OtelManager>,
) -> Option<StateDbHandle> {
let state_path = config.codex_home.join(STATE_DB_FILENAME);
if !config.features.enabled(Feature::Sqlite) {
// We delete the file on best effort basis to maintain retro-compatibility in the future.
Expand Down Expand Up @@ -59,27 +61,38 @@ pub async fn init_if_enabled(config: &Config, otel: Option<&OtelManager>) -> Opt
}
};
if !existed {
let stats = metadata::backfill_sessions(runtime.as_ref(), config, otel).await;
info!(
"state db backfill scanned={}, upserted={}, failed={}",
stats.scanned, stats.upserted, stats.failed
);
if let Some(otel) = otel {
otel.counter(
DB_METRIC_BACKFILL,
stats.upserted as i64,
&[("status", "upserted")],
);
otel.counter(
DB_METRIC_BACKFILL,
stats.failed as i64,
&[("status", "failed")],
);
}
let runtime_for_backfill = Arc::clone(&runtime);
let config_for_backfill = config.clone();
let otel_for_backfill = otel.cloned();
tokio::task::spawn(async move {
metadata::backfill_sessions(
runtime_for_backfill.as_ref(),
&config_for_backfill,
otel_for_backfill.as_ref(),
)
.await;
});
}
Some(runtime)
}

/// Get the DB if the feature is enabled and the DB exists.
pub async fn get_state_db(config: &Config, otel: Option<&OtelManager>) -> Option<StateDbHandle> {
let state_path = config.codex_home.join(STATE_DB_FILENAME);
if !config.features.enabled(Feature::Sqlite)
|| !tokio::fs::try_exists(&state_path).await.unwrap_or(false)
{
return None;
}
codex_state::StateRuntime::init(
config.codex_home.clone(),
config.model_provider_id.clone(),
otel.cloned(),
)
.await
.ok()
}

/// Open the state runtime when the SQLite file exists, without feature gating.
///
/// This is used for parity checks during the SQLite migration phase.
Expand Down
2 changes: 1 addition & 1 deletion codex-rs/tui/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ pub async fn run_main(

let otel_tracing_layer = otel.as_ref().and_then(|o| o.tracing_layer());

let log_db_layer = codex_core::state_db::init_if_enabled(&config, None)
let log_db_layer = codex_core::state_db::get_state_db(&config, None)
.await
.map(|db| log_db::start(db).with_filter(env_filter()));

Expand Down
Loading