Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions codex-rs/app-server-protocol/src/protocol/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ client_request_definitions! {
params: v2::ThreadArchiveParams,
response: v2::ThreadArchiveResponse,
},
ThreadUnarchive => "thread/unarchive" {
params: v2::ThreadUnarchiveParams,
response: v2::ThreadUnarchiveResponse,
},
ThreadRollback => "thread/rollback" {
params: v2::ThreadRollbackParams,
response: v2::ThreadRollbackResponse,
Expand Down
14 changes: 14 additions & 0 deletions codex-rs/app-server-protocol/src/protocol/v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1210,6 +1210,20 @@ pub struct ThreadArchiveParams {
#[ts(export_to = "v2/")]
pub struct ThreadArchiveResponse {}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadUnarchiveParams {
pub thread_id: String,
}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadUnarchiveResponse {
pub thread: Thread,
}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
Expand Down
10 changes: 10 additions & 0 deletions codex-rs/app-server/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ Example (from OpenAI's official VSCode extension):
- `thread/loaded/list` — list the thread ids currently loaded in memory.
- `thread/read` — read a stored thread by id without resuming it; optionally include turns via `includeTurns`.
- `thread/archive` — move a thread’s rollout file into the archived directory; returns `{}` on success.
- `thread/unarchive` — move an archived rollout file back into the sessions directory; returns the restored `thread` on success.
- `thread/rollback` — drop the last N turns from the agent’s in-memory context and persist a rollback marker in the rollout so future resumes see the pruned history; returns the updated `thread` (with `turns` populated) on success.
- `turn/start` — add user input to a thread and begin Codex generation; responds with the initial `turn` object and streams `turn/started`, `item/*`, and `turn/completed` notifications.
- `turn/interrupt` — request cancellation of an in-flight turn by `(thread_id, turn_id)`; success is an empty `{}` response and the turn finishes with `status: "interrupted"`.
Expand Down Expand Up @@ -210,6 +211,15 @@ Use `thread/archive` to move the persisted rollout (stored as a JSONL file on di

An archived thread will not appear in `thread/list` unless `archived` is set to `true`.

### Example: Unarchive a thread

Use `thread/unarchive` to move an archived rollout back into the sessions directory.

```json
{ "method": "thread/unarchive", "id": 24, "params": { "threadId": "thr_b" } }
{ "id": 24, "result": { "thread": { "id": "thr_b" } } }
```

### Example: Start a turn (send user input)

Turns attach user input (text or images) to a thread and trigger Codex generation. The `input` field is a list of discriminated unions:
Expand Down
151 changes: 151 additions & 0 deletions codex-rs/app-server/src/codex_message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ use codex_app_server_protocol::ThreadSortKey;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::ThreadStartedNotification;
use codex_app_server_protocol::ThreadUnarchiveParams;
use codex_app_server_protocol::ThreadUnarchiveResponse;
use codex_app_server_protocol::Turn;
use codex_app_server_protocol::TurnError;
use codex_app_server_protocol::TurnInterruptParams;
Expand Down Expand Up @@ -150,6 +152,7 @@ use codex_core::error::CodexErr;
use codex_core::exec::ExecParams;
use codex_core::exec_env::create_env;
use codex_core::features::Feature;
use codex_core::find_archived_thread_path_by_id_str;
use codex_core::find_thread_path_by_id_str;
use codex_core::git_info::git_diff_to_remote;
use codex_core::mcp::collect_mcp_snapshot;
Expand All @@ -163,6 +166,7 @@ use codex_core::protocol::ReviewTarget as CoreReviewTarget;
use codex_core::protocol::SessionConfiguredEvent;
use codex_core::read_head_for_summary;
use codex_core::read_session_meta_line;
use codex_core::rollout_date_parts;
use codex_core::sandboxing::SandboxPermissions;
use codex_feedback::CodexFeedback;
use codex_login::ServerOptions as LoginServerOptions;
Expand Down Expand Up @@ -402,6 +406,9 @@ impl CodexMessageProcessor {
ClientRequest::ThreadArchive { request_id, params } => {
self.thread_archive(request_id, params).await;
}
ClientRequest::ThreadUnarchive { request_id, params } => {
self.thread_unarchive(request_id, params).await;
}
ClientRequest::ThreadRollback { request_id, params } => {
self.thread_rollback(request_id, params).await;
}
Expand Down Expand Up @@ -1595,6 +1602,150 @@ impl CodexMessageProcessor {
}
}

async fn thread_unarchive(&mut self, request_id: RequestId, params: ThreadUnarchiveParams) {
let thread_id = match ThreadId::from_string(&params.thread_id) {
Ok(id) => id,
Err(err) => {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("invalid thread id: {err}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
}
};

let archived_path = match find_archived_thread_path_by_id_str(
&self.config.codex_home,
&thread_id.to_string(),
)
.await
{
Ok(Some(path)) => path,
Ok(None) => {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("no archived rollout found for thread id {thread_id}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
}
Err(err) => {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("failed to locate archived thread id {thread_id}: {err}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
}
};

let rollout_path_display = archived_path.display().to_string();
let fallback_provider = self.config.model_provider_id.clone();
let archived_folder = self
.config
.codex_home
.join(codex_core::ARCHIVED_SESSIONS_SUBDIR);

let result: Result<Thread, JSONRPCErrorError> = async {
let canonical_archived_dir = tokio::fs::canonicalize(&archived_folder).await.map_err(
|err| JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!(
"failed to unarchive thread: unable to resolve archived directory: {err}"
),
data: None,
},
)?;
let canonical_rollout_path = tokio::fs::canonicalize(&archived_path).await;
let canonical_rollout_path = if let Ok(path) = canonical_rollout_path
&& path.starts_with(&canonical_archived_dir)
{
path
} else {
return Err(JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!(
"rollout path `{rollout_path_display}` must be in archived directory"
),
data: None,
});
};

let required_suffix = format!("{thread_id}.jsonl");
let Some(file_name) = canonical_rollout_path.file_name().map(OsStr::to_owned) else {
return Err(JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("rollout path `{rollout_path_display}` missing file name"),
data: None,
});
};
if !file_name
.to_string_lossy()
.ends_with(required_suffix.as_str())
{
return Err(JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!(
"rollout path `{rollout_path_display}` does not match thread id {thread_id}"
),
data: None,
});
}

let Some((year, month, day)) = rollout_date_parts(&file_name) else {
return Err(JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!(
"rollout path `{rollout_path_display}` missing filename timestamp"
),
data: None,
});
};

let sessions_folder = self.config.codex_home.join(codex_core::SESSIONS_SUBDIR);
let dest_dir = sessions_folder.join(year).join(month).join(day);
let restored_path = dest_dir.join(&file_name);
tokio::fs::create_dir_all(&dest_dir)
.await
.map_err(|err| JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("failed to unarchive thread: {err}"),
data: None,
})?;
tokio::fs::rename(&canonical_rollout_path, &restored_path)
.await
.map_err(|err| JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("failed to unarchive thread: {err}"),
data: None,
})?;
let summary =
read_summary_from_rollout(restored_path.as_path(), fallback_provider.as_str())
.await
.map_err(|err| JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("failed to read unarchived thread: {err}"),
data: None,
})?;
Ok(summary_to_thread(summary))
}
.await;

