Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions codex-rs/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions codex-rs/app-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ codex-rmcp-client = { workspace = true }
codex-utils-absolute-path = { workspace = true }
codex-utils-json-to-toml = { workspace = true }
chrono = { workspace = true }
clap = { workspace = true, features = ["derive"] }
futures = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
tempfile = { workspace = true }
Expand All @@ -45,6 +47,7 @@ tokio = { workspace = true, features = [
"rt-multi-thread",
"signal",
] }
tokio-tungstenite = { workspace = true }
tracing = { workspace = true, features = ["log"] }
tracing-subscriber = { workspace = true, features = ["env-filter", "fmt"] }
uuid = { workspace = true, features = ["serde", "v7"] }
Expand All @@ -59,12 +62,14 @@ axum = { workspace = true, default-features = false, features = [
base64 = { workspace = true }
codex-execpolicy = { workspace = true }
core_test_support = { workspace = true }
codex-utils-cargo-bin = { workspace = true }
os_info = { workspace = true }
pretty_assertions = { workspace = true }
rmcp = { workspace = true, default-features = false, features = [
"server",
"transport-streamable-http-server",
] }
serial_test = { workspace = true }
tokio-tungstenite = { workspace = true }
wiremock = { workspace = true }
shlex = { workspace = true }
13 changes: 10 additions & 3 deletions codex-rs/app-server/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,14 @@

## Protocol

Similar to [MCP](https://modelcontextprotocol.io/), `codex app-server` supports bidirectional communication, streaming JSONL over stdio. The protocol is JSON-RPC 2.0, though the `"jsonrpc":"2.0"` header is omitted.
Similar to [MCP](https://modelcontextprotocol.io/), `codex app-server` supports bidirectional communication using JSON-RPC 2.0 messages (with the `"jsonrpc":"2.0"` header omitted on the wire).

Supported transports:

- stdio (`--listen stdio://`, default): newline-delimited JSON (JSONL)
- websocket (`--listen ws://IP:PORT`): one JSON-RPC message per websocket text frame (**experimental / unsupported**)

Websocket transport is currently experimental and unsupported. Do not rely on it for production workloads.

## Message Schema

Expand All @@ -42,15 +49,15 @@ Use the thread APIs to create, list, or archive conversations. Drive a conversat

## Lifecycle Overview

- Initialize once: Immediately after launching the codex app-server process, send an `initialize` request with your client metadata, then emit an `initialized` notification. Any other request before this handshake gets rejected.
- Initialize once per connection: Immediately after opening a transport connection, send an `initialize` request with your client metadata, then emit an `initialized` notification. Any other request on that connection before this handshake gets rejected.
- Start (or resume) a thread: Call `thread/start` to open a fresh conversation. The response returns the thread object and you’ll also get a `thread/started` notification. If you’re continuing an existing conversation, call `thread/resume` with its ID instead. If you want to branch from an existing conversation, call `thread/fork` to create a new thread id with copied history.
- Begin a turn: To send user input, call `turn/start` with the target `threadId` and the user's input. Optional fields let you override model, cwd, sandbox policy, etc. This immediately returns the new turn object and triggers a `turn/started` notification.
- Stream events: After `turn/start`, keep reading JSON-RPC notifications on stdout. You’ll see `item/started`, `item/completed`, deltas like `item/agentMessage/delta`, tool progress, etc. These represent streaming model output plus any side effects (commands, tool calls, reasoning notes).
- Finish the turn: When the model is done (or the turn is interrupted via making the `turn/interrupt` call), the server sends `turn/completed` with the final turn state and token usage.

## Initialization

Clients must send a single `initialize` request before invoking any other method, then acknowledge with an `initialized` notification. The server returns the user agent string it will present to upstream services; subsequent requests issued before initialization receive a `"Not initialized"` error, and repeated `initialize` calls receive an `"Already initialized"` error.
Clients must send a single `initialize` request per transport connection before invoking any other method on that connection, then acknowledge with an `initialized` notification. The server returns the user agent string it will present to upstream services; subsequent requests issued before initialization receive a `"Not initialized"` error, and repeated `initialize` calls on the same connection receive an `"Already initialized"` error.

Applications building on top of `codex app-server` should identify themselves via the `clientInfo` parameter.

Expand Down
70 changes: 28 additions & 42 deletions codex-rs/app-server/src/bespoke_event_handling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1093,7 +1093,7 @@ pub(crate) async fn apply_bespoke_event_handling(
),
data: None,
};
outgoing.send_error(request_id, error).await;
outgoing.send_error(request_id.clone(), error).await;
return;
}
}
Expand All @@ -1107,7 +1107,7 @@ pub(crate) async fn apply_bespoke_event_handling(
),
data: None,
};
outgoing.send_error(request_id, error).await;
outgoing.send_error(request_id.clone(), error).await;
return;
}
};
Expand Down Expand Up @@ -1831,6 +1831,7 @@ async fn construct_mcp_tool_call_end_notification(
mod tests {
use super::*;
use crate::CHANNEL_CAPACITY;
use crate::outgoing_message::OutgoingEnvelope;
use crate::outgoing_message::OutgoingMessage;
use crate::outgoing_message::OutgoingMessageSender;
use anyhow::Result;
Expand Down Expand Up @@ -1858,6 +1859,21 @@ mod tests {
Arc::new(Mutex::new(HashMap::new()))
}

async fn recv_broadcast_message(
rx: &mut mpsc::Receiver<OutgoingEnvelope>,
) -> Result<OutgoingMessage> {
let envelope = rx
.recv()
.await
.ok_or_else(|| anyhow!("should send one message"))?;
match envelope {
OutgoingEnvelope::Broadcast { message } => Ok(message),
OutgoingEnvelope::ToConnection { connection_id, .. } => {
bail!("unexpected targeted message for connection {connection_id:?}")
}
}
}

#[test]
fn file_change_accept_for_session_maps_to_approved_for_session() {
let (decision, completion_status) =
Expand Down Expand Up @@ -1910,10 +1926,7 @@ mod tests {
)
.await;

let msg = rx
.recv()
.await
.ok_or_else(|| anyhow!("should send one notification"))?;
let msg = recv_broadcast_message(&mut rx).await?;
match msg {
OutgoingMessage::AppServerNotification(ServerNotification::TurnCompleted(n)) => {
assert_eq!(n.turn.id, event_turn_id);
Expand Down Expand Up @@ -1952,10 +1965,7 @@ mod tests {
)
.await;

let msg = rx
.recv()
.await
.ok_or_else(|| anyhow!("should send one notification"))?;
let msg = recv_broadcast_message(&mut rx).await?;
match msg {
OutgoingMessage::AppServerNotification(ServerNotification::TurnCompleted(n)) => {
assert_eq!(n.turn.id, event_turn_id);
Expand Down Expand Up @@ -1994,10 +2004,7 @@ mod tests {
)
.await;

let msg = rx
.recv()
.await
.ok_or_else(|| anyhow!("should send one notification"))?;
let msg = recv_broadcast_message(&mut rx).await?;
match msg {
OutgoingMessage::AppServerNotification(ServerNotification::TurnCompleted(n)) => {
assert_eq!(n.turn.id, event_turn_id);
Expand Down Expand Up @@ -2046,10 +2053,7 @@ mod tests {
)
.await;

let msg = rx
.recv()
.await
.ok_or_else(|| anyhow!("should send one notification"))?;
let msg = recv_broadcast_message(&mut rx).await?;
match msg {
OutgoingMessage::AppServerNotification(ServerNotification::TurnPlanUpdated(n)) => {
assert_eq!(n.thread_id, conversation_id.to_string());
Expand Down Expand Up @@ -2117,10 +2121,7 @@ mod tests {
)
.await;

let first = rx
.recv()
.await
.ok_or_else(|| anyhow!("expected usage notification"))?;
let first = recv_broadcast_message(&mut rx).await?;
match first {
OutgoingMessage::AppServerNotification(
ServerNotification::ThreadTokenUsageUpdated(payload),
Expand All @@ -2136,10 +2137,7 @@ mod tests {
other => bail!("unexpected notification: {other:?}"),
}

let second = rx
.recv()
.await
.ok_or_else(|| anyhow!("expected rate limit notification"))?;
let second = recv_broadcast_message(&mut rx).await?;
match second {
OutgoingMessage::AppServerNotification(
ServerNotification::AccountRateLimitsUpdated(payload),
Expand Down Expand Up @@ -2276,10 +2274,7 @@ mod tests {
.await;

// Verify: A turn 1
let msg = rx
.recv()
.await
.ok_or_else(|| anyhow!("should send first notification"))?;
let msg = recv_broadcast_message(&mut rx).await?;
match msg {
OutgoingMessage::AppServerNotification(ServerNotification::TurnCompleted(n)) => {
assert_eq!(n.turn.id, a_turn1);
Expand All @@ -2297,10 +2292,7 @@ mod tests {
}

// Verify: B turn 1
let msg = rx
.recv()
.await
.ok_or_else(|| anyhow!("should send second notification"))?;
let msg = recv_broadcast_message(&mut rx).await?;
match msg {
OutgoingMessage::AppServerNotification(ServerNotification::TurnCompleted(n)) => {
assert_eq!(n.turn.id, b_turn1);
Expand All @@ -2318,10 +2310,7 @@ mod tests {
}

// Verify: A turn 2
let msg = rx
.recv()
.await
.ok_or_else(|| anyhow!("should send third notification"))?;
let msg = recv_broadcast_message(&mut rx).await?;
match msg {
OutgoingMessage::AppServerNotification(ServerNotification::TurnCompleted(n)) => {
assert_eq!(n.turn.id, a_turn2);
Expand Down Expand Up @@ -2487,10 +2476,7 @@ mod tests {
)
.await;

let msg = rx
.recv()
.await
.ok_or_else(|| anyhow!("should send one notification"))?;
let msg = recv_broadcast_message(&mut rx).await?;
match msg {
OutgoingMessage::AppServerNotification(ServerNotification::TurnDiffUpdated(
notification,
Expand Down
Loading
Loading