Skip to content
19 changes: 15 additions & 4 deletions codex-rs/core/src/rollout/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ struct HeadTailSummary {
/// Hard cap to bound worst‑case work per request.
const MAX_SCAN_FILES: usize = 10000;
const HEAD_RECORD_LIMIT: usize = 10;
const USER_EVENT_SCAN_LIMIT: usize = 200;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ThreadSortKey {
Expand Down Expand Up @@ -683,14 +684,20 @@ async fn read_head_summary(path: &Path, head_limit: usize) -> io::Result<HeadTai
let reader = tokio::io::BufReader::new(file);
let mut lines = reader.lines();
let mut summary = HeadTailSummary::default();
let mut lines_scanned = 0usize;

while summary.head.len() < head_limit {
while lines_scanned < head_limit
|| (summary.saw_session_meta
&& !summary.saw_user_event
&& lines_scanned < head_limit + USER_EVENT_SCAN_LIMIT)
{
let line_opt = lines.next_line().await?;
let Some(line) = line_opt else { break };
let trimmed = line.trim();
if trimmed.is_empty() {
continue;
}
lines_scanned += 1;

let parsed: Result<RolloutLine, _> = serde_json::from_str(trimmed);
let Ok(rollout_line) = parsed else { continue };
Expand All @@ -703,17 +710,21 @@ async fn read_head_summary(path: &Path, head_limit: usize) -> io::Result<HeadTai
.created_at
.clone()
.or_else(|| Some(rollout_line.timestamp.clone()));
if let Ok(val) = serde_json::to_value(session_meta_line) {
summary.saw_session_meta = true;
if summary.head.len() < head_limit
&& let Ok(val) = serde_json::to_value(session_meta_line)
{
summary.head.push(val);
summary.saw_session_meta = true;
}
}
RolloutItem::ResponseItem(item) => {
summary.created_at = summary
.created_at
.clone()
.or_else(|| Some(rollout_line.timestamp.clone()));
if let Ok(val) = serde_json::to_value(item) {
if summary.head.len() < head_limit
&& let Ok(val) = serde_json::to_value(item)
{
summary.head.push(val);
}
}
Expand Down
82 changes: 82 additions & 0 deletions codex-rs/core/src/rollout/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,63 @@ fn write_session_file_with_provider(
Ok((dt, uuid))
}

fn write_session_file_with_delayed_user_event(
root: &Path,
ts_str: &str,
uuid: Uuid,
meta_lines_before_user: usize,
) -> std::io::Result<()> {
let format: &[FormatItem] =
format_description!("[year]-[month]-[day]T[hour]-[minute]-[second]");
let dt = PrimitiveDateTime::parse(ts_str, format)
.unwrap()
.assume_utc();
let dir = root
.join("sessions")
.join(format!("{:04}", dt.year()))
.join(format!("{:02}", u8::from(dt.month())))
.join(format!("{:02}", dt.day()));
fs::create_dir_all(&dir)?;

let filename = format!("rollout-{ts_str}-{uuid}.jsonl");
let file_path = dir.join(filename);
let mut file = File::create(file_path)?;

for i in 0..meta_lines_before_user {
let id = if i == 0 {
uuid
} else {
Uuid::from_u128(100 + i as u128)
};
let payload = serde_json::json!({
"id": id,
"timestamp": ts_str,
"cwd": ".",
"originator": "test_originator",
"cli_version": "test_version",
"source": "vscode",
"model_provider": "test-provider",
});
let meta = serde_json::json!({
"timestamp": ts_str,
"type": "session_meta",
"payload": payload,
});
writeln!(file, "{meta}")?;
}

let user_event = serde_json::json!({
"timestamp": ts_str,
"type": "event_msg",
"payload": {"type": "user_message", "message": "Hello from user", "kind": "plain"}
});
writeln!(file, "{user_event}")?;

let times = FileTimes::new().set_modified(dt.into());
file.set_times(times)?;
Ok(())
}

fn write_session_file_with_meta_payload(
root: &Path,
ts_str: &str,
Expand Down Expand Up @@ -539,6 +596,31 @@ async fn test_pagination_cursor() {
assert_eq!(page3, expected_page3);
}

#[tokio::test]
async fn test_list_threads_scans_past_head_for_user_event() {
let temp = TempDir::new().unwrap();
let home = temp.path();

let uuid = Uuid::from_u128(99);
let ts = "2025-05-01T10-30-00";
write_session_file_with_delayed_user_event(home, ts, uuid, 12).unwrap();

let provider_filter = provider_vec(&[TEST_PROVIDER]);
let page = get_threads(
home,
10,
None,
ThreadSortKey::CreatedAt,
INTERACTIVE_SESSION_SOURCES,
Some(provider_filter.as_slice()),
TEST_PROVIDER,
)
.await
.unwrap();

assert_eq!(page.items.len(), 1);
}

#[tokio::test]
async fn test_get_thread_contents() {
let temp = TempDir::new().unwrap();
Expand Down
Loading