match result {
Ok(thread) => {
let response = ThreadUnarchiveResponse { thread };
self.outgoing.send_response(request_id, response).await;
}
Err(err) => {
self.outgoing.send_error(request_id, err).await;
}
}
}

async fn thread_rollback(&mut self, request_id: RequestId, params: ThreadRollbackParams) {
let ThreadRollbackParams {
thread_id,
Expand Down
10 changes: 10 additions & 0 deletions codex-rs/app-server/tests/common/mcp_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ use codex_app_server_protocol::ThreadReadParams;
use codex_app_server_protocol::ThreadResumeParams;
use codex_app_server_protocol::ThreadRollbackParams;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadUnarchiveParams;
use codex_app_server_protocol::TurnInterruptParams;
use codex_app_server_protocol::TurnStartParams;
use codex_core::default_client::CODEX_INTERNAL_ORIGINATOR_OVERRIDE_ENV_VAR;
Expand Down Expand Up @@ -365,6 +366,15 @@ impl McpProcess {
self.send_request("thread/archive", params).await
}

/// Send a `thread/unarchive` JSON-RPC request.
pub async fn send_thread_unarchive_request(
&mut self,
params: ThreadUnarchiveParams,
) -> anyhow::Result<i64> {
let params = Some(serde_json::to_value(params)?);
self.send_request("thread/unarchive", params).await
}

/// Send a `thread/rollback` JSON-RPC request.
pub async fn send_thread_rollback_request(
&mut self,
Expand Down
1 change: 1 addition & 0 deletions codex-rs/app-server/tests/suite/v2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,6 @@ mod thread_read;
mod thread_resume;
mod thread_rollback;
mod thread_start;
mod thread_unarchive;
mod turn_interrupt;
mod turn_start;
101 changes: 101 additions & 0 deletions codex-rs/app-server/tests/suite/v2/thread_unarchive.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
use anyhow::Result;
use app_test_support::McpProcess;
use app_test_support::to_response;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ThreadArchiveParams;
use codex_app_server_protocol::ThreadArchiveResponse;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::ThreadUnarchiveParams;
use codex_app_server_protocol::ThreadUnarchiveResponse;
use codex_core::find_archived_thread_path_by_id_str;
use codex_core::find_thread_path_by_id_str;
use std::path::Path;
use tempfile::TempDir;
use tokio::time::timeout;

const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);

#[tokio::test]
async fn thread_unarchive_moves_rollout_back_into_sessions_directory() -> Result<()> {
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path())?;

let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;

let start_id = mcp
.send_thread_start_request(ThreadStartParams {
model: Some("mock-model".to_string()),
..Default::default()
})
.await?;
let start_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(start_id)),
)
.await??;
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(start_resp)?;

