Skip to content

Commit a9debc6

Browse files
committed
cancellation registry
1 parent 6508cbf commit a9debc6

File tree

3 files changed

+80
-3
lines changed

3 files changed

+80
-3
lines changed
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
use std::collections::HashMap;
2+
use std::sync::Arc;
3+
use std::sync::Mutex as StdMutex;
4+
5+
use codex_app_server_protocol::RequestId;
6+
7+
trait Cancellable: Send {
8+
fn cancel(&self);
9+
}
10+
11+
impl<F> Cancellable for F
12+
where
13+
F: Fn() + Send,
14+
{
15+
fn cancel(&self) {
16+
(self)();
17+
}
18+
}
19+
20+
#[derive(Clone, Default)]
21+
pub(crate) struct CancellationRegistry {
22+
inner: Arc<StdMutex<HashMap<RequestId, Box<dyn Cancellable + Send>>>>,
23+
}
24+
25+
impl CancellationRegistry {
26+
pub(crate) fn insert<F>(&self, id: RequestId, f: F)
27+
where
28+
F: Fn() + Send + 'static,
29+
{
30+
self.inner.lock().unwrap().insert(id, Box::new(f));
31+
}
32+
33+
pub(crate) fn cancel(&self, id: &RequestId) -> bool {
34+
if let Some(c) = self.inner.lock().unwrap().remove(id) {
35+
c.cancel();
36+
true
37+
} else {
38+
false
39+
}
40+
}
41+
42+
pub(crate) fn remove(&self, id: &RequestId) {
43+
self.inner.lock().unwrap().remove(id);
44+
}
45+
}

codex-rs/app-server/src/codex_message_processor.rs

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -118,11 +118,14 @@ use tracing::info;
118118
use tracing::warn;
119119
use uuid::Uuid;
120120

121+
use crate::cancellation_registry::CancellationRegistry;
122+
121123
// Duration before a ChatGPT login attempt is abandoned.
122124
const LOGIN_CHATGPT_TIMEOUT: Duration = Duration::from_secs(10 * 60);
123125
struct ActiveLogin {
124126
shutdown_handle: ShutdownHandle,
125127
login_id: Uuid,
128+
request_id: RequestId,
126129
}
127130

128131
impl ActiveLogin {
@@ -144,6 +147,7 @@ pub(crate) struct CodexMessageProcessor {
144147
pending_interrupts: Arc<Mutex<HashMap<ConversationId, Vec<RequestId>>>>,
145148
pending_fuzzy_searches: Arc<Mutex<HashMap<String, Arc<AtomicBool>>>>,
146149
feedback: CodexFeedback,
150+
cancellation_registry: CancellationRegistry,
147151
}
148152

149153
impl CodexMessageProcessor {
@@ -166,6 +170,7 @@ impl CodexMessageProcessor {
166170
pending_interrupts: Arc::new(Mutex::new(HashMap::new())),
167171
pending_fuzzy_searches: Arc::new(Mutex::new(HashMap::new())),
168172
feedback,
173+
cancellation_registry: CancellationRegistry::default(),
169174
}
170175
}
171176

@@ -391,13 +396,22 @@ impl CodexMessageProcessor {
391396
let mut guard = self.active_login.lock().await;
392397
if let Some(existing) = guard.take() {
393398
existing.drop();
399+
self.cancellation_registry.remove(&existing.request_id);
394400
}
395401
*guard = Some(ActiveLogin {
396402
shutdown_handle: shutdown_handle.clone(),
397403
login_id,
404+
request_id: request_id.clone(),
398405
});
399406
}
400407

408+
// Register cancellation for this request id so $/cancelRequest works.
409+
let shutdown_for_cancel = shutdown_handle.clone();
410+
self.cancellation_registry
411+
.insert(request_id.clone(), move || {
412+
shutdown_for_cancel.shutdown();
413+
});
414+
401415
let response = LoginChatGptResponse {
402416
login_id,
403417
auth_url: server.auth_url.clone(),
@@ -407,6 +421,8 @@ impl CodexMessageProcessor {
407421
let outgoing_clone = self.outgoing.clone();
408422
let active_login = self.active_login.clone();
409423
let auth_manager = self.auth_manager.clone();
424+
let cancellation_registry = self.cancellation_registry.clone();
425+
let request_id_for_task = request_id.clone();
410426
tokio::spawn(async move {
411427
let (success, error_msg) = match tokio::time::timeout(
412428
LOGIN_CHATGPT_TIMEOUT,
@@ -451,6 +467,8 @@ impl CodexMessageProcessor {
451467
if guard.as_ref().map(|l| l.login_id) == Some(login_id) {
452468
*guard = None;
453469
}
470+
471+
cancellation_registry.remove(&request_id_for_task);
454472
});
455473

456474
LoginChatGptReply::Response(response)
@@ -470,9 +488,22 @@ impl CodexMessageProcessor {
470488
}
471489
}
472490

473-
/// Handle a generic JSON-RPC cancellation for a previously started operation.
474-
/// This is a fire-and-forget path; no response.
475-
pub async fn cancel_request(&self, id: RequestId) {}
491+
/// Handle a generic JSON-RPC `$ /cancelRequest` for a previously started operation.
492+
///
493+
/// Note: individual request handlers that wish to be cancellable must
494+
/// register a cancellation action in the `CancellationRegistry` using the
495+
/// original JSON-RPC `request_id` when they start work. This method looks up
496+
/// that action by id and triggers it if found. It is fire-and-forget; no
497+
/// JSON-RPC response is sent.
498+
pub async fn cancel_request(&self, id: RequestId) {
499+
let found = self.cancellation_registry.cancel(&id);
500+
if !found {
501+
tracing::debug!(
502+
"$/cancelRequest for unknown or already-finished id: {:?}",
503+
id
504+
);
505+
}
506+
}
476507

477508
// Legacy endpoint for cancelling a LoginChatGpt request. Please use $/cancelRequest instead.
478509
async fn cancel_login_chatgpt(&mut self, request_id: RequestId, login_id: Uuid) {

codex-rs/app-server/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use tracing_subscriber::filter::Targets;
2828
use tracing_subscriber::layer::SubscriberExt;
2929
use tracing_subscriber::util::SubscriberInitExt;
3030

31+
mod cancellation_registry;
3132
mod codex_message_processor;
3233
mod error_code;
3334
mod fuzzy_file_search;

0 commit comments

Comments
 (0)