Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions codex-rs/app-server-protocol/src/protocol/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,18 @@ impl TryFrom<JSONRPCNotification> for ServerNotification {
#[strum(serialize_all = "camelCase")]
pub enum ClientNotification {
Initialized,
/// LSP-style cancellation of an in-flight JSON-RPC request.
/// Shape: { "method": "$/cancelRequest", "params": { "id": <request-id> } }
#[serde(rename = "$/cancelRequest")]
#[ts(rename = "$/cancelRequest")]
#[strum(serialize = "$/cancelRequest")]
CancelRequest(CancelRequestParams),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah very interesting that this is done as a notification. makes sense after reading https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#cancelRequest

Copy link
Contributor

@owenlin0 owenlin0 Nov 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, looks like this might not be applicable to login. the use case for a $/cancelRequest call seems to be:

  1. client makes a request to server
  2. server is processing, no response sent yet
  3. client sends a $/cancelRequest notification
  4. server responds to the original request with a response with ErrorCodes.RequestCancelled

so this seems generally useful to interrupt long-running operations.

whereas with chatgpt login, we do:

  1. client makes an account/login request
  2. server immediately responds with {"login_id": uuid, "auth_url": }, where the url needs to be opened in a webview on the client
  3. user authenticates via the webview
  4. server sends a notification to the client indicating the login was successful

the auth URL looks like:
https://auth.openai.com/oauth/authorize?response_type=code&client_id=app_EMoamEEZ73f0CkXaXp7hrann&redirect_uri=http%3A%2F%2Flocalhost%3A1455%2Fauth%2Fcallback&scope=openid%20profile%20email%20offline_access&code_challenge=[code]&code_challenge_method=S256&id_token_add_organizations=true&codex_cli_simplified_flow=true&state=[state]&originator=codex_cli_rs

So the codex process starts up an HTTP server on port 1455 to listen for the redirect after login is successful, and then update its auth storage. Asking codex, looks like it'll time out in 10 mins if user doesn't complete login flow and there's no cancellation. We expose the CancelChatGptLogin request as a way to immedately cancel the login, mostly for UX I think (i.e. they can render a "cancel login" button in their UI).

tldr my 2c: based on our convo with pavel, I think we actually want to name it account/login/start and pair it with account/login/cancel since the login request returns immediately with a URL

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry for the back and forth on this!

}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
pub struct CancelRequestParams {
pub id: RequestId,
}

#[cfg(test)]
Expand Down Expand Up @@ -499,6 +511,22 @@ mod tests {
Ok(())
}

#[test]
fn serialize_cancel_request_notification() -> Result<()> {
let notification = ClientNotification::CancelRequest(CancelRequestParams {
id: RequestId::Integer(7),
});

assert_eq!(
json!({
"method": "$/cancelRequest",
"params": { "id": 7 }
}),
serde_json::to_value(&notification)?,
);
Ok(())
}

#[test]
fn serialize_server_request() -> Result<()> {
let conversation_id = ConversationId::from_string("67e55044-10b1-426f-9247-bb680e5fe0c8")?;
Expand Down
56 changes: 56 additions & 0 deletions codex-rs/app-server/src/cancellation_registry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::Mutex as StdMutex;
use std::sync::PoisonError;

use codex_app_server_protocol::RequestId;

trait Cancellable: Send {
fn cancel(&self);
}

impl<F> Cancellable for F
where
F: Fn() + Send,
{
fn cancel(&self) {
(self)();
}
}

#[derive(Clone, Default)]
pub(crate) struct CancellationRegistry {
inner: Arc<StdMutex<HashMap<RequestId, Box<dyn Cancellable + Send>>>>,
}

impl CancellationRegistry {
pub(crate) fn insert<F>(&self, id: RequestId, f: F)
where
F: Fn() + Send + 'static,
{
self.inner
.lock()
.unwrap_or_else(PoisonError::into_inner)
.insert(id, Box::new(f));
}
Comment on lines +27 to +35
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Allow FnOnce cancellation callbacks

The new registry only accepts F: Fn() + Send + 'static, which means every cancellation action must implement Fn. That rules out the most common patterns for cancelling background work, e.g. signalling a tokio::sync::oneshot::Sender or taking ownership of an abort handle, because closures such as move || tx.send(()) are only FnOnce. Trying to register one of those today will not compile, so long-running requests that rely on a one-shot cancel signal still cannot integrate with $/cancelRequest, defeating the purpose of this feature. Please change the registry to handle FnOnce callbacks (e.g. by storing them behind an Option/Mutex so they can be invoked exactly once).

Useful? React with 👍 / 👎.


pub(crate) fn cancel(&self, id: &RequestId) -> bool {
// Remove the callback while holding the lock, but invoke it only after
// releasing the lock to avoid deadlocks or long critical sections.
let callback = {
let mut guard = self.inner.lock().unwrap_or_else(PoisonError::into_inner);
guard.remove(id)
};
if let Some(c) = callback {
c.cancel();
true
} else {
false
}
}

pub(crate) fn remove(&self, id: &RequestId) {
let mut guard = self.inner.lock().unwrap_or_else(PoisonError::into_inner);
guard.remove(id);
}
}
36 changes: 36 additions & 0 deletions codex-rs/app-server/src/codex_message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,14 @@ use tracing::info;
use tracing::warn;
use uuid::Uuid;

use crate::cancellation_registry::CancellationRegistry;

// Duration before a ChatGPT login attempt is abandoned.
const LOGIN_CHATGPT_TIMEOUT: Duration = Duration::from_secs(10 * 60);
struct ActiveLogin {
shutdown_handle: ShutdownHandle,
login_id: Uuid,
request_id: RequestId,
}

impl ActiveLogin {
Expand All @@ -144,6 +147,7 @@ pub(crate) struct CodexMessageProcessor {
pending_interrupts: Arc<Mutex<HashMap<ConversationId, Vec<RequestId>>>>,
pending_fuzzy_searches: Arc<Mutex<HashMap<String, Arc<AtomicBool>>>>,
feedback: CodexFeedback,
cancellation_registry: CancellationRegistry,
}

impl CodexMessageProcessor {
Expand All @@ -166,6 +170,7 @@ impl CodexMessageProcessor {
pending_interrupts: Arc::new(Mutex::new(HashMap::new())),
pending_fuzzy_searches: Arc::new(Mutex::new(HashMap::new())),
feedback,
cancellation_registry: CancellationRegistry::default(),
}
}

Expand Down Expand Up @@ -391,13 +396,22 @@ impl CodexMessageProcessor {
let mut guard = self.active_login.lock().await;
if let Some(existing) = guard.take() {
existing.drop();
self.cancellation_registry.remove(&existing.request_id);
}
*guard = Some(ActiveLogin {
shutdown_handle: shutdown_handle.clone(),
login_id,
request_id: request_id.clone(),
});
}

// Register cancellation for this request id so $/cancelRequest works.
let shutdown_for_cancel = shutdown_handle.clone();
self.cancellation_registry
.insert(request_id.clone(), move || {
shutdown_for_cancel.shutdown();
});

let response = LoginChatGptResponse {
login_id,
auth_url: server.auth_url.clone(),
Expand All @@ -407,6 +421,8 @@ impl CodexMessageProcessor {
let outgoing_clone = self.outgoing.clone();
let active_login = self.active_login.clone();
let auth_manager = self.auth_manager.clone();
let cancellation_registry = self.cancellation_registry.clone();
let request_id_for_task = request_id.clone();
tokio::spawn(async move {
let (success, error_msg) = match tokio::time::timeout(
LOGIN_CHATGPT_TIMEOUT,
Expand Down Expand Up @@ -451,6 +467,8 @@ impl CodexMessageProcessor {
if guard.as_ref().map(|l| l.login_id) == Some(login_id) {
*guard = None;
}

cancellation_registry.remove(&request_id_for_task);
});

LoginChatGptReply::Response(response)
Expand All @@ -470,6 +488,24 @@ impl CodexMessageProcessor {
}
}

/// Handle a generic JSON-RPC `$ /cancelRequest` for a previously started operation.
///
/// Note: individual request handlers that wish to be cancellable must
/// register a cancellation action in the `CancellationRegistry` using the
/// original JSON-RPC `request_id` when they start work. This method looks up
/// that action by id and triggers it if found. It is fire-and-forget; no
/// JSON-RPC response is sent.
pub async fn cancel_request(&self, id: RequestId) {
let found = self.cancellation_registry.cancel(&id);
if !found {
tracing::debug!(
"$/cancelRequest for unknown or already-finished id: {:?}",
id
);
}
}

// Legacy endpoint for cancelling a LoginChatGpt request. Please use $/cancelRequest instead.
async fn cancel_login_chatgpt(&mut self, request_id: RequestId, login_id: Uuid) {
let mut guard = self.active_login.lock().await;
if guard.as_ref().map(|l| l.login_id) == Some(login_id) {
Expand Down
1 change: 1 addition & 0 deletions codex-rs/app-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use tracing_subscriber::filter::Targets;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;

mod cancellation_registry;
mod codex_message_processor;
mod error_code;
mod fuzzy_file_search;
Expand Down
17 changes: 15 additions & 2 deletions codex-rs/app-server/src/message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ use std::path::PathBuf;
use crate::codex_message_processor::CodexMessageProcessor;
use crate::error_code::INVALID_REQUEST_ERROR_CODE;
use crate::outgoing_message::OutgoingMessageSender;
use codex_app_server_protocol::CancelRequestParams;
use codex_app_server_protocol::ClientInfo;
use codex_app_server_protocol::ClientNotification;
use codex_app_server_protocol::ClientRequest;
use codex_app_server_protocol::InitializeResponse;

Expand Down Expand Up @@ -125,9 +127,20 @@ impl MessageProcessor {
}

pub(crate) async fn process_notification(&self, notification: JSONRPCNotification) {
// Currently, we do not expect to receive any notifications from the
// client, so we just log them.
tracing::info!("<- notification: {:?}", notification);

if let Ok(value) = serde_json::to_value(&notification)
&& let Ok(typed) = serde_json::from_value::<ClientNotification>(value)
{
match typed {
ClientNotification::CancelRequest(CancelRequestParams { id }) => {
self.codex_message_processor.cancel_request(id).await;
}
ClientNotification::Initialized => {
// Already handled during handshake; ignore.
}
}
}
}

/// Handle a standalone JSON-RPC response originating from the peer.
Expand Down
44 changes: 44 additions & 0 deletions codex-rs/app-server/tests/suite/login.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@ use app_test_support::McpProcess;
use app_test_support::to_response;
use codex_app_server_protocol::CancelLoginChatGptParams;
use codex_app_server_protocol::CancelLoginChatGptResponse;
use codex_app_server_protocol::CancelRequestParams;
use codex_app_server_protocol::ClientNotification;
use codex_app_server_protocol::GetAuthStatusParams;
use codex_app_server_protocol::GetAuthStatusResponse;
use codex_app_server_protocol::JSONRPCError;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::LoginChatGptResponse;
use codex_app_server_protocol::LogoutChatGptResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ServerNotification;
use codex_core::auth::AuthCredentialsStoreMode;
use codex_login::login_with_api_key;
use serial_test::serial;
Expand Down Expand Up @@ -204,3 +207,44 @@ async fn login_chatgpt_includes_forced_workspace_query_param() -> Result<()> {
);
Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[serial(login_port)]
async fn login_chatgpt_cancelled_via_cancel_request() -> Result<()> {
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path())?;

let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;

let login_request_id = mcp.send_login_chat_gpt_request().await?;

let login_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(login_request_id)),
)
.await??;
let login: LoginChatGptResponse = to_response(login_resp)?;

// Cancel the in-flight request using LSP-style $/cancelRequest.
mcp.send_notification(ClientNotification::CancelRequest(CancelRequestParams {
id: RequestId::Integer(login_request_id),
}))
.await?;

// Expect a loginChatGptComplete notification indicating cancellation (success = false).
let note = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("loginChatGptComplete"),
)
.await??;

let parsed: ServerNotification = note.try_into()?;
let ServerNotification::LoginChatGptComplete(payload) = parsed else {
anyhow::bail!("expected loginChatGptComplete notification");
};
assert_eq!(payload.login_id, login.login_id);
assert!(!payload.success);
assert!(payload.error.is_some());
Ok(())
}
Loading