let rollout_path = find_thread_path_by_id_str(codex_home.path(), &thread.id)
.await?
.expect("expected rollout path for thread id to exist");

let archive_id = mcp
.send_thread_archive_request(ThreadArchiveParams {
thread_id: thread.id.clone(),
})
.await?;
let archive_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(archive_id)),
)
.await??;
let _: ThreadArchiveResponse = to_response::<ThreadArchiveResponse>(archive_resp)?;

let archived_path = find_archived_thread_path_by_id_str(codex_home.path(), &thread.id)
.await?
.expect("expected archived rollout path for thread id to exist");
let archived_path_display = archived_path.display();
assert!(
archived_path.exists(),
"expected {archived_path_display} to exist"
);

let unarchive_id = mcp
.send_thread_unarchive_request(ThreadUnarchiveParams {
thread_id: thread.id.clone(),
})
.await?;
let unarchive_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(unarchive_id)),
)
.await??;
let _: ThreadUnarchiveResponse = to_response::<ThreadUnarchiveResponse>(unarchive_resp)?;

let rollout_path_display = rollout_path.display();
assert!(
rollout_path.exists(),
"expected rollout path {rollout_path_display} to be restored"
);
assert!(
!archived_path.exists(),
"expected archived rollout path {archived_path_display} to be moved"
);

Ok(())
}

fn create_config_toml(codex_home: &Path) -> std::io::Result<()> {
let config_toml = codex_home.join("config.toml");
std::fs::write(config_toml, config_contents())
}

fn config_contents() -> &'static str {
r#"model = "mock-model"
approval_policy = "never"
sandbox_mode = "read-only"
"#
}
2 changes: 2 additions & 0 deletions codex-rs/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ pub use rollout::INTERACTIVE_SESSION_SOURCES;
pub use rollout::RolloutRecorder;
pub use rollout::SESSIONS_SUBDIR;
pub use rollout::SessionMeta;
pub use rollout::find_archived_thread_path_by_id_str;
#[deprecated(note = "use find_thread_path_by_id_str")]
pub use rollout::find_conversation_path_by_id_str;
pub use rollout::find_thread_path_by_id_str;
Expand All @@ -108,6 +109,7 @@ pub use rollout::list::ThreadsPage;
pub use rollout::list::parse_cursor;
pub use rollout::list::read_head_for_summary;
pub use rollout::list::read_session_meta_line;
pub use rollout::rollout_date_parts;
mod function_tool;
mod state;
mod tasks;
Expand Down
Loading
Loading