Skip to content

Commit fe3e787

Browse files
committed
cancellation registry
1 parent 6508cbf commit fe3e787

File tree

4 files changed

+131
-3
lines changed

4 files changed

+131
-3
lines changed
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
use std::collections::HashMap;
2+
use std::sync::Arc;
3+
use std::sync::Mutex as StdMutex;
4+
5+
use codex_app_server_protocol::RequestId;
6+
7+
trait Cancellable: Send {
8+
fn cancel(&self);
9+
}
10+
11+
impl<F> Cancellable for F
12+
where
13+
F: Fn() + Send,
14+
{
15+
fn cancel(&self) {
16+
(self)();
17+
}
18+
}
19+
20+
#[derive(Clone, Default)]
21+
pub(crate) struct CancellationRegistry {
22+
inner: Arc<StdMutex<HashMap<RequestId, Box<dyn Cancellable + Send>>>>,
23+
}
24+
25+
impl CancellationRegistry {
26+
pub(crate) fn insert<F>(&self, id: RequestId, f: F)
27+
where
28+
F: Fn() + Send + 'static,
29+
{
30+
self.inner.lock().unwrap().insert(id, Box::new(f));
31+
}
32+
33+
pub(crate) fn cancel(&self, id: &RequestId) -> bool {
34+
// Remove the callback while holding the lock, but invoke it only after
35+
// releasing the lock to avoid deadlocks or long critical sections.
36+
let callback = {
37+
let mut guard = self.inner.lock().unwrap();
38+
guard.remove(id)
39+
};
40+
if let Some(c) = callback {
41+
c.cancel();
42+
true
43+
} else {
44+
false
45+
}
46+
}
47+
48+
pub(crate) fn remove(&self, id: &RequestId) {
49+
let mut guard = self.inner.lock().unwrap();
50+
guard.remove(id);
51+
}
52+
}

codex-rs/app-server/src/codex_message_processor.rs

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -118,11 +118,14 @@ use tracing::info;
118118
use tracing::warn;
119119
use uuid::Uuid;
120120

121+
use crate::cancellation_registry::CancellationRegistry;
122+
121123
// Duration before a ChatGPT login attempt is abandoned.
122124
const LOGIN_CHATGPT_TIMEOUT: Duration = Duration::from_secs(10 * 60);
123125
struct ActiveLogin {
124126
shutdown_handle: ShutdownHandle,
125127
login_id: Uuid,
128+
request_id: RequestId,
126129
}
127130

