Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions codex-rs/app-server/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ Supported transports:

Websocket transport is currently experimental and unsupported. Do not rely on it for production workloads.

Backpressure behavior:

- The server uses bounded queues between transport ingress, request processing, and outbound writes.
- When request ingress is saturated, new requests are rejected with JSON-RPC error code `-32001` and the message `"Server overloaded; retry later."`.
- Clients should treat this as retryable and use exponential backoff with jitter.

## Message Schema

Currently, you can dump a TypeScript version of the schema using `codex app-server generate-ts`, or a JSON Schema bundle via `codex app-server generate-json-schema`. Each output is specific to the version of Codex you used to run the command, so the generated artifacts are guaranteed to match that version.
Expand Down
1 change: 1 addition & 0 deletions codex-rs/app-server/src/error_code.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
// JSON-RPC 2.0 predefined error codes (see spec §5.1, "Error object").
pub(crate) const INVALID_REQUEST_ERROR_CODE: i64 = -32600;
pub(crate) const INTERNAL_ERROR_CODE: i64 = -32603;
// Implementation-defined server error (JSON-RPC reserves -32000..=-32099 for
// these). Returned when request ingress is saturated; clients should treat it
// as retryable with exponential backoff (documented in the app-server README).
pub(crate) const OVERLOADED_ERROR_CODE: i64 = -32001;
115 changes: 108 additions & 7 deletions codex-rs/app-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ use codex_core::config_loader::CloudRequirementsLoader;
use codex_core::config_loader::ConfigLayerStackOrdering;
use codex_core::config_loader::LoaderOverrides;
use std::collections::HashMap;
use std::collections::VecDeque;
use std::io::ErrorKind;
use std::io::Result as IoResult;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;

use crate::message_processor::MessageProcessor;
use crate::message_processor::MessageProcessorArgs;
Expand All @@ -21,6 +23,7 @@ use crate::outgoing_message::OutgoingEnvelope;
use crate::outgoing_message::OutgoingMessageSender;
use crate::transport::CHANNEL_CAPACITY;
use crate::transport::ConnectionState;
use crate::transport::OutboundConnectionState;
use crate::transport::TransportEvent;
use crate::transport::has_initialized_connections;
use crate::transport::route_outgoing_envelope;
Expand Down Expand Up @@ -61,6 +64,26 @@ mod transport;

pub use crate::transport::AppServerTransport;

/// Control-plane messages from the processor/transport side to the outbound router task.
///
/// `run_main_with_transport` now uses two loops/tasks:
/// - processor loop: handles incoming JSON-RPC and request dispatch
/// - outbound loop: performs potentially slow writes to per-connection writers
///
/// `OutboundControlEvent` keeps those loops coordinated without sharing mutable
/// connection state directly. In particular, the outbound loop needs to know
/// when a connection opens/closes so it can route messages correctly.
enum OutboundControlEvent {
/// Register a new writer for an opened connection.
Opened {
/// Identity of the connection this writer belongs to.
connection_id: ConnectionId,
/// Per-connection sink the outbound router writes outgoing messages into.
writer: mpsc::Sender<crate::outgoing_message::OutgoingMessage>,
/// Shared flag the processor sets to `true` once this connection's
/// `initialize` handshake completes; the outbound side reads it to
/// decide how to route messages to not-yet-initialized connections.
initialized: Arc<AtomicBool>,
},
/// Remove state for a closed/disconnected connection.
Closed {
/// Identity of the connection whose outbound state should be dropped.
connection_id: ConnectionId,
},
}

