codex-rs/core/src/rollout/list.rs (9 changes: 8 additions & 1 deletion)
@@ -261,6 +261,14 @@ impl<'de> serde::Deserialize<'de> for Cursor {
    }
}

impl From<codex_state::Anchor> for Cursor {
    fn from(anchor: codex_state::Anchor) -> Self {
        let ts = OffsetDateTime::from_unix_timestamp(anchor.ts.timestamp())
            .unwrap_or(OffsetDateTime::UNIX_EPOCH);
        Self::new(ts, anchor.id)
    }
}

/// Retrieve recorded thread file paths with token pagination. The returned `next_cursor`
/// can be supplied on the next call to resume after the last returned item, resilient to
/// concurrent new sessions being appended. Ordering is stable by the requested sort key
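
The contract described by this doc comment is plain token pagination: each page hands back a `next_cursor`, and supplying it on the following call resumes after the last returned item even if new sessions land in between. A self-contained sketch of how a caller drains such an API, with a toy `fetch_page` standing in for `get_threads` (the real signature takes more parameters, as the recorder.rs diff below shows):

```rust
// Toy token-pagination drain; `fetch_page` is a stand-in, not the real API.
struct Page {
    items: Vec<u32>,
    next_cursor: Option<u32>,
}

fn fetch_page(cursor: Option<u32>) -> Page {
    // Serve up to 2 items per page out of a fixed set of 5.
    let start = cursor.unwrap_or(0);
    let items: Vec<u32> = (start..start + 2).filter(|i| *i < 5).collect();
    // The cursor resumes after the last returned item.
    let next_cursor = items.last().map(|last| last + 1).filter(|n| *n < 5);
    Page { items, next_cursor }
}

fn main() {
    let mut all = Vec::new();
    let mut cursor = None;
    loop {
        let page = fetch_page(cursor);
        all.extend(page.items);
        match page.next_cursor {
            Some(next) => cursor = Some(next),
            None => break, // no cursor means the listing is exhausted
        }
    }
    assert_eq!(all, vec![0, 1, 2, 3, 4]);
}
```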
@@ -989,7 +997,6 @@ async fn read_head_summary(path: &Path, head_limit: usize) -> io::Result<HeadTai
                && !UserInstructions::is_user_instructions(content.as_slice())
                && !is_session_prefix_content(content.as_slice())
            {
                tracing::warn!("Item: {item:#?}");
                summary.saw_user_event = true;
            }
            if summary.head.len() < head_limit
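A note on the `From<codex_state::Anchor> for Cursor` impl added above: it bridges two time libraries. `anchor.ts` is a chrono timestamp, so `.timestamp()` yields whole Unix seconds, which are then lifted into `time::OffsetDateTime`; an out-of-range value falls back to the Unix epoch rather than propagating an error. A minimal standalone sketch of that conversion, assuming `chrono` 0.4 and `time` 0.3 in the dependency tree (the `chrono_to_time` helper is illustrative, not part of this PR):

```rust
use chrono::{DateTime, TimeZone, Utc};
use time::OffsetDateTime;

// Hypothetical helper mirroring the conversion in the From impl:
// whole-second precision only, and errors collapse to the Unix epoch.
fn chrono_to_time(ts: DateTime<Utc>) -> OffsetDateTime {
    OffsetDateTime::from_unix_timestamp(ts.timestamp()).unwrap_or(OffsetDateTime::UNIX_EPOCH)
}

fn main() {
    let ts = Utc
        .timestamp_opt(1_700_000_000, 500_000_000)
        .single()
        .expect("in range");
    let converted = chrono_to_time(ts);
    // Sub-second precision is dropped; only whole seconds survive.
    assert_eq!(converted.unix_timestamp(), 1_700_000_000);
}
```

Dropping sub-second precision looks intentional here, presumably acceptable because `Cursor::new(ts, anchor.id)` pairs the timestamp with an id for tie-breaking.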
codex-rs/core/src/rollout/recorder.rs (142 changes: 90 additions & 52 deletions)
@@ -6,6 +6,7 @@ use std::io::Error as IoError;
use std::path::Path;
use std::path::PathBuf;

use chrono::SecondsFormat;
use codex_protocol::ThreadId;
use codex_protocol::dynamic_tools::DynamicToolSpec;
use codex_protocol::models::BaseInstructions;
@@ -23,12 +24,14 @@ use tracing::warn;
use super::ARCHIVED_SESSIONS_SUBDIR;
use super::SESSIONS_SUBDIR;
use super::list::Cursor;
use super::list::ThreadItem;
use super::list::ThreadListConfig;
use super::list::ThreadListLayout;
use super::list::ThreadSortKey;
use super::list::ThreadsPage;
use super::list::get_threads;
use super::list::get_threads_in_root;
use super::list::read_head_for_summary;
use super::metadata;
use super::policy::is_persisted_response_item;
use crate::config::Config;
@@ -120,98 +123,98 @@ impl RolloutRecorder {
        model_providers: Option<&[String]>,
        default_provider: &str,
    ) -> std::io::Result<ThreadsPage> {
        let stage = "list_threads";
        let page = get_threads(
        Self::list_threads_with_db_fallback(
            codex_home,
            page_size,
            cursor,
            sort_key,
            allowed_sources,
            model_providers,
            default_provider,
            false,
        )
        .await?;
        .await
    }

        // TODO(jif): drop after sqlite migration phase 1
        let state_db_ctx = state_db::open_if_present(codex_home, default_provider).await;
        if let Some(db_ids) = state_db::list_thread_ids_db(
            state_db_ctx.as_deref(),
    /// List archived threads (rollout files) under the archived sessions directory.
    pub async fn list_archived_threads(
        codex_home: &Path,
        page_size: usize,
        cursor: Option<&Cursor>,
        sort_key: ThreadSortKey,
        allowed_sources: &[SessionSource],
        model_providers: Option<&[String]>,
        default_provider: &str,
    ) -> std::io::Result<ThreadsPage> {
        Self::list_threads_with_db_fallback(
            codex_home,
            page_size,
            cursor,
            sort_key,
            allowed_sources,
            model_providers,
            false,
            stage,
            default_provider,
            true,
        )
        .await
        {
            if page.items.len() != db_ids.len() {
                state_db::record_discrepancy(stage, "bad_len");
                return Ok(page);
            }
            for (id, item) in db_ids.iter().zip(page.items.iter()) {
                if !item.path.display().to_string().contains(&id.to_string()) {
                    state_db::record_discrepancy(stage, "bad_id");
                }
            }
        }
        Ok(page)
    }

    /// List archived threads (rollout files) under the archived sessions directory.
    pub async fn list_archived_threads(
    #[allow(clippy::too_many_arguments)]
    async fn list_threads_with_db_fallback(
        codex_home: &Path,
        page_size: usize,
        cursor: Option<&Cursor>,
        sort_key: ThreadSortKey,
        allowed_sources: &[SessionSource],
        model_providers: Option<&[String]>,
        default_provider: &str,
        archived: bool,
    ) -> std::io::Result<ThreadsPage> {
        let stage = "list_archived_threads";
        let root = codex_home.join(ARCHIVED_SESSIONS_SUBDIR);
        let page = get_threads_in_root(
            root,
        let state_db_ctx = state_db::open_if_present(codex_home, default_provider).await;
        if let Some(db_page) = state_db::list_threads_db(
            state_db_ctx.as_deref(),
            codex_home,
            page_size,
            cursor,
            sort_key,
            ThreadListConfig {
                allowed_sources,
                model_providers,
                default_provider,
                layout: ThreadListLayout::Flat,
            },
            allowed_sources,
            model_providers,
            archived,
        )
        .await?;
        .await
        {
            let mut page: ThreadsPage = db_page.into();
            populate_thread_heads(page.items.as_mut_slice()).await;
            return Ok(page);
        }

        // TODO(jif): drop after sqlite migration phase 1
        let state_db_ctx = state_db::open_if_present(codex_home, default_provider).await;
        if let Some(db_ids) = state_db::list_thread_ids_db(
            state_db_ctx.as_deref(),
        if archived {
            let root = codex_home.join(ARCHIVED_SESSIONS_SUBDIR);
            return get_threads_in_root(
                root,
                page_size,
                cursor,
                sort_key,
                ThreadListConfig {
                    allowed_sources,
                    model_providers,
                    default_provider,
                    layout: ThreadListLayout::Flat,
                },
            )
            .await;
        }

        get_threads(
            codex_home,
            page_size,
            cursor,
            sort_key,
            allowed_sources,
            model_providers,
            true,
            stage,
            default_provider,
        )
        .await
        {
            if page.items.len() != db_ids.len() {
                state_db::record_discrepancy(stage, "bad_len");
                return Ok(page);
            }
            for (id, item) in db_ids.iter().zip(page.items.iter()) {
                if !item.path.display().to_string().contains(&id.to_string()) {
                    state_db::record_discrepancy(stage, "bad_id");
                }
            }
        }
        Ok(page)
    }

    /// Find the newest recorded thread path, optionally filtering to a matching cwd.
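
Stepping back, the hunk above folds `list_threads` and `list_archived_threads` into a single `list_threads_with_db_fallback` helper: consult the sqlite-backed index first, hydrate each item's head from its rollout file on a hit, and fall back to scanning rollout files on disk on a miss (the archived root with a flat layout, or the main sessions tree otherwise). A stripped-down sketch of that control flow, with stand-in types in place of the real `codex_state` API (nothing below is the actual implementation):

```rust
// Stand-ins for the real codex_state / rollout types.
struct Page {
    items: Vec<String>,
}
struct Db;

impl Db {
    // Hypothetical: yields None when the sqlite index cannot serve the query.
    fn list(&self) -> Option<Page> {
        Some(Page { items: vec!["thread-a".to_string()] })
    }
}

// Fallback path: scan rollout files on disk instead of the index.
fn scan_sessions_dir() -> Page {
    Page { items: vec!["thread-a".to_string(), "thread-b".to_string()] }
}

fn list_with_fallback(db: Option<&Db>) -> Page {
    if let Some(page) = db.and_then(Db::list) {
        // DB hit: in the real code the page's heads still get populated
        // from the rollout files afterwards (see populate_thread_heads below).
        return page;
    }
    scan_sessions_dir()
}

fn main() {
    assert_eq!(list_with_fallback(Some(&Db)).items.len(), 1);
    assert_eq!(list_with_fallback(None).items.len(), 2);
}
```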
@@ -645,6 +648,41 @@ impl JsonlWriter {
    }
}

impl From<codex_state::ThreadsPage> for ThreadsPage {
    fn from(db_page: codex_state::ThreadsPage) -> Self {
        let items = db_page
            .items
            .into_iter()
            .map(|item| ThreadItem {
                path: item.rollout_path,
                head: Vec::new(),
                created_at: Some(item.created_at.to_rfc3339_opts(SecondsFormat::Secs, true)),
                updated_at: Some(item.updated_at.to_rfc3339_opts(SecondsFormat::Secs, true)),
            })
            .collect();
        Self {
            items,
            next_cursor: db_page.next_anchor.map(Into::into),
            num_scanned_files: db_page.num_scanned_rows,
            reached_scan_cap: false,
        }
    }
}

async fn populate_thread_heads(items: &mut [ThreadItem]) {
    for item in items {
        item.head = read_head_for_summary(item.path.as_path())
            .await
            .unwrap_or_else(|err| {
                warn!(
                    "failed to read rollout head from state db path: {} ({err})",
                    item.path.display()
                );
                Vec::new()
            });
    }
}

fn select_resume_path(page: &ThreadsPage, filter_cwd: Option<&Path>) -> Option<PathBuf> {
    match filter_cwd {
        Some(cwd) => page.items.iter().find_map(|item| {
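One detail worth calling out in the new `From<codex_state::ThreadsPage>` impl: `to_rfc3339_opts(SecondsFormat::Secs, true)` truncates timestamps to whole seconds and renders the UTC offset as a trailing `Z` rather than `+00:00`. A quick self-contained check of that formatting behavior (the epoch value is chosen arbitrarily for illustration):

```rust
use chrono::{SecondsFormat, TimeZone, Utc};

fn main() {
    // 1_700_000_000 s after the epoch, with a 123 ms fractional part.
    let ts = Utc
        .timestamp_opt(1_700_000_000, 123_000_000)
        .single()
        .expect("in range");
    // Sub-second digits are dropped and UTC is rendered as "Z".
    assert_eq!(
        ts.to_rfc3339_opts(SecondsFormat::Secs, true),
        "2023-11-14T22:13:20Z"
    );
}
```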