128131
impl ActiveLogin {
@@ -144,6 +147,7 @@ pub(crate) struct CodexMessageProcessor {
144147
pending_interrupts: Arc<Mutex<HashMap<ConversationId, Vec<RequestId>>>>,
145148
pending_fuzzy_searches: Arc<Mutex<HashMap<String, Arc<AtomicBool>>>>,
146149
feedback: CodexFeedback,
150+
cancellation_registry: CancellationRegistry,
147151
}
148152

149153
impl CodexMessageProcessor {
@@ -166,6 +170,7 @@ impl CodexMessageProcessor {
166170
pending_interrupts: Arc::new(Mutex::new(HashMap::new())),
167171
pending_fuzzy_searches: Arc::new(Mutex::new(HashMap::new())),
168172
feedback,
173+
cancellation_registry: CancellationRegistry::default(),
169174
}
170175
}
171176

@@ -391,13 +396,22 @@ impl CodexMessageProcessor {
391396
let mut guard = self.active_login.lock().await;
392397
if let Some(existing) = guard.take() {
393398
existing.drop();
399+
self.cancellation_registry.remove(&existing.request_id);
394400
}
395401
*guard = Some(ActiveLogin {
396402
shutdown_handle: shutdown_handle.clone(),
397403
login_id,
404+
request_id: request_id.clone(),
398405
});
399406
}
400407

408+
// Register cancellation for this request id so $/cancelRequest works.
409+
let shutdown_for_cancel = shutdown_handle.clone();
410+
self.cancellation_registry
411+
.insert(request_id.clone(), move || {
412+
shutdown_for_cancel.shutdown();
413+
});
414+
401415
let response = LoginChatGptResponse {
402416
login_id,
403417
auth_url: server.auth_url.clone(),
@@ -407,6 +421,8 @@ impl CodexMessageProcessor {
407421
let outgoing_clone = self.outgoing.clone();
408422
let active_login = self.active_login.clone();
409423
let auth_manager = self.auth_manager.clone();
424+
let cancellation_registry = self.cancellation_registry.clone();
425+
let request_id_for_task = request_id.clone();
410426
tokio::spawn(async move {
411427
let (success, error_msg) = match tokio::time::timeout(
412428
LOGIN_CHATGPT_TIMEOUT,
@@ -451,6 +467,8 @@ impl CodexMessageProcessor {
451467
if guard.as_ref().map(|l| l.login_id) == Some(login_id) {
452468
*guard = None;
453469
}
470+
471+
cancellation_registry.remove(&request_id_for_task);
454472
});
455473

456474
LoginChatGptReply::Response(response)
@@ -470,9 +488,22 @@ impl CodexMessageProcessor {
470488
}
471489
}
472490

473-
/// Handle a generic JSON-RPC cancellation for a previously started operation.
474-
/// This is a fire-and-forget path; no response.
475-
pub async fn cancel_request(&self, id: RequestId) {}
491+
/// Handle a generic JSON-RPC `$ /cancelRequest` for a previously started operation.
492+
///
493+
/// Note: individual request handlers that wish to be cancellable must
494+
/// register a cancellation action in the `CancellationRegistry` using the
495+
/// original JSON-RPC `request_id` when they start work. This method looks up
496+
/// that action by id and triggers it if found. It is fire-and-forget; no
497+
/// JSON-RPC response is sent.
498+
pub async fn cancel_request(&self, id: RequestId) {
499+
let found = self.cancellation_registry.cancel(&id);
500+
if !found {
501+
tracing::debug!(
502+
"$/cancelRequest for unknown or already-finished id: {:?}",
503+
id
504+
);
505+
}
506+
}
476507

477508
// Legacy endpoint for cancelling a LoginChatGpt request. Please use $/cancelRequest instead.
478509
async fn cancel_login_chatgpt(&mut self, request_id: RequestId, login_id: Uuid) {

codex-rs/app-server/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use tracing_subscriber::filter::Targets;
2828
use tracing_subscriber::layer::SubscriberExt;
2929
use tracing_subscriber::util::SubscriberInitExt;
3030

31+
mod cancellation_registry;
3132
mod codex_message_processor;
3233
mod error_code;
3334
mod fuzzy_file_search;

codex-rs/app-server/tests/suite/login.rs

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,16 @@ use app_test_support::McpProcess;
33
use app_test_support::to_response;
44
use codex_app_server_protocol::CancelLoginChatGptParams;
55
use codex_app_server_protocol::CancelLoginChatGptResponse;
6+
use codex_app_server_protocol::CancelRequestParams;
7+
use codex_app_server_protocol::ClientNotification;
68
use codex_app_server_protocol::GetAuthStatusParams;
79
use codex_app_server_protocol::GetAuthStatusResponse;
810
use codex_app_server_protocol::JSONRPCError;
911
use codex_app_server_protocol::JSONRPCResponse;
1012
use codex_app_server_protocol::LoginChatGptResponse;
1113
use codex_app_server_protocol::LogoutChatGptResponse;
1214
use codex_app_server_protocol::RequestId;
15+
use codex_app_server_protocol::ServerNotification;
1316
use codex_core::auth::AuthCredentialsStoreMode;
1417
use codex_login::login_with_api_key;
1518
use serial_test::serial;
@@ -204,3 +207,44 @@ async fn login_chatgpt_includes_forced_workspace_query_param() -> Result<()> {
204207
);
205208
Ok(())
206209
}
210+
211+
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
212+
#[serial(login_port)]
213+
async fn login_chatgpt_cancelled_via_cancel_request() -> Result<()> {
214+
let codex_home = TempDir::new()?;
215+
create_config_toml(codex_home.path())?;
216+
217+
let mut mcp = McpProcess::new(codex_home.path()).await?;
218+
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
219+
220+
let login_request_id = mcp.send_login_chat_gpt_request().await?;
221+
222+
let login_resp: JSONRPCResponse = timeout(
223+
DEFAULT_READ_TIMEOUT,
224+
mcp.read_stream_until_response_message(RequestId::Integer(login_request_id)),
225+
)
226+
.await??;
227+
let login: LoginChatGptResponse = to_response(login_resp)?;
228+
229+
// Cancel the in-flight request using LSP-style $/cancelRequest.
230+
mcp.send_notification(ClientNotification::CancelRequest(CancelRequestParams {
231+
id: RequestId::Integer(login_request_id),
232+
}))
233+
.await?;
234+
235+
// Expect a loginChatGptComplete notification indicating cancellation (success = false).
236+
let note = timeout(
237+
DEFAULT_READ_TIMEOUT,
238+
mcp.read_stream_until_notification_message("loginChatGptComplete"),
239+
)
240+
.await??;
241+
242+
let parsed: ServerNotification = note.try_into()?;
243+
let ServerNotification::LoginChatGptComplete(payload) = parsed else {
244+
anyhow::bail!("expected loginChatGptComplete notification");
245+
};
246+
assert_eq!(payload.login_id, login.login_id);
247+
assert_eq!(payload.success, false);
248+
assert!(payload.error.is_some());
249+
Ok(())
250+
}

0 commit comments

Comments
 (0)