fn config_warning_from_error(
summary: impl Into<String>,
err: &std::io::Error,
Expand Down Expand Up @@ -197,6 +220,8 @@ pub async fn run_main_with_transport(
let (transport_event_tx, mut transport_event_rx) =
mpsc::channel::<TransportEvent>(CHANNEL_CAPACITY);
let (outgoing_tx, mut outgoing_rx) = mpsc::channel::<OutgoingEnvelope>(CHANNEL_CAPACITY);
let (outbound_control_tx, mut outbound_control_rx) =
mpsc::channel::<OutboundControlEvent>(CHANNEL_CAPACITY);

let mut stdio_handles = Vec::<JoinHandle<()>>::new();
let mut websocket_accept_handle = None;
Expand Down Expand Up @@ -336,8 +361,65 @@ pub async fn run_main_with_transport(
}
}

let transport_event_tx_for_outbound = transport_event_tx.clone();
let outbound_handle = tokio::spawn(async move {
let mut outbound_connections = HashMap::<ConnectionId, OutboundConnectionState>::new();
let mut pending_closed_connections = VecDeque::<ConnectionId>::new();
loop {
tokio::select! {
biased;
event = outbound_control_rx.recv() => {
let Some(event) = event else {
break;
};
match event {
OutboundControlEvent::Opened {
connection_id,
writer,
initialized,
} => {
outbound_connections.insert(
connection_id,
OutboundConnectionState::new(writer, initialized),
);
}
OutboundControlEvent::Closed { connection_id } => {
outbound_connections.remove(&connection_id);
}
}
}
envelope = outgoing_rx.recv() => {
let Some(envelope) = envelope else {
break;
};
let disconnected_connections =
route_outgoing_envelope(&mut outbound_connections, envelope).await;
pending_closed_connections.extend(disconnected_connections);
}
}

while let Some(connection_id) = pending_closed_connections.front().copied() {
match transport_event_tx_for_outbound
.try_send(TransportEvent::ConnectionClosed { connection_id })
{
Ok(()) => {
pending_closed_connections.pop_front();
}
Err(mpsc::error::TrySendError::Full(_)) => {
break;
}
Comment on lines +408 to +410
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Keep retrying deferred close events when queue is full

When try_send(TransportEvent::ConnectionClosed { .. }) returns Full, this branch just breaks and retries only after another outbound/control event arrives. If the queue later drains while the server is otherwise idle, the deferred close is never delivered, leaving a stale connection in the processor (and in stdio mode this can prevent shutdown_when_no_connections from ever triggering). This close-notification path needs a retry mechanism that does not depend on new outbound traffic.

Useful? React with 👍 / 👎.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems legit?

Err(mpsc::error::TrySendError::Closed(_)) => {
return;
}
}
}
}
info!("outbound router task exited (channel closed)");
});

let processor_handle = tokio::spawn({
let outgoing_message_sender = Arc::new(OutgoingMessageSender::new(outgoing_tx));
let outbound_control_tx = outbound_control_tx;
let cli_overrides: Vec<(String, TomlValue)> = cli_kv_overrides.clone();
let loader_overrides = loader_overrides_for_config_api;
let mut processor = MessageProcessor::new(MessageProcessorArgs {
Expand All @@ -362,9 +444,28 @@ pub async fn run_main_with_transport(
};
match event {
TransportEvent::ConnectionOpened { connection_id, writer } => {
connections.insert(connection_id, ConnectionState::new(writer));
let outbound_initialized = Arc::new(AtomicBool::new(false));
if outbound_control_tx
.send(OutboundControlEvent::Opened {
connection_id,
writer,
initialized: Arc::clone(&outbound_initialized),
})
.await
.is_err()
{
break;
}
connections.insert(connection_id, ConnectionState::new(outbound_initialized));
}
TransportEvent::ConnectionClosed { connection_id } => {
if outbound_control_tx
.send(OutboundControlEvent::Closed { connection_id })
.await
.is_err()
{
break;
}
connections.remove(&connection_id);
if shutdown_when_no_connections && connections.is_empty() {
break;
Expand All @@ -377,13 +478,18 @@ pub async fn run_main_with_transport(
warn!("dropping request from unknown connection: {:?}", connection_id);
continue;
};
let was_initialized = connection_state.session.initialized;
processor
.process_request(
connection_id,
request,
&mut connection_state.session,
&connection_state.outbound_initialized,
)
.await;
if !was_initialized && connection_state.session.initialized {
processor.send_initialize_notifications().await;
}
}
JSONRPCMessage::Response(response) => {
processor.process_response(response).await;
Expand All @@ -398,12 +504,6 @@ pub async fn run_main_with_transport(
}
}
}
envelope = outgoing_rx.recv() => {
let Some(envelope) = envelope else {
break;
};
route_outgoing_envelope(&mut connections, envelope).await;
}
created = thread_created_rx.recv(), if listen_for_threads => {
match created {
Ok(thread_id) => {
Expand Down Expand Up @@ -433,6 +533,7 @@ pub async fn run_main_with_transport(
drop(transport_event_tx);

let _ = processor_handle.await;
let _ = outbound_handle.await;

if let Some(handle) = websocket_accept_handle {
handle.abort();
Expand Down
20 changes: 12 additions & 8 deletions codex-rs/app-server/src/message_processor.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::RwLock;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;

use crate::codex_message_processor::CodexMessageProcessor;
use crate::codex_message_processor::CodexMessageProcessorArgs;
Expand Down Expand Up @@ -191,6 +193,7 @@ impl MessageProcessor {
connection_id: ConnectionId,
request: JSONRPCRequest,
session: &mut ConnectionSessionState,
outbound_initialized: &AtomicBool,
) {
let request_id = ConnectionRequestId {
connection_id,
Expand Down Expand Up @@ -286,14 +289,7 @@ impl MessageProcessor {
self.outgoing.send_response(request_id, response).await;

session.initialized = true;
for notification in self.config_warnings.iter().cloned() {
self.outgoing
.send_server_notification(ServerNotification::ConfigWarning(
notification,
))
.await;
}

outbound_initialized.store(true, Ordering::Release);
return;
}
}
Expand Down Expand Up @@ -381,6 +377,14 @@ impl MessageProcessor {
self.codex_message_processor.thread_created_receiver()
}

/// Emit the one-time notifications owed to a connection that has just
/// completed the `initialize` handshake.
///
/// Replays every queued configuration warning, in order, as a
/// `ConfigWarning` server notification; each send is awaited before the
/// next warning is dispatched.
pub(crate) async fn send_initialize_notifications(&self) {
    for warning in &self.config_warnings {
        let event = ServerNotification::ConfigWarning(warning.clone());
        self.outgoing.send_server_notification(event).await;
    }
}

pub(crate) async fn try_attach_thread_listener(&mut self, thread_id: ThreadId) {
self.codex_message_processor
.try_attach_thread_listener(thread_id)
Expand Down
Loading
Loading