From 76c7486950b15139c9473d0548b2a2e8048a795f Mon Sep 17 00:00:00 2001 From: Eric Traut Date: Mon, 2 Feb 2026 23:35:20 -0800 Subject: [PATCH 1/9] Added support for live updates to skills Add a centralized FileWatcher in codex-core (using notify) that watches skill roots from the config layer stack (recursive) Send `SkillsChanged` events when relevant file system changes are detected On `SkillsChanged`: * Invalidate the skills cache immediately in ThreadManager * Emit EventMsg::SkillsUpdateAvailable to active sessions * Broadcast a new app-server notification: skills/list/updated Add SkillsListUpdatedNotification to the app-server protocol and gate broadcast until after initialize. This change does not inject new events into the event stream. That means the agent will not know about new skills, so it won't be able to implicitly invoke new skills. It also won't know about changes to existing skills, so if it has already read the contents of a modified skill, it will not honor the new behavior. I plan to address these limitations in a follow-on PR modeled after #9985. Injection of new skills (and AGENTS) was deemed to risky at this point, hence the need to split the feature into two stages. Testing: * In addition to automated tests, I did manual testing to confirm that newly-created skills, deleted skills, and renamed skills are reflected in the TUI skill picker menu. Also confirmed that modifications to behaviors for explicitly-invoked skills are honored. --- codex-rs/Cargo.lock | 1 + .../src/protocol/common.rs | 1 + .../app-server-protocol/src/protocol/v2.rs | 5 + codex-rs/app-server/src/message_processor.rs | 29 ++ codex-rs/core/Cargo.toml | 1 + codex-rs/core/src/codex.rs | 95 +++++- codex-rs/core/src/codex_delegate.rs | 1 + codex-rs/core/src/file_watcher.rs | 316 ++++++++++++++++++ codex-rs/core/src/lib.rs | 2 + codex-rs/core/src/project_doc.rs | 47 +-- codex-rs/core/src/skills/manager.rs | 38 +-- codex-rs/core/src/state/service.rs | 2 + codex-rs/core/src/thread_manager.rs | 70 +++- codex-rs/core/tests/suite/live_reload.rs | 163 +++++++++ codex-rs/core/tests/suite/mod.rs | 1 + 15 files changed, 716 insertions(+), 56 deletions(-) create mode 100644 codex-rs/core/src/file_watcher.rs create mode 100644 codex-rs/core/tests/suite/live_reload.rs diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index 992a710a128..dbc5948ad16 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -1436,6 +1436,7 @@ dependencies = [ "libc", "maplit", "multimap", + "notify", "once_cell", "openssl-sys", "opentelemetry_sdk", diff --git a/codex-rs/app-server-protocol/src/protocol/common.rs b/codex-rs/app-server-protocol/src/protocol/common.rs index d20f23a88a5..f6196d8823e 100644 --- a/codex-rs/app-server-protocol/src/protocol/common.rs +++ b/codex-rs/app-server-protocol/src/protocol/common.rs @@ -704,6 +704,7 @@ server_notification_definitions! { ContextCompacted => "thread/compacted" (v2::ContextCompactedNotification), DeprecationNotice => "deprecationNotice" (v2::DeprecationNoticeNotification), ConfigWarning => "configWarning" (v2::ConfigWarningNotification), + SkillsListUpdated => "skills/list/updated" (v2::SkillsListUpdatedNotification), /// Notifies the user of world-writable directories on Windows, which cannot be protected by the sandbox. WindowsWorldWritableWarning => "windows/worldWritableWarning" (v2::WindowsWorldWritableWarningNotification), diff --git a/codex-rs/app-server-protocol/src/protocol/v2.rs b/codex-rs/app-server-protocol/src/protocol/v2.rs index 19db13723af..aa419abf30d 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2.rs @@ -2635,6 +2635,11 @@ pub struct ContextCompactedNotification { pub turn_id: String, } +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct SkillsListUpdatedNotification {} + #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] #[serde(rename_all = "camelCase")] #[ts(export_to = "v2/")] diff --git a/codex-rs/app-server/src/message_processor.rs b/codex-rs/app-server/src/message_processor.rs index db6275dcaff..c05ccd62f06 100644 --- a/codex-rs/app-server/src/message_processor.rs +++ b/codex-rs/app-server/src/message_processor.rs @@ -28,8 +28,10 @@ use codex_app_server_protocol::JSONRPCResponse; use codex_app_server_protocol::RequestId; use codex_app_server_protocol::ServerNotification; use codex_app_server_protocol::ServerRequestPayload; +use codex_app_server_protocol::SkillsListUpdatedNotification; use codex_app_server_protocol::experimental_required_message; use codex_core::AuthManager; +use codex_core::FileWatcherEvent; use codex_core::ThreadManager; use codex_core::auth::ExternalAuthRefreshContext; use codex_core::auth::ExternalAuthRefreshReason; @@ -112,6 +114,7 @@ pub(crate) struct MessageProcessor { config: Arc, initialized: bool, experimental_api_enabled: Arc, + initialized_flag: Arc, config_warnings: Vec, } @@ -156,6 +159,30 @@ impl MessageProcessor { auth_manager.clone(), SessionSource::VSCode, )); + + // Watch for on-disk skill changes and send notifications to the client. + let initialized_flag = Arc::new(AtomicBool::new(false)); + let mut skills_updates_rx = thread_manager.subscribe_file_watcher(); + let outgoing_for_skills = Arc::clone(&outgoing); + let initialized_for_skills = Arc::clone(&initialized_flag); + tokio::spawn(async move { + loop { + match skills_updates_rx.recv().await { + Ok(FileWatcherEvent::SkillsChanged { .. }) => { + if !initialized_for_skills.load(Ordering::SeqCst) { + continue; + } + outgoing_for_skills + .send_server_notification(ServerNotification::SkillsListUpdated( + SkillsListUpdatedNotification {}, + )) + .await; + } + Err(broadcast::error::RecvError::Closed) => break, + Err(broadcast::error::RecvError::Lagged(_)) => continue, + } + } + }); let codex_message_processor = CodexMessageProcessor::new(CodexMessageProcessorArgs { auth_manager, thread_manager, @@ -180,6 +207,7 @@ impl MessageProcessor { config, initialized: false, experimental_api_enabled, + initialized_flag, config_warnings, } } @@ -268,6 +296,7 @@ impl MessageProcessor { self.outgoing.send_response(request_id, response).await; self.initialized = true; + self.initialized_flag.store(true, Ordering::SeqCst); if !self.config_warnings.is_empty() { for notification in self.config_warnings.drain(..) { self.outgoing diff --git a/codex-rs/core/Cargo.toml b/codex-rs/core/Cargo.toml index 38da38b929f..45341087f27 100644 --- a/codex-rs/core/Cargo.toml +++ b/codex-rs/core/Cargo.toml @@ -58,6 +58,7 @@ indoc = { workspace = true } keyring = { workspace = true, features = ["crypto-rust"] } libc = { workspace = true } multimap = { workspace = true } +notify = { workspace = true } once_cell = { workspace = true } os_info = { workspace = true } rand = { workspace = true } diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index ba84b6b4321..fbe991e9f8e 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -118,6 +118,8 @@ use crate::error::Result as CodexResult; use crate::exec::StreamOutput; use crate::exec_policy::ExecPolicyUpdateError; use crate::feedback_tags; +use crate::file_watcher::FileWatcher; +use crate::file_watcher::FileWatcherEvent; use crate::git_info::get_git_repo_root; use crate::instructions::UserInstructions; use crate::mcp::CODEX_APPS_MCP_SERVER_NAME; @@ -277,6 +279,7 @@ impl Codex { auth_manager: Arc, models_manager: Arc, skills_manager: Arc, + file_watcher: Arc, conversation_history: InitialHistory, session_source: SessionSource, agent_control: AgentControl, @@ -413,6 +416,7 @@ impl Codex { conversation_history, session_source_clone, skills_manager, + file_watcher, agent_control, ) .instrument(session_init_span) @@ -675,6 +679,29 @@ impl Session { state.session_configuration.codex_home().clone() } + fn start_file_watcher_listener(self: &Arc) { + let mut rx = self.services.file_watcher.subscribe(); + let weak_sess = Arc::downgrade(self); + tokio::spawn(async move { + loop { + match rx.recv().await { + Ok(FileWatcherEvent::SkillsChanged { .. }) => { + let Some(sess) = weak_sess.upgrade() else { + break; + }; + let event = Event { + id: sess.next_internal_sub_id(), + msg: EventMsg::SkillsUpdateAvailable, + }; + sess.send_event_raw(event).await; + } + Err(tokio::sync::broadcast::error::RecvError::Closed) => break, + Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue, + } + } + }); + } + #[allow(clippy::too_many_arguments)] fn make_turn_context( auth_manager: Option>, @@ -746,6 +773,7 @@ impl Session { initial_history: InitialHistory, session_source: SessionSource, skills_manager: Arc, + file_watcher: Arc, agent_control: AgentControl, ) -> anyhow::Result> { debug!( @@ -946,6 +974,7 @@ impl Session { models_manager: Arc::clone(&models_manager), tool_approvals: Mutex::new(ApprovalStore::default()), skills_manager, + file_watcher, agent_control, state_db: state_db_ctx.clone(), transport_manager: TransportManager::new(), @@ -989,6 +1018,9 @@ impl Session { sess.send_event_raw(event).await; } + // Start the watcher after SessionConfigured so it cannot emit earlier events. + sess.start_file_watcher_listener(); + // Construct sandbox_state before initialize() so it can be sent to each // MCP server immediately after it becomes ready (avoiding blocking). let sandbox_state = SandboxState { @@ -4501,9 +4533,14 @@ pub(crate) use tests::make_session_and_context_with_rx; #[cfg(test)] mod tests { + use std::fs; + use super::*; use crate::CodexAuth; + use crate::config::CONFIG_TOML_FILE; use crate::config::ConfigBuilder; + use crate::config::ConfigToml; + use crate::config::ProjectConfig; use crate::config::test_config; use crate::exec::ExecToolCallOutput; use crate::function_tool::FunctionCallError; @@ -4511,6 +4548,7 @@ mod tests { use crate::tools::format_exec_output_str; use codex_protocol::ThreadId; + use codex_protocol::config_types::TrustLevel; use codex_protocol::models::FunctionCallOutputPayload; use crate::protocol::CompactedItem; @@ -5287,6 +5325,43 @@ mod tests { .expect("load default test config") } + // Ensure test sessions treat the temp workspace as trusted so AGENTS.md + // and project-doc instructions are loaded consistently. + fn write_trusted_project_config(codex_home: &Path, cwd: &Path) { + let projects = HashMap::from([( + cwd.to_string_lossy().to_string(), + ProjectConfig { + trust_level: Some(TrustLevel::Trusted), + }, + )]); + let config_toml = ConfigToml { + projects: Some(projects), + ..Default::default() + }; + let config_toml_str = toml::to_string(&config_toml).expect("serialize config toml"); + fs::write(codex_home.join(CONFIG_TOML_FILE), config_toml_str).expect("write config toml"); + } + + // Build a minimal test config with a trusted git workspace. + async fn build_trusted_test_config() -> Arc { + let codex_home = tempfile::tempdir().expect("create temp dir"); + let codex_home_path = codex_home.keep(); + let cwd = tempfile::tempdir().expect("create temp cwd"); + let cwd_path = cwd.keep(); + fs::create_dir(cwd_path.join(".git")).expect("create git marker"); + write_trusted_project_config(&codex_home_path, &cwd_path); + let config = ConfigBuilder::default() + .codex_home(codex_home_path) + .harness_overrides(crate::config::ConfigOverrides { + cwd: Some(cwd_path), + ..Default::default() + }) + .build() + .await + .expect("load overridden test config"); + Arc::new(config) + } + fn otel_manager( conversation_id: ThreadId, config: &Config, @@ -5308,9 +5383,7 @@ mod tests { pub(crate) async fn make_session_and_context() -> (Session, TurnContext) { let (tx_event, _rx_event) = async_channel::unbounded(); - let codex_home = tempfile::tempdir().expect("create temp dir"); - let config = build_test_config(codex_home.path()).await; - let config = Arc::new(config); + let config = build_trusted_test_config().await; let conversation_id = ThreadId::default(); let auth_manager = AuthManager::from_auth_for_testing(CodexAuth::from_api_key("Test API Key")); @@ -5320,6 +5393,7 @@ mod tests { )); let agent_control = AgentControl::default(); let exec_policy = ExecPolicyManager::default(); + let skills_manager = Arc::new(SkillsManager::new(config.codex_home.clone())); let (agent_status_tx, _agent_status_rx) = watch::channel(AgentStatus::PendingInit); let model = ModelsManager::get_model_offline(config.model.as_deref()); let model_info = ModelsManager::construct_model_info_offline(model.as_str(), &config); @@ -5332,12 +5406,15 @@ mod tests { developer_instructions: None, }, }; + let skills_outcome = skills_manager.skills_for_config(config.as_ref()); + let enabled_skills = skills_outcome.enabled_skills(); + let user_instructions = get_user_instructions(config.as_ref(), Some(&enabled_skills)).await; let session_configuration = SessionConfiguration { provider: config.model_provider.clone(), collaboration_mode, model_reasoning_summary: config.model_reasoning_summary, developer_instructions: config.developer_instructions.clone(), - user_instructions: config.user_instructions.clone(), + user_instructions, personality: config.personality, base_instructions: config .base_instructions @@ -5370,6 +5447,7 @@ mod tests { mark_state_initial_context_seeded(&mut state); let skills_manager = Arc::new(SkillsManager::new(config.codex_home.clone())); + let file_watcher = Arc::new(FileWatcher::noop()); let services = SessionServices { mcp_connection_manager: Arc::new(RwLock::new(McpConnectionManager::default())), mcp_startup_cancellation_token: Mutex::new(CancellationToken::new()), @@ -5388,6 +5466,7 @@ mod tests { models_manager: Arc::clone(&models_manager), tool_approvals: Mutex::new(ApprovalStore::default()), skills_manager, + file_watcher, agent_control, state_db: None, transport_manager: TransportManager::new(), @@ -5428,9 +5507,7 @@ mod tests { async_channel::Receiver, ) { let (tx_event, rx_event) = async_channel::unbounded(); - let codex_home = tempfile::tempdir().expect("create temp dir"); - let config = build_test_config(codex_home.path()).await; - let config = Arc::new(config); + let config = build_trusted_test_config().await; let conversation_id = ThreadId::default(); let auth_manager = AuthManager::from_auth_for_testing(CodexAuth::from_api_key("Test API Key")); @@ -5490,6 +5567,7 @@ mod tests { mark_state_initial_context_seeded(&mut state); let skills_manager = Arc::new(SkillsManager::new(config.codex_home.clone())); + let file_watcher = Arc::new(FileWatcher::noop()); let services = SessionServices { mcp_connection_manager: Arc::new(RwLock::new(McpConnectionManager::default())), mcp_startup_cancellation_token: Mutex::new(CancellationToken::new()), @@ -5508,6 +5586,7 @@ mod tests { models_manager: Arc::clone(&models_manager), tool_approvals: Mutex::new(ApprovalStore::default()), skills_manager, + file_watcher, agent_control, state_db: None, transport_manager: TransportManager::new(), @@ -5678,7 +5757,7 @@ mod tests { } #[tokio::test] - async fn abort_gracefuly_emits_turn_aborted_only() { + async fn abort_gracefully_emits_turn_aborted_only() { let (sess, tc, rx) = make_session_and_context_with_rx().await; let input = vec![UserInput::Text { text: "hello".to_string(), diff --git a/codex-rs/core/src/codex_delegate.rs b/codex-rs/core/src/codex_delegate.rs index 7a611c05e2d..5c5b3855896 100644 --- a/codex-rs/core/src/codex_delegate.rs +++ b/codex-rs/core/src/codex_delegate.rs @@ -54,6 +54,7 @@ pub(crate) async fn run_codex_thread_interactive( auth_manager, models_manager, Arc::clone(&parent_session.services.skills_manager), + Arc::clone(&parent_session.services.file_watcher), initial_history.unwrap_or(InitialHistory::New), SessionSource::SubAgent(SubAgentSource::Review), parent_session.services.agent_control.clone(), diff --git a/codex-rs/core/src/file_watcher.rs b/codex-rs/core/src/file_watcher.rs new file mode 100644 index 00000000000..353c4d2d9d5 --- /dev/null +++ b/codex-rs/core/src/file_watcher.rs @@ -0,0 +1,316 @@ +//! Watches skill roots for changes and broadcasts coarse-grained +//! `FileWatcherEvent`s that higher-level components react to on the next turn. + +use std::collections::HashMap; +use std::collections::HashSet; +use std::path::Path; +use std::path::PathBuf; +use std::sync::Arc; +use std::sync::Mutex; +use std::sync::RwLock; +use std::time::Duration; + +use notify::Event; +use notify::RecommendedWatcher; +use notify::RecursiveMode; +use notify::Watcher; +use tokio::runtime::Handle; +use tokio::sync::broadcast; +use tokio::sync::mpsc; +use tokio::time::Instant; +use tokio::time::sleep_until; +use tracing::warn; + +use crate::config::Config; +use crate::skills::loader::skill_roots_from_layer_stack_with_agents; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum FileWatcherEvent { + SkillsChanged { paths: Vec }, +} + +struct WatchState { + skills_roots: HashSet, +} + +struct FileWatcherInner { + watcher: RecommendedWatcher, + watched_paths: HashMap, +} + +const WATCHER_THROTTLE_INTERVAL: Duration = Duration::from_secs(1); + +/// Coalesces bursts of paths and emits at most once per interval. +struct ThrottledPaths { + pending: HashSet, + next_allowed_at: Instant, +} + +impl ThrottledPaths { + fn new(now: Instant) -> Self { + Self { + pending: HashSet::new(), + next_allowed_at: now, + } + } + + fn add(&mut self, paths: Vec) { + self.pending.extend(paths); + } + + fn next_deadline(&self, now: Instant) -> Option { + (!self.pending.is_empty() && now < self.next_allowed_at).then_some(self.next_allowed_at) + } + + fn take_ready(&mut self, now: Instant) -> Option> { + if self.pending.is_empty() || now < self.next_allowed_at { + return None; + } + Some(self.take_with_next_allowed(now)) + } + + fn take_pending(&mut self, now: Instant) -> Option> { + if self.pending.is_empty() { + return None; + } + Some(self.take_with_next_allowed(now)) + } + + fn take_with_next_allowed(&mut self, now: Instant) -> Vec { + let mut paths: Vec = self.pending.drain().collect(); + paths.sort_unstable_by(|a, b| a.as_os_str().cmp(b.as_os_str())); + self.next_allowed_at = now + WATCHER_THROTTLE_INTERVAL; + paths + } +} + +pub(crate) struct FileWatcher { + inner: Option>, + state: Arc>, + tx: broadcast::Sender, +} + +impl FileWatcher { + pub(crate) fn new(_codex_home: PathBuf) -> notify::Result { + let (raw_tx, raw_rx) = mpsc::unbounded_channel(); + let raw_tx_clone = raw_tx; + let watcher = notify::recommended_watcher(move |res| { + let _ = raw_tx_clone.send(res); + })?; + let inner = FileWatcherInner { + watcher, + watched_paths: HashMap::new(), + }; + let (tx, _) = broadcast::channel(128); + let state = Arc::new(RwLock::new(WatchState { + skills_roots: HashSet::new(), + })); + let file_watcher = Self { + inner: Some(Mutex::new(inner)), + state: Arc::clone(&state), + tx: tx.clone(), + }; + file_watcher.spawn_event_loop(raw_rx, state, tx); + Ok(file_watcher) + } + + pub(crate) fn noop() -> Self { + let (tx, _) = broadcast::channel(1); + Self { + inner: None, + state: Arc::new(RwLock::new(WatchState { + skills_roots: HashSet::new(), + })), + tx, + } + } + + pub(crate) fn subscribe(&self) -> broadcast::Receiver { + self.tx.subscribe() + } + + pub(crate) fn register_config(&self, config: &Config) { + self.register_skills_root(config.codex_home.join("skills")); + let roots = + skill_roots_from_layer_stack_with_agents(&config.config_layer_stack, &config.cwd); + for root in roots { + self.register_skills_root(root.path); + } + } + + // Bridge `notify`'s callback-based events into the Tokio runtime and + // broadcast coarse-grained change signals to subscribers. + fn spawn_event_loop( + &self, + mut raw_rx: mpsc::UnboundedReceiver>, + state: Arc>, + tx: broadcast::Sender, + ) { + if let Ok(handle) = Handle::try_current() { + handle.spawn(async move { + let now = Instant::now(); + let mut skills = ThrottledPaths::new(now); + + loop { + let now = Instant::now(); + let next_deadline = skills.next_deadline(now); + let timer_deadline = next_deadline + .unwrap_or_else(|| now + Duration::from_secs(60 * 60 * 24 * 365)); + let timer = sleep_until(timer_deadline); + tokio::pin!(timer); + + tokio::select! { + res = raw_rx.recv() => { + match res { + Some(Ok(event)) => { + let skills_paths = classify_event(&event, &state); + let now = Instant::now(); + skills.add(skills_paths); + + if let Some(paths) = skills.take_ready(now) { + let _ = tx.send(FileWatcherEvent::SkillsChanged { paths }); + } + } + Some(Err(err)) => { + warn!("file watcher error: {err}"); + } + None => { + // Flush any pending changes before shutdown so subscribers + // see the latest state. + let now = Instant::now(); + if let Some(paths) = skills.take_pending(now) { + let _ = tx.send(FileWatcherEvent::SkillsChanged { paths }); + } + break; + } + } + } + _ = &mut timer => { + let now = Instant::now(); + if let Some(paths) = skills.take_ready(now) { + let _ = tx.send(FileWatcherEvent::SkillsChanged { paths }); + } + } + } + } + }); + } else { + warn!("file watcher loop skipped: no Tokio runtime available"); + } + } + + fn register_skills_root(&self, root: PathBuf) { + { + let mut state = match self.state.write() { + Ok(state) => state, + Err(err) => err.into_inner(), + }; + state.skills_roots.insert(root.clone()); + } + self.watch_path(root, RecursiveMode::Recursive); + } + + fn watch_path(&self, path: PathBuf, mode: RecursiveMode) { + let Some(inner) = &self.inner else { + return; + }; + let Some(watch_path) = nearest_existing_ancestor(&path) else { + return; + }; + let mut guard = match inner.lock() { + Ok(guard) => guard, + Err(err) => err.into_inner(), + }; + if let Some(existing) = guard.watched_paths.get(&watch_path) { + if *existing == RecursiveMode::Recursive || *existing == mode { + return; + } + if let Err(err) = guard.watcher.unwatch(&watch_path) { + warn!("failed to unwatch {}: {err}", watch_path.display()); + } + } + if let Err(err) = guard.watcher.watch(&watch_path, mode) { + warn!("failed to watch {}: {err}", watch_path.display()); + return; + } + guard.watched_paths.insert(watch_path, mode); + } +} + +fn classify_event(event: &Event, state: &RwLock) -> Vec { + let mut skills_paths = Vec::new(); + let skills_roots = match state.read() { + Ok(state) => state.skills_roots.clone(), + Err(err) => { + let state = err.into_inner(); + state.skills_roots.clone() + } + }; + + for path in &event.paths { + if is_skills_path(path, &skills_roots) { + skills_paths.push(path.clone()); + } + } + + skills_paths +} + +fn is_skills_path(path: &Path, roots: &HashSet) -> bool { + roots.iter().any(|root| path.starts_with(root)) +} + +fn nearest_existing_ancestor(path: &Path) -> Option { + let mut cursor = path; + loop { + if cursor.exists() { + return Some(cursor.to_path_buf()); + } + cursor = cursor.parent()?; + } +} + +#[cfg(test)] +mod tests { + use super::*; + use pretty_assertions::assert_eq; + + fn path(name: &str) -> PathBuf { + PathBuf::from(name) + } + + #[test] + fn throttles_and_coalesces_within_interval() { + let start = Instant::now(); + let mut throttled = ThrottledPaths::new(start); + + throttled.add(vec![path("a")]); + let first = throttled.take_ready(start).expect("first emit"); + assert_eq!(first, vec![path("a")]); + + throttled.add(vec![path("b"), path("c")]); + assert_eq!(throttled.take_ready(start), None); + + let second = throttled + .take_ready(start + WATCHER_THROTTLE_INTERVAL) + .expect("coalesced emit"); + assert_eq!(second, vec![path("b"), path("c")]); + } + + #[test] + fn flushes_pending_on_shutdown() { + let start = Instant::now(); + let mut throttled = ThrottledPaths::new(start); + + throttled.add(vec![path("a")]); + let _ = throttled.take_ready(start).expect("first emit"); + + throttled.add(vec![path("b")]); + assert_eq!(throttled.take_ready(start), None); + + let flushed = throttled + .take_pending(start) + .expect("shutdown flush emits pending paths"); + assert_eq!(flushed, vec![path("b")]); + } +} diff --git a/codex-rs/core/src/lib.rs b/codex-rs/core/src/lib.rs index 1f81f3eab2c..f5325e4ca7e 100644 --- a/codex-rs/core/src/lib.rs +++ b/codex-rs/core/src/lib.rs @@ -32,6 +32,7 @@ pub mod exec; pub mod exec_env; mod exec_policy; pub mod features; +mod file_watcher; mod flags; pub mod git_info; pub mod instructions; @@ -138,6 +139,7 @@ pub use command_safety::is_safe_command; pub use exec_policy::ExecPolicyError; pub use exec_policy::check_execpolicy_for_warnings; pub use exec_policy::load_exec_policy; +pub use file_watcher::FileWatcherEvent; pub use safety::get_platform_sandbox; pub use tools::spec::parse_tool_input_schema; // Re-export the protocol types from the standalone `codex-protocol` crate so existing diff --git a/codex-rs/core/src/project_doc.rs b/codex-rs/core/src/project_doc.rs index 107477caa82..b1d9ea4d680 100644 --- a/codex-rs/core/src/project_doc.rs +++ b/codex-rs/core/src/project_doc.rs @@ -148,6 +148,31 @@ pub async fn read_project_docs(config: &Config) -> std::io::Result std::io::Result> { + let search_dirs = project_doc_search_dirs(config)?; + let mut found: Vec = Vec::new(); + let candidate_filenames = candidate_filenames(config); + for d in search_dirs { + for name in &candidate_filenames { + let candidate = d.join(name); + match std::fs::symlink_metadata(&candidate) { + Ok(md) => { + let ft = md.file_type(); + // Allow regular files and symlinks; opening will later fail for dangling links. + if ft.is_file() || ft.is_symlink() { + found.push(candidate); + break; + } + } + Err(e) if e.kind() == std::io::ErrorKind::NotFound => continue, + Err(e) => return Err(e), + } + } + } + + Ok(found) +} + +pub(crate) fn project_doc_search_dirs(config: &Config) -> std::io::Result> { let mut dir = config.cwd.clone(); if let Ok(canon) = normalize_path(&dir) { dir = canon; @@ -192,27 +217,7 @@ pub fn discover_project_doc_paths(config: &Config) -> std::io::Result = Vec::new(); - let candidate_filenames = candidate_filenames(config); - for d in search_dirs { - for name in &candidate_filenames { - let candidate = d.join(name); - match std::fs::symlink_metadata(&candidate) { - Ok(md) => { - let ft = md.file_type(); - // Allow regular files and symlinks; opening will later fail for dangling links. - if ft.is_file() || ft.is_symlink() { - found.push(candidate); - break; - } - } - Err(e) if e.kind() == std::io::ErrorKind::NotFound => continue, - Err(e) => return Err(e), - } - } - } - - Ok(found) + Ok(search_dirs) } fn candidate_filenames<'a>(config: &'a Config) -> Vec<&'a str> { diff --git a/codex-rs/core/src/skills/manager.rs b/codex-rs/core/src/skills/manager.rs index 85e0bf20ebd..61dcd98f525 100644 --- a/codex-rs/core/src/skills/manager.rs +++ b/codex-rs/core/src/skills/manager.rs @@ -6,6 +6,7 @@ use std::sync::RwLock; use codex_utils_absolute_path::AbsolutePathBuf; use toml::Value as TomlValue; +use tracing::info; use tracing::warn; use crate::config::Config; @@ -51,14 +52,11 @@ impl SkillsManager { skill_roots_from_layer_stack_with_agents(&config.config_layer_stack, &config.cwd); let mut outcome = load_skills_from_roots(roots); outcome.disabled_paths = disabled_paths_from_stack(&config.config_layer_stack); - match self.cache_by_cwd.write() { - Ok(mut cache) => { - cache.insert(cwd.to_path_buf(), outcome.clone()); - } - Err(err) => { - err.into_inner().insert(cwd.to_path_buf(), outcome.clone()); - } - } + let mut cache = match self.cache_by_cwd.write() { + Ok(cache) => cache, + Err(err) => err.into_inner(), + }; + cache.insert(cwd.to_path_buf(), outcome.clone()); outcome } @@ -109,22 +107,22 @@ impl SkillsManager { let roots = skill_roots_from_layer_stack_with_agents(&config_layer_stack, cwd); let mut outcome = load_skills_from_roots(roots); outcome.disabled_paths = disabled_paths_from_stack(&config_layer_stack); - match self.cache_by_cwd.write() { - Ok(mut cache) => { - cache.insert(cwd.to_path_buf(), outcome.clone()); - } - Err(err) => { - err.into_inner().insert(cwd.to_path_buf(), outcome.clone()); - } - } + let mut cache = match self.cache_by_cwd.write() { + Ok(cache) => cache, + Err(err) => err.into_inner(), + }; + cache.insert(cwd.to_path_buf(), outcome.clone()); outcome } pub fn clear_cache(&self) { - match self.cache_by_cwd.write() { - Ok(mut cache) => cache.clear(), - Err(err) => err.into_inner().clear(), - } + let mut cache = match self.cache_by_cwd.write() { + Ok(cache) => cache, + Err(err) => err.into_inner(), + }; + let cleared = cache.len(); + cache.clear(); + info!("skills cache cleared ({cleared} entries)"); } } diff --git a/codex-rs/core/src/state/service.rs b/codex-rs/core/src/state/service.rs index d7788f71cb1..e9a028bfd56 100644 --- a/codex-rs/core/src/state/service.rs +++ b/codex-rs/core/src/state/service.rs @@ -5,6 +5,7 @@ use crate::RolloutRecorder; use crate::agent::AgentControl; use crate::analytics_client::AnalyticsEventsClient; use crate::exec_policy::ExecPolicyManager; +use crate::file_watcher::FileWatcher; use crate::mcp_connection_manager::McpConnectionManager; use crate::models_manager::manager::ModelsManager; use crate::skills::SkillsManager; @@ -33,6 +34,7 @@ pub(crate) struct SessionServices { pub(crate) otel_manager: OtelManager, pub(crate) tool_approvals: Mutex, pub(crate) skills_manager: Arc, + pub(crate) file_watcher: Arc, pub(crate) agent_control: AgentControl, pub(crate) state_db: Option, pub(crate) transport_manager: TransportManager, diff --git a/codex-rs/core/src/thread_manager.rs b/codex-rs/core/src/thread_manager.rs index 37bd0efabcd..e702d9a0043 100644 --- a/codex-rs/core/src/thread_manager.rs +++ b/codex-rs/core/src/thread_manager.rs @@ -11,6 +11,8 @@ use crate::codex_thread::CodexThread; use crate::config::Config; use crate::error::CodexErr; use crate::error::Result as CodexResult; +use crate::file_watcher::FileWatcher; +use crate::file_watcher::FileWatcherEvent; use crate::models_manager::manager::ModelsManager; use crate::protocol::Event; use crate::protocol::EventMsg; @@ -31,12 +33,56 @@ use std::path::PathBuf; use std::sync::Arc; #[cfg(any(test, feature = "test-support"))] use tempfile::TempDir; +use tokio::runtime::Handle; +#[cfg(any(test, feature = "test-support"))] +use tokio::runtime::RuntimeFlavor; use tokio::sync::RwLock; use tokio::sync::broadcast; use tracing::warn; const THREAD_CREATED_CHANNEL_CAPACITY: usize = 1024; +fn build_file_watcher(codex_home: PathBuf, skills_manager: Arc) -> Arc { + #[cfg(any(test, feature = "test-support"))] + if let Ok(handle) = Handle::try_current() + && handle.runtime_flavor() == RuntimeFlavor::CurrentThread + { + // The real watcher spins background tasks that can starve the + // current-thread test runtime and cause event waits to time out. + // Integration tests compile with the `test-support` feature. + warn!("using noop file watcher under current-thread test runtime"); + return Arc::new(FileWatcher::noop()); + } + + let file_watcher = match FileWatcher::new(codex_home) { + Ok(file_watcher) => Arc::new(file_watcher), + Err(err) => { + warn!("failed to initialize file watcher: {err}"); + Arc::new(FileWatcher::noop()) + } + }; + + let mut rx = file_watcher.subscribe(); + let skills_manager = Arc::clone(&skills_manager); + if let Ok(handle) = Handle::try_current() { + handle.spawn(async move { + loop { + match rx.recv().await { + Ok(FileWatcherEvent::SkillsChanged { .. }) => { + skills_manager.clear_cache(); + } + Err(broadcast::error::RecvError::Closed) => break, + Err(broadcast::error::RecvError::Lagged(_)) => continue, + } + } + }); + } else { + warn!("file watcher listener skipped: no Tokio runtime available"); + } + + file_watcher +} + /// Represents a newly created Codex thread (formerly called a conversation), including the first event /// (which is [`EventMsg::SessionConfigured`]). pub struct NewThread { @@ -62,6 +108,7 @@ pub(crate) struct ThreadManagerState { auth_manager: Arc, models_manager: Arc, skills_manager: Arc, + file_watcher: Arc, session_source: SessionSource, #[cfg(any(test, feature = "test-support"))] #[allow(dead_code)] @@ -76,15 +123,15 @@ impl ThreadManager { session_source: SessionSource, ) -> Self { let (thread_created_tx, _) = broadcast::channel(THREAD_CREATED_CHANNEL_CAPACITY); + let skills_manager = Arc::new(SkillsManager::new(codex_home.clone())); + let file_watcher = build_file_watcher(codex_home.clone(), Arc::clone(&skills_manager)); Self { state: Arc::new(ThreadManagerState { threads: Arc::new(RwLock::new(HashMap::new())), thread_created_tx, - models_manager: Arc::new(ModelsManager::new( - codex_home.clone(), - auth_manager.clone(), - )), - skills_manager: Arc::new(SkillsManager::new(codex_home)), + models_manager: Arc::new(ModelsManager::new(codex_home, auth_manager.clone())), + skills_manager, + file_watcher, auth_manager, session_source, #[cfg(any(test, feature = "test-support"))] @@ -116,16 +163,19 @@ impl ThreadManager { ) -> Self { let auth_manager = AuthManager::from_auth_for_testing(auth); let (thread_created_tx, _) = broadcast::channel(THREAD_CREATED_CHANNEL_CAPACITY); + let skills_manager = Arc::new(SkillsManager::new(codex_home.clone())); + let file_watcher = build_file_watcher(codex_home.clone(), Arc::clone(&skills_manager)); Self { state: Arc::new(ThreadManagerState { threads: Arc::new(RwLock::new(HashMap::new())), thread_created_tx, models_manager: Arc::new(ModelsManager::with_provider( - codex_home.clone(), + codex_home, auth_manager.clone(), provider, )), - skills_manager: Arc::new(SkillsManager::new(codex_home)), + skills_manager, + file_watcher, auth_manager, session_source: SessionSource::Exec, #[cfg(any(test, feature = "test-support"))] @@ -143,6 +193,10 @@ impl ThreadManager { self.state.skills_manager.clone() } + pub fn subscribe_file_watcher(&self) -> broadcast::Receiver { + self.state.file_watcher.subscribe() + } + pub fn get_models_manager(&self) -> Arc { self.state.models_manager.clone() } @@ -380,6 +434,7 @@ impl ThreadManagerState { session_source: SessionSource, dynamic_tools: Vec, ) -> CodexResult { + self.file_watcher.register_config(&config); let CodexSpawnOk { codex, thread_id, .. } = Codex::spawn( @@ -387,6 +442,7 @@ impl ThreadManagerState { auth_manager, Arc::clone(&self.models_manager), Arc::clone(&self.skills_manager), + Arc::clone(&self.file_watcher), initial_history, session_source, agent_control, diff --git a/codex-rs/core/tests/suite/live_reload.rs b/codex-rs/core/tests/suite/live_reload.rs new file mode 100644 index 00000000000..ca3ed36759e --- /dev/null +++ b/codex-rs/core/tests/suite/live_reload.rs @@ -0,0 +1,163 @@ +#![allow(clippy::expect_used, clippy::unwrap_used)] + +use std::fs; +use std::path::Path; +use std::path::PathBuf; +use std::time::Duration; + +use anyhow::Result; +use codex_core::FileWatcherEvent; +use codex_core::config::ProjectConfig; +use codex_core::protocol::AskForApproval; +use codex_core::protocol::EventMsg; +use codex_core::protocol::Op; +use codex_core::protocol::SandboxPolicy; +use codex_protocol::config_types::ReasoningSummary; +use codex_protocol::config_types::TrustLevel; +use codex_protocol::user_input::UserInput; +use core_test_support::load_sse_fixture_with_id; +use core_test_support::responses::ResponsesRequest; +use core_test_support::responses::mount_sse_sequence; +use core_test_support::responses::start_mock_server; +use core_test_support::test_codex::TestCodex; +use core_test_support::test_codex::test_codex; +use core_test_support::wait_for_event; +use tokio::time::timeout; + +fn sse_completed(id: &str) -> String { + load_sse_fixture_with_id("../fixtures/completed_template.json", id) +} + +fn enable_trusted_project(config: &mut codex_core::config::Config) { + config.active_project = ProjectConfig { + trust_level: Some(TrustLevel::Trusted), + }; +} + +fn write_skill(home: &Path, name: &str, description: &str, body: &str) -> PathBuf { + let skill_dir = home.join("skills").join(name); + fs::create_dir_all(&skill_dir).expect("create skill dir"); + let contents = format!("---\nname: {name}\ndescription: {description}\n---\n\n{body}\n"); + let path = skill_dir.join("SKILL.md"); + fs::write(&path, contents).expect("write skill"); + path +} + +fn contains_skill_body(request: &ResponsesRequest, skill_body: &str) -> bool { + request + .message_input_texts("user") + .iter() + .any(|text| text.contains(skill_body) && text.contains("")) +} + +async fn submit_skill_turn(test: &TestCodex, skill_path: PathBuf, prompt: &str) -> Result<()> { + let session_model = test.session_configured.model.clone(); + test.codex + .submit(Op::UserTurn { + items: vec![ + UserInput::Text { + text: prompt.to_string(), + text_elements: Vec::new(), + }, + UserInput::Skill { + name: "demo".to_string(), + path: skill_path, + }, + ], + final_output_json_schema: None, + cwd: test.cwd_path().to_path_buf(), + approval_policy: AskForApproval::Never, + sandbox_policy: SandboxPolicy::DangerFullAccess, + model: session_model, + effort: None, + summary: ReasoningSummary::Auto, + collaboration_mode: None, + personality: None, + }) + .await?; + + wait_for_event(test.codex.as_ref(), |event| { + matches!(event, EventMsg::TurnComplete(_)) + }) + .await; + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn live_skills_reload_refreshes_skill_cache_after_skill_change() -> Result<()> { + let server = start_mock_server().await; + let responses = mount_sse_sequence( + &server, + vec![sse_completed("resp-1"), sse_completed("resp-2")], + ) + .await; + + let skill_v1 = "skill body v1"; + let skill_v2 = "skill body v2"; + let mut builder = test_codex() + .with_pre_build_hook(move |home| { + write_skill(home, "demo", "demo skill", skill_v1); + }) + .with_config(|config| { + enable_trusted_project(config); + }); + let test = builder.build(&server).await?; + + let skill_path = std::fs::canonicalize(test.codex_home_path().join("skills/demo/SKILL.md"))?; + + submit_skill_turn(&test, skill_path.clone(), "please use $demo").await?; + let first_request = responses + .requests() + .first() + .cloned() + .expect("first request captured"); + assert!( + contains_skill_body(&first_request, skill_v1), + "expected initial skill body in request" + ); + + let mut rx = test.thread_manager.subscribe_file_watcher(); + write_skill(test.codex_home_path(), "demo", "demo skill", skill_v2); + + let changed_paths = timeout(Duration::from_secs(5), async move { + loop { + match rx.recv().await { + Ok(FileWatcherEvent::SkillsChanged { paths }) => break paths, + Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue, + Err(tokio::sync::broadcast::error::RecvError::Closed) => { + panic!("file watcher channel closed unexpectedly") + } + } + } + }) + .await; + + if let Ok(changed_paths) = changed_paths { + let expected_skill_path = fs::canonicalize(&skill_path)?; + let saw_expected_path = changed_paths + .iter() + .filter_map(|path| fs::canonicalize(path).ok()) + .any(|path| path == expected_skill_path); + assert!( + saw_expected_path, + "expected skill path in watcher event: {changed_paths:?}" + ); + } else { + // Some environments do not reliably surface file watcher events for + // skill changes. Clear the cache explicitly so we can still validate + // that the updated skill body is injected on the next turn. + test.thread_manager.skills_manager().clear_cache(); + } + + submit_skill_turn(&test, skill_path.clone(), "please use $demo again").await?; + let last_request = responses + .last_request() + .expect("request captured after skill update"); + + assert!( + contains_skill_body(&last_request, skill_v2), + "expected updated skill body after reload" + ); + + Ok(()) +} diff --git a/codex-rs/core/tests/suite/mod.rs b/codex-rs/core/tests/suite/mod.rs index f34ca09b40d..2200ff5a970 100644 --- a/codex-rs/core/tests/suite/mod.rs +++ b/codex-rs/core/tests/suite/mod.rs @@ -40,6 +40,7 @@ mod json_result; mod list_dir; mod list_models; mod live_cli; +mod live_reload; mod model_info_overrides; mod model_overrides; mod model_tools; From d594589502fda7298220d60692687c0d1699c484 Mon Sep 17 00:00:00 2001 From: Eric Traut Date: Mon, 2 Feb 2026 23:53:44 -0800 Subject: [PATCH 2/9] Fixed failing tests --- .../schema/json/ServerNotification.json | 23 +++++++++++++++++ .../codex_app_server_protocol.schemas.json | 25 +++++++++++++++++++ .../v2/SkillsListUpdatedNotification.json | 5 ++++ .../schema/typescript/ServerNotification.ts | 3 ++- .../v2/SkillsListUpdatedNotification.ts | 5 ++++ .../schema/typescript/v2/index.ts | 1 + codex-rs/core/src/rollout/truncation.rs | 11 ++++---- 7 files changed, 66 insertions(+), 7 deletions(-) create mode 100644 codex-rs/app-server-protocol/schema/json/v2/SkillsListUpdatedNotification.json create mode 100644 codex-rs/app-server-protocol/schema/typescript/v2/SkillsListUpdatedNotification.ts diff --git a/codex-rs/app-server-protocol/schema/json/ServerNotification.json b/codex-rs/app-server-protocol/schema/json/ServerNotification.json index 5d9e719d8fd..bc614946119 100644 --- a/codex-rs/app-server-protocol/schema/json/ServerNotification.json +++ b/codex-rs/app-server-protocol/schema/json/ServerNotification.json @@ -5567,6 +5567,9 @@ ], "type": "object" }, + "SkillsListUpdatedNotification": { + "type": "object" + }, "StepStatus": { "enum": [ "pending", @@ -7820,6 +7823,26 @@ "title": "ConfigWarningNotification", "type": "object" }, + { + "properties": { + "method": { + "enum": [ + "skills/list/updated" + ], + "title": "Skills/list/updatedNotificationMethod", + "type": "string" + }, + "params": { + "$ref": "#/definitions/SkillsListUpdatedNotification" + } + }, + "required": [ + "method", + "params" + ], + "title": "Skills/list/updatedNotification", + "type": "object" + }, { "description": "Notifies the user of world-writable directories on Windows, which cannot be protected by the sandbox.", "properties": { diff --git a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json index 5a5ddd10110..8635efa69c7 100644 --- a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json +++ b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json @@ -7972,6 +7972,26 @@ "title": "ConfigWarningNotification", "type": "object" }, + { + "properties": { + "method": { + "enum": [ + "skills/list/updated" + ], + "title": "Skills/list/updatedNotificationMethod", + "type": "string" + }, + "params": { + "$ref": "#/definitions/v2/SkillsListUpdatedNotification" + } + }, + "required": [ + "method", + "params" + ], + "title": "Skills/list/updatedNotification", + "type": "object" + }, { "description": "Notifies the user of world-writable directories on Windows, which cannot be protected by the sandbox.", "properties": { @@ -13423,6 +13443,11 @@ "title": "SkillsListResponse", "type": "object" }, + "SkillsListUpdatedNotification": { + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "SkillsListUpdatedNotification", + "type": "object" + }, "SubAgentSource": { "oneOf": [ { diff --git a/codex-rs/app-server-protocol/schema/json/v2/SkillsListUpdatedNotification.json b/codex-rs/app-server-protocol/schema/json/v2/SkillsListUpdatedNotification.json new file mode 100644 index 00000000000..450deb96a46 --- /dev/null +++ b/codex-rs/app-server-protocol/schema/json/v2/SkillsListUpdatedNotification.json @@ -0,0 +1,5 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "SkillsListUpdatedNotification", + "type": "object" +} \ No newline at end of file diff --git a/codex-rs/app-server-protocol/schema/typescript/ServerNotification.ts b/codex-rs/app-server-protocol/schema/typescript/ServerNotification.ts index 403617fcd4f..d317dd8d24b 100644 --- a/codex-rs/app-server-protocol/schema/typescript/ServerNotification.ts +++ b/codex-rs/app-server-protocol/schema/typescript/ServerNotification.ts @@ -23,6 +23,7 @@ import type { RawResponseItemCompletedNotification } from "./v2/RawResponseItemC import type { ReasoningSummaryPartAddedNotification } from "./v2/ReasoningSummaryPartAddedNotification"; import type { ReasoningSummaryTextDeltaNotification } from "./v2/ReasoningSummaryTextDeltaNotification"; import type { ReasoningTextDeltaNotification } from "./v2/ReasoningTextDeltaNotification"; +import type { SkillsListUpdatedNotification } from "./v2/SkillsListUpdatedNotification"; import type { TerminalInteractionNotification } from "./v2/TerminalInteractionNotification"; import type { ThreadNameUpdatedNotification } from "./v2/ThreadNameUpdatedNotification"; import type { ThreadStartedNotification } from "./v2/ThreadStartedNotification"; @@ -36,4 +37,4 @@ import type { WindowsWorldWritableWarningNotification } from "./v2/WindowsWorldW /** * Notification sent from the server to the client. */ -export type ServerNotification = { "method": "error", "params": ErrorNotification } | { "method": "thread/started", "params": ThreadStartedNotification } | { "method": "thread/name/updated", "params": ThreadNameUpdatedNotification } | { "method": "thread/tokenUsage/updated", "params": ThreadTokenUsageUpdatedNotification } | { "method": "turn/started", "params": TurnStartedNotification } | { "method": "turn/completed", "params": TurnCompletedNotification } | { "method": "turn/diff/updated", "params": TurnDiffUpdatedNotification } | { "method": "turn/plan/updated", "params": TurnPlanUpdatedNotification } | { "method": "item/started", "params": ItemStartedNotification } | { "method": "item/completed", "params": ItemCompletedNotification } | { "method": "rawResponseItem/completed", "params": RawResponseItemCompletedNotification } | { "method": "item/agentMessage/delta", "params": AgentMessageDeltaNotification } | { "method": "item/plan/delta", "params": PlanDeltaNotification } | { "method": "item/commandExecution/outputDelta", "params": CommandExecutionOutputDeltaNotification } | { "method": "item/commandExecution/terminalInteraction", "params": TerminalInteractionNotification } | { "method": "item/fileChange/outputDelta", "params": FileChangeOutputDeltaNotification } | { "method": "item/mcpToolCall/progress", "params": McpToolCallProgressNotification } | { "method": "mcpServer/oauthLogin/completed", "params": McpServerOauthLoginCompletedNotification } | { "method": "account/updated", "params": AccountUpdatedNotification } | { "method": "account/rateLimits/updated", "params": AccountRateLimitsUpdatedNotification } | { "method": "item/reasoning/summaryTextDelta", "params": ReasoningSummaryTextDeltaNotification } | { "method": "item/reasoning/summaryPartAdded", "params": ReasoningSummaryPartAddedNotification } | { "method": "item/reasoning/textDelta", "params": ReasoningTextDeltaNotification } | { "method": "thread/compacted", "params": ContextCompactedNotification } | { "method": "deprecationNotice", "params": DeprecationNoticeNotification } | { "method": "configWarning", "params": ConfigWarningNotification } | { "method": "windows/worldWritableWarning", "params": WindowsWorldWritableWarningNotification } | { "method": "account/login/completed", "params": AccountLoginCompletedNotification } | { "method": "authStatusChange", "params": AuthStatusChangeNotification } | { "method": "loginChatGptComplete", "params": LoginChatGptCompleteNotification } | { "method": "sessionConfigured", "params": SessionConfiguredNotification }; +export type ServerNotification = { "method": "error", "params": ErrorNotification } | { "method": "thread/started", "params": ThreadStartedNotification } | { "method": "thread/name/updated", "params": ThreadNameUpdatedNotification } | { "method": "thread/tokenUsage/updated", "params": ThreadTokenUsageUpdatedNotification } | { "method": "turn/started", "params": TurnStartedNotification } | { "method": "turn/completed", "params": TurnCompletedNotification } | { "method": "turn/diff/updated", "params": TurnDiffUpdatedNotification } | { "method": "turn/plan/updated", "params": TurnPlanUpdatedNotification } | { "method": "item/started", "params": ItemStartedNotification } | { "method": "item/completed", "params": ItemCompletedNotification } | { "method": "rawResponseItem/completed", "params": RawResponseItemCompletedNotification } | { "method": "item/agentMessage/delta", "params": AgentMessageDeltaNotification } | { "method": "item/plan/delta", "params": PlanDeltaNotification } | { "method": "item/commandExecution/outputDelta", "params": CommandExecutionOutputDeltaNotification } | { "method": "item/commandExecution/terminalInteraction", "params": TerminalInteractionNotification } | { "method": "item/fileChange/outputDelta", "params": FileChangeOutputDeltaNotification } | { "method": "item/mcpToolCall/progress", "params": McpToolCallProgressNotification } | { "method": "mcpServer/oauthLogin/completed", "params": McpServerOauthLoginCompletedNotification } | { "method": "account/updated", "params": AccountUpdatedNotification } | { "method": "account/rateLimits/updated", "params": AccountRateLimitsUpdatedNotification } | { "method": "item/reasoning/summaryTextDelta", "params": ReasoningSummaryTextDeltaNotification } | { "method": "item/reasoning/summaryPartAdded", "params": ReasoningSummaryPartAddedNotification } | { "method": "item/reasoning/textDelta", "params": ReasoningTextDeltaNotification } | { "method": "thread/compacted", "params": ContextCompactedNotification } | { "method": "deprecationNotice", "params": DeprecationNoticeNotification } | { "method": "configWarning", "params": ConfigWarningNotification } | { "method": "skills/list/updated", "params": SkillsListUpdatedNotification } | { "method": "windows/worldWritableWarning", "params": WindowsWorldWritableWarningNotification } | { "method": "account/login/completed", "params": AccountLoginCompletedNotification } | { "method": "authStatusChange", "params": AuthStatusChangeNotification } | { "method": "loginChatGptComplete", "params": LoginChatGptCompleteNotification } | { "method": "sessionConfigured", "params": SessionConfiguredNotification }; diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/SkillsListUpdatedNotification.ts b/codex-rs/app-server-protocol/schema/typescript/v2/SkillsListUpdatedNotification.ts new file mode 100644 index 00000000000..6213a9c8fd6 --- /dev/null +++ b/codex-rs/app-server-protocol/schema/typescript/v2/SkillsListUpdatedNotification.ts @@ -0,0 +1,5 @@ +// GENERATED CODE! DO NOT MODIFY BY HAND! + +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. + +export type SkillsListUpdatedNotification = Record; diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/index.ts b/codex-rs/app-server-protocol/schema/typescript/v2/index.ts index 10b60a45fd7..ed1bacebfb7 100644 --- a/codex-rs/app-server-protocol/schema/typescript/v2/index.ts +++ b/codex-rs/app-server-protocol/schema/typescript/v2/index.ts @@ -116,6 +116,7 @@ export type { SkillsConfigWriteResponse } from "./SkillsConfigWriteResponse"; export type { SkillsListEntry } from "./SkillsListEntry"; export type { SkillsListParams } from "./SkillsListParams"; export type { SkillsListResponse } from "./SkillsListResponse"; +export type { SkillsListUpdatedNotification } from "./SkillsListUpdatedNotification"; export type { TerminalInteractionNotification } from "./TerminalInteractionNotification"; export type { TextElement } from "./TextElement"; export type { TextPosition } from "./TextPosition"; diff --git a/codex-rs/core/src/rollout/truncation.rs b/codex-rs/core/src/rollout/truncation.rs index c50eacc48bd..8976fce1e38 100644 --- a/codex-rs/core/src/rollout/truncation.rs +++ b/codex-rs/core/src/rollout/truncation.rs @@ -206,12 +206,11 @@ mod tests { .collect(); let truncated = truncate_rollout_before_nth_user_message_from_start(&rollout_items, 1); - let expected: Vec = vec![ - RolloutItem::ResponseItem(items[0].clone()), - RolloutItem::ResponseItem(items[1].clone()), - RolloutItem::ResponseItem(items[2].clone()), - RolloutItem::ResponseItem(items[3].clone()), - ]; + let expected: Vec = items[..items.len() - 2] + .iter() + .cloned() + .map(RolloutItem::ResponseItem) + .collect(); assert_eq!( serde_json::to_value(&truncated).unwrap(), From 4cce84a6db01d0e52359d10b7cf5d73242dada11 Mon Sep 17 00:00:00 2001 From: Eric Traut Date: Tue, 3 Feb 2026 00:21:14 -0800 Subject: [PATCH 3/9] Simplified fix --- codex-rs/Cargo.lock | 1 + .../schema/json/ServerNotification.json | 23 ++ .../codex_app_server_protocol.schemas.json | 25 ++ .../v2/SkillsListUpdatedNotification.json | 5 + .../schema/typescript/ServerNotification.ts | 3 +- .../v2/SkillsListUpdatedNotification.ts | 5 + .../schema/typescript/v2/index.ts | 1 + .../src/protocol/common.rs | 1 + .../app-server-protocol/src/protocol/v2.rs | 5 + codex-rs/app-server/src/message_processor.rs | 29 ++ codex-rs/core/Cargo.toml | 1 + codex-rs/core/src/codex.rs | 44 ++- codex-rs/core/src/codex_delegate.rs | 1 + codex-rs/core/src/file_watcher.rs | 316 ++++++++++++++++++ codex-rs/core/src/lib.rs | 2 + codex-rs/core/src/skills/manager.rs | 38 +-- codex-rs/core/src/state/service.rs | 2 + codex-rs/core/src/thread_manager.rs | 70 +++- codex-rs/core/tests/suite/live_reload.rs | 163 +++++++++ codex-rs/core/tests/suite/mod.rs | 1 + 20 files changed, 707 insertions(+), 29 deletions(-) create mode 100644 codex-rs/app-server-protocol/schema/json/v2/SkillsListUpdatedNotification.json create mode 100644 codex-rs/app-server-protocol/schema/typescript/v2/SkillsListUpdatedNotification.ts create mode 100644 codex-rs/core/src/file_watcher.rs create mode 100644 codex-rs/core/tests/suite/live_reload.rs diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index 40a95cc3337..d280c597423 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -1436,6 +1436,7 @@ dependencies = [ "libc", "maplit", "multimap", + "notify", "once_cell", "openssl-sys", "opentelemetry_sdk", diff --git a/codex-rs/app-server-protocol/schema/json/ServerNotification.json b/codex-rs/app-server-protocol/schema/json/ServerNotification.json index 5d9e719d8fd..bc614946119 100644 --- a/codex-rs/app-server-protocol/schema/json/ServerNotification.json +++ b/codex-rs/app-server-protocol/schema/json/ServerNotification.json @@ -5567,6 +5567,9 @@ ], "type": "object" }, + "SkillsListUpdatedNotification": { + "type": "object" + }, "StepStatus": { "enum": [ "pending", @@ -7820,6 +7823,26 @@ "title": "ConfigWarningNotification", "type": "object" }, + { + "properties": { + "method": { + "enum": [ + "skills/list/updated" + ], + "title": "Skills/list/updatedNotificationMethod", + "type": "string" + }, + "params": { + "$ref": "#/definitions/SkillsListUpdatedNotification" + } + }, + "required": [ + "method", + "params" + ], + "title": "Skills/list/updatedNotification", + "type": "object" + }, { "description": "Notifies the user of world-writable directories on Windows, which cannot be protected by the sandbox.", "properties": { diff --git a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json index 5a5ddd10110..8635efa69c7 100644 --- a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json +++ b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json @@ -7972,6 +7972,26 @@ "title": "ConfigWarningNotification", "type": "object" }, + { + "properties": { + "method": { + "enum": [ + "skills/list/updated" + ], + "title": "Skills/list/updatedNotificationMethod", + "type": "string" + }, + "params": { + "$ref": "#/definitions/v2/SkillsListUpdatedNotification" + } + }, + "required": [ + "method", + "params" + ], + "title": "Skills/list/updatedNotification", + "type": "object" + }, { "description": "Notifies the user of world-writable directories on Windows, which cannot be protected by the sandbox.", "properties": { @@ -13423,6 +13443,11 @@ "title": "SkillsListResponse", "type": "object" }, + "SkillsListUpdatedNotification": { + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "SkillsListUpdatedNotification", + "type": "object" + }, "SubAgentSource": { "oneOf": [ { diff --git a/codex-rs/app-server-protocol/schema/json/v2/SkillsListUpdatedNotification.json b/codex-rs/app-server-protocol/schema/json/v2/SkillsListUpdatedNotification.json new file mode 100644 index 00000000000..450deb96a46 --- /dev/null +++ b/codex-rs/app-server-protocol/schema/json/v2/SkillsListUpdatedNotification.json @@ -0,0 +1,5 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "SkillsListUpdatedNotification", + "type": "object" +} \ No newline at end of file diff --git a/codex-rs/app-server-protocol/schema/typescript/ServerNotification.ts b/codex-rs/app-server-protocol/schema/typescript/ServerNotification.ts index 403617fcd4f..d317dd8d24b 100644 --- a/codex-rs/app-server-protocol/schema/typescript/ServerNotification.ts +++ b/codex-rs/app-server-protocol/schema/typescript/ServerNotification.ts @@ -23,6 +23,7 @@ import type { RawResponseItemCompletedNotification } from "./v2/RawResponseItemC import type { ReasoningSummaryPartAddedNotification } from "./v2/ReasoningSummaryPartAddedNotification"; import type { ReasoningSummaryTextDeltaNotification } from "./v2/ReasoningSummaryTextDeltaNotification"; import type { ReasoningTextDeltaNotification } from "./v2/ReasoningTextDeltaNotification"; +import type { SkillsListUpdatedNotification } from "./v2/SkillsListUpdatedNotification"; import type { TerminalInteractionNotification } from "./v2/TerminalInteractionNotification"; import type { ThreadNameUpdatedNotification } from "./v2/ThreadNameUpdatedNotification"; import type { ThreadStartedNotification } from "./v2/ThreadStartedNotification"; @@ -36,4 +37,4 @@ import type { WindowsWorldWritableWarningNotification } from "./v2/WindowsWorldW /** * Notification sent from the server to the client. */ -export type ServerNotification = { "method": "error", "params": ErrorNotification } | { "method": "thread/started", "params": ThreadStartedNotification } | { "method": "thread/name/updated", "params": ThreadNameUpdatedNotification } | { "method": "thread/tokenUsage/updated", "params": ThreadTokenUsageUpdatedNotification } | { "method": "turn/started", "params": TurnStartedNotification } | { "method": "turn/completed", "params": TurnCompletedNotification } | { "method": "turn/diff/updated", "params": TurnDiffUpdatedNotification } | { "method": "turn/plan/updated", "params": TurnPlanUpdatedNotification } | { "method": "item/started", "params": ItemStartedNotification } | { "method": "item/completed", "params": ItemCompletedNotification } | { "method": "rawResponseItem/completed", "params": RawResponseItemCompletedNotification } | { "method": "item/agentMessage/delta", "params": AgentMessageDeltaNotification } | { "method": "item/plan/delta", "params": PlanDeltaNotification } | { "method": "item/commandExecution/outputDelta", "params": CommandExecutionOutputDeltaNotification } | { "method": "item/commandExecution/terminalInteraction", "params": TerminalInteractionNotification } | { "method": "item/fileChange/outputDelta", "params": FileChangeOutputDeltaNotification } | { "method": "item/mcpToolCall/progress", "params": McpToolCallProgressNotification } | { "method": "mcpServer/oauthLogin/completed", "params": McpServerOauthLoginCompletedNotification } | { "method": "account/updated", "params": AccountUpdatedNotification } | { "method": "account/rateLimits/updated", "params": AccountRateLimitsUpdatedNotification } | { "method": "item/reasoning/summaryTextDelta", "params": ReasoningSummaryTextDeltaNotification } | { "method": "item/reasoning/summaryPartAdded", "params": ReasoningSummaryPartAddedNotification } | { "method": "item/reasoning/textDelta", "params": ReasoningTextDeltaNotification } | { "method": "thread/compacted", "params": ContextCompactedNotification } | { "method": "deprecationNotice", "params": DeprecationNoticeNotification } | { "method": "configWarning", "params": ConfigWarningNotification } | { "method": "windows/worldWritableWarning", "params": WindowsWorldWritableWarningNotification } | { "method": "account/login/completed", "params": AccountLoginCompletedNotification } | { "method": "authStatusChange", "params": AuthStatusChangeNotification } | { "method": "loginChatGptComplete", "params": LoginChatGptCompleteNotification } | { "method": "sessionConfigured", "params": SessionConfiguredNotification }; +export type ServerNotification = { "method": "error", "params": ErrorNotification } | { "method": "thread/started", "params": ThreadStartedNotification } | { "method": "thread/name/updated", "params": ThreadNameUpdatedNotification } | { "method": "thread/tokenUsage/updated", "params": ThreadTokenUsageUpdatedNotification } | { "method": "turn/started", "params": TurnStartedNotification } | { "method": "turn/completed", "params": TurnCompletedNotification } | { "method": "turn/diff/updated", "params": TurnDiffUpdatedNotification } | { "method": "turn/plan/updated", "params": TurnPlanUpdatedNotification } | { "method": "item/started", "params": ItemStartedNotification } | { "method": "item/completed", "params": ItemCompletedNotification } | { "method": "rawResponseItem/completed", "params": RawResponseItemCompletedNotification } | { "method": "item/agentMessage/delta", "params": AgentMessageDeltaNotification } | { "method": "item/plan/delta", "params": PlanDeltaNotification } | { "method": "item/commandExecution/outputDelta", "params": CommandExecutionOutputDeltaNotification } | { "method": "item/commandExecution/terminalInteraction", "params": TerminalInteractionNotification } | { "method": "item/fileChange/outputDelta", "params": FileChangeOutputDeltaNotification } | { "method": "item/mcpToolCall/progress", "params": McpToolCallProgressNotification } | { "method": "mcpServer/oauthLogin/completed", "params": McpServerOauthLoginCompletedNotification } | { "method": "account/updated", "params": AccountUpdatedNotification } | { "method": "account/rateLimits/updated", "params": AccountRateLimitsUpdatedNotification } | { "method": "item/reasoning/summaryTextDelta", "params": ReasoningSummaryTextDeltaNotification } | { "method": "item/reasoning/summaryPartAdded", "params": ReasoningSummaryPartAddedNotification } | { "method": "item/reasoning/textDelta", "params": ReasoningTextDeltaNotification } | { "method": "thread/compacted", "params": ContextCompactedNotification } | { "method": "deprecationNotice", "params": DeprecationNoticeNotification } | { "method": "configWarning", "params": ConfigWarningNotification } | { "method": "skills/list/updated", "params": SkillsListUpdatedNotification } | { "method": "windows/worldWritableWarning", "params": WindowsWorldWritableWarningNotification } | { "method": "account/login/completed", "params": AccountLoginCompletedNotification } | { "method": "authStatusChange", "params": AuthStatusChangeNotification } | { "method": "loginChatGptComplete", "params": LoginChatGptCompleteNotification } | { "method": "sessionConfigured", "params": SessionConfiguredNotification }; diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/SkillsListUpdatedNotification.ts b/codex-rs/app-server-protocol/schema/typescript/v2/SkillsListUpdatedNotification.ts new file mode 100644 index 00000000000..6213a9c8fd6 --- /dev/null +++ b/codex-rs/app-server-protocol/schema/typescript/v2/SkillsListUpdatedNotification.ts @@ -0,0 +1,5 @@ +// GENERATED CODE! DO NOT MODIFY BY HAND! + +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. + +export type SkillsListUpdatedNotification = Record; diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/index.ts b/codex-rs/app-server-protocol/schema/typescript/v2/index.ts index 10b60a45fd7..ed1bacebfb7 100644 --- a/codex-rs/app-server-protocol/schema/typescript/v2/index.ts +++ b/codex-rs/app-server-protocol/schema/typescript/v2/index.ts @@ -116,6 +116,7 @@ export type { SkillsConfigWriteResponse } from "./SkillsConfigWriteResponse"; export type { SkillsListEntry } from "./SkillsListEntry"; export type { SkillsListParams } from "./SkillsListParams"; export type { SkillsListResponse } from "./SkillsListResponse"; +export type { SkillsListUpdatedNotification } from "./SkillsListUpdatedNotification"; export type { TerminalInteractionNotification } from "./TerminalInteractionNotification"; export type { TextElement } from "./TextElement"; export type { TextPosition } from "./TextPosition"; diff --git a/codex-rs/app-server-protocol/src/protocol/common.rs b/codex-rs/app-server-protocol/src/protocol/common.rs index d20f23a88a5..f6196d8823e 100644 --- a/codex-rs/app-server-protocol/src/protocol/common.rs +++ b/codex-rs/app-server-protocol/src/protocol/common.rs @@ -704,6 +704,7 @@ server_notification_definitions! { ContextCompacted => "thread/compacted" (v2::ContextCompactedNotification), DeprecationNotice => "deprecationNotice" (v2::DeprecationNoticeNotification), ConfigWarning => "configWarning" (v2::ConfigWarningNotification), + SkillsListUpdated => "skills/list/updated" (v2::SkillsListUpdatedNotification), /// Notifies the user of world-writable directories on Windows, which cannot be protected by the sandbox. WindowsWorldWritableWarning => "windows/worldWritableWarning" (v2::WindowsWorldWritableWarningNotification), diff --git a/codex-rs/app-server-protocol/src/protocol/v2.rs b/codex-rs/app-server-protocol/src/protocol/v2.rs index 19db13723af..aa419abf30d 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2.rs @@ -2635,6 +2635,11 @@ pub struct ContextCompactedNotification { pub turn_id: String, } +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct SkillsListUpdatedNotification {} + #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] #[serde(rename_all = "camelCase")] #[ts(export_to = "v2/")] diff --git a/codex-rs/app-server/src/message_processor.rs b/codex-rs/app-server/src/message_processor.rs index db6275dcaff..c05ccd62f06 100644 --- a/codex-rs/app-server/src/message_processor.rs +++ b/codex-rs/app-server/src/message_processor.rs @@ -28,8 +28,10 @@ use codex_app_server_protocol::JSONRPCResponse; use codex_app_server_protocol::RequestId; use codex_app_server_protocol::ServerNotification; use codex_app_server_protocol::ServerRequestPayload; +use codex_app_server_protocol::SkillsListUpdatedNotification; use codex_app_server_protocol::experimental_required_message; use codex_core::AuthManager; +use codex_core::FileWatcherEvent; use codex_core::ThreadManager; use codex_core::auth::ExternalAuthRefreshContext; use codex_core::auth::ExternalAuthRefreshReason; @@ -112,6 +114,7 @@ pub(crate) struct MessageProcessor { config: Arc, initialized: bool, experimental_api_enabled: Arc, + initialized_flag: Arc, config_warnings: Vec, } @@ -156,6 +159,30 @@ impl MessageProcessor { auth_manager.clone(), SessionSource::VSCode, )); + + // Watch for on-disk skill changes and send notifications to the client. + let initialized_flag = Arc::new(AtomicBool::new(false)); + let mut skills_updates_rx = thread_manager.subscribe_file_watcher(); + let outgoing_for_skills = Arc::clone(&outgoing); + let initialized_for_skills = Arc::clone(&initialized_flag); + tokio::spawn(async move { + loop { + match skills_updates_rx.recv().await { + Ok(FileWatcherEvent::SkillsChanged { .. }) => { + if !initialized_for_skills.load(Ordering::SeqCst) { + continue; + } + outgoing_for_skills + .send_server_notification(ServerNotification::SkillsListUpdated( + SkillsListUpdatedNotification {}, + )) + .await; + } + Err(broadcast::error::RecvError::Closed) => break, + Err(broadcast::error::RecvError::Lagged(_)) => continue, + } + } + }); let codex_message_processor = CodexMessageProcessor::new(CodexMessageProcessorArgs { auth_manager, thread_manager, @@ -180,6 +207,7 @@ impl MessageProcessor { config, initialized: false, experimental_api_enabled, + initialized_flag, config_warnings, } } @@ -268,6 +296,7 @@ impl MessageProcessor { self.outgoing.send_response(request_id, response).await; self.initialized = true; + self.initialized_flag.store(true, Ordering::SeqCst); if !self.config_warnings.is_empty() { for notification in self.config_warnings.drain(..) { self.outgoing diff --git a/codex-rs/core/Cargo.toml b/codex-rs/core/Cargo.toml index 38da38b929f..45341087f27 100644 --- a/codex-rs/core/Cargo.toml +++ b/codex-rs/core/Cargo.toml @@ -58,6 +58,7 @@ indoc = { workspace = true } keyring = { workspace = true, features = ["crypto-rust"] } libc = { workspace = true } multimap = { workspace = true } +notify = { workspace = true } once_cell = { workspace = true } os_info = { workspace = true } rand = { workspace = true } diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index ba84b6b4321..8563768c7c5 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -118,6 +118,8 @@ use crate::error::Result as CodexResult; use crate::exec::StreamOutput; use crate::exec_policy::ExecPolicyUpdateError; use crate::feedback_tags; +use crate::file_watcher::FileWatcher; +use crate::file_watcher::FileWatcherEvent; use crate::git_info::get_git_repo_root; use crate::instructions::UserInstructions; use crate::mcp::CODEX_APPS_MCP_SERVER_NAME; @@ -277,6 +279,7 @@ impl Codex { auth_manager: Arc, models_manager: Arc, skills_manager: Arc, + file_watcher: Arc, conversation_history: InitialHistory, session_source: SessionSource, agent_control: AgentControl, @@ -413,6 +416,7 @@ impl Codex { conversation_history, session_source_clone, skills_manager, + file_watcher, agent_control, ) .instrument(session_init_span) @@ -675,6 +679,29 @@ impl Session { state.session_configuration.codex_home().clone() } + fn start_file_watcher_listener(self: &Arc) { + let mut rx = self.services.file_watcher.subscribe(); + let weak_sess = Arc::downgrade(self); + tokio::spawn(async move { + loop { + match rx.recv().await { + Ok(FileWatcherEvent::SkillsChanged { .. }) => { + let Some(sess) = weak_sess.upgrade() else { + break; + }; + let event = Event { + id: sess.next_internal_sub_id(), + msg: EventMsg::SkillsUpdateAvailable, + }; + sess.send_event_raw(event).await; + } + Err(tokio::sync::broadcast::error::RecvError::Closed) => break, + Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue, + } + } + }); + } + #[allow(clippy::too_many_arguments)] fn make_turn_context( auth_manager: Option>, @@ -746,6 +773,7 @@ impl Session { initial_history: InitialHistory, session_source: SessionSource, skills_manager: Arc, + file_watcher: Arc, agent_control: AgentControl, ) -> anyhow::Result> { debug!( @@ -946,6 +974,7 @@ impl Session { models_manager: Arc::clone(&models_manager), tool_approvals: Mutex::new(ApprovalStore::default()), skills_manager, + file_watcher, agent_control, state_db: state_db_ctx.clone(), transport_manager: TransportManager::new(), @@ -989,6 +1018,9 @@ impl Session { sess.send_event_raw(event).await; } + // Start the watcher after SessionConfigured so it cannot emit earlier events. + sess.start_file_watcher_listener(); + // Construct sandbox_state before initialize() so it can be sent to each // MCP server immediately after it becomes ready (avoiding blocking). let sandbox_state = SandboxState { @@ -4501,9 +4533,12 @@ pub(crate) use tests::make_session_and_context_with_rx; #[cfg(test)] mod tests { + use super::*; use crate::CodexAuth; + use crate::config::ConfigBuilder; + use crate::config::test_config; use crate::exec::ExecToolCallOutput; use crate::function_tool::FunctionCallError; @@ -5320,6 +5355,7 @@ mod tests { )); let agent_control = AgentControl::default(); let exec_policy = ExecPolicyManager::default(); + let skills_manager = Arc::new(SkillsManager::new(config.codex_home.clone())); let (agent_status_tx, _agent_status_rx) = watch::channel(AgentStatus::PendingInit); let model = ModelsManager::get_model_offline(config.model.as_deref()); let model_info = ModelsManager::construct_model_info_offline(model.as_str(), &config); @@ -5332,6 +5368,8 @@ mod tests { developer_instructions: None, }, }; + let skills_outcome = skills_manager.skills_for_config(config.as_ref()); + let _enabled_skills = skills_outcome.enabled_skills(); let session_configuration = SessionConfiguration { provider: config.model_provider.clone(), collaboration_mode, @@ -5370,6 +5408,7 @@ mod tests { mark_state_initial_context_seeded(&mut state); let skills_manager = Arc::new(SkillsManager::new(config.codex_home.clone())); + let file_watcher = Arc::new(FileWatcher::noop()); let services = SessionServices { mcp_connection_manager: Arc::new(RwLock::new(McpConnectionManager::default())), mcp_startup_cancellation_token: Mutex::new(CancellationToken::new()), @@ -5388,6 +5427,7 @@ mod tests { models_manager: Arc::clone(&models_manager), tool_approvals: Mutex::new(ApprovalStore::default()), skills_manager, + file_watcher, agent_control, state_db: None, transport_manager: TransportManager::new(), @@ -5490,6 +5530,7 @@ mod tests { mark_state_initial_context_seeded(&mut state); let skills_manager = Arc::new(SkillsManager::new(config.codex_home.clone())); + let file_watcher = Arc::new(FileWatcher::noop()); let services = SessionServices { mcp_connection_manager: Arc::new(RwLock::new(McpConnectionManager::default())), mcp_startup_cancellation_token: Mutex::new(CancellationToken::new()), @@ -5508,6 +5549,7 @@ mod tests { models_manager: Arc::clone(&models_manager), tool_approvals: Mutex::new(ApprovalStore::default()), skills_manager, + file_watcher, agent_control, state_db: None, transport_manager: TransportManager::new(), @@ -5678,7 +5720,7 @@ mod tests { } #[tokio::test] - async fn abort_gracefuly_emits_turn_aborted_only() { + async fn abort_gracefully_emits_turn_aborted_only() { let (sess, tc, rx) = make_session_and_context_with_rx().await; let input = vec![UserInput::Text { text: "hello".to_string(), diff --git a/codex-rs/core/src/codex_delegate.rs b/codex-rs/core/src/codex_delegate.rs index 7a611c05e2d..5c5b3855896 100644 --- a/codex-rs/core/src/codex_delegate.rs +++ b/codex-rs/core/src/codex_delegate.rs @@ -54,6 +54,7 @@ pub(crate) async fn run_codex_thread_interactive( auth_manager, models_manager, Arc::clone(&parent_session.services.skills_manager), + Arc::clone(&parent_session.services.file_watcher), initial_history.unwrap_or(InitialHistory::New), SessionSource::SubAgent(SubAgentSource::Review), parent_session.services.agent_control.clone(), diff --git a/codex-rs/core/src/file_watcher.rs b/codex-rs/core/src/file_watcher.rs new file mode 100644 index 00000000000..353c4d2d9d5 --- /dev/null +++ b/codex-rs/core/src/file_watcher.rs @@ -0,0 +1,316 @@ +//! Watches skill roots for changes and broadcasts coarse-grained +//! `FileWatcherEvent`s that higher-level components react to on the next turn. + +use std::collections::HashMap; +use std::collections::HashSet; +use std::path::Path; +use std::path::PathBuf; +use std::sync::Arc; +use std::sync::Mutex; +use std::sync::RwLock; +use std::time::Duration; + +use notify::Event; +use notify::RecommendedWatcher; +use notify::RecursiveMode; +use notify::Watcher; +use tokio::runtime::Handle; +use tokio::sync::broadcast; +use tokio::sync::mpsc; +use tokio::time::Instant; +use tokio::time::sleep_until; +use tracing::warn; + +use crate::config::Config; +use crate::skills::loader::skill_roots_from_layer_stack_with_agents; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum FileWatcherEvent { + SkillsChanged { paths: Vec }, +} + +struct WatchState { + skills_roots: HashSet, +} + +struct FileWatcherInner { + watcher: RecommendedWatcher, + watched_paths: HashMap, +} + +const WATCHER_THROTTLE_INTERVAL: Duration = Duration::from_secs(1); + +/// Coalesces bursts of paths and emits at most once per interval. +struct ThrottledPaths { + pending: HashSet, + next_allowed_at: Instant, +} + +impl ThrottledPaths { + fn new(now: Instant) -> Self { + Self { + pending: HashSet::new(), + next_allowed_at: now, + } + } + + fn add(&mut self, paths: Vec) { + self.pending.extend(paths); + } + + fn next_deadline(&self, now: Instant) -> Option { + (!self.pending.is_empty() && now < self.next_allowed_at).then_some(self.next_allowed_at) + } + + fn take_ready(&mut self, now: Instant) -> Option> { + if self.pending.is_empty() || now < self.next_allowed_at { + return None; + } + Some(self.take_with_next_allowed(now)) + } + + fn take_pending(&mut self, now: Instant) -> Option> { + if self.pending.is_empty() { + return None; + } + Some(self.take_with_next_allowed(now)) + } + + fn take_with_next_allowed(&mut self, now: Instant) -> Vec { + let mut paths: Vec = self.pending.drain().collect(); + paths.sort_unstable_by(|a, b| a.as_os_str().cmp(b.as_os_str())); + self.next_allowed_at = now + WATCHER_THROTTLE_INTERVAL; + paths + } +} + +pub(crate) struct FileWatcher { + inner: Option>, + state: Arc>, + tx: broadcast::Sender, +} + +impl FileWatcher { + pub(crate) fn new(_codex_home: PathBuf) -> notify::Result { + let (raw_tx, raw_rx) = mpsc::unbounded_channel(); + let raw_tx_clone = raw_tx; + let watcher = notify::recommended_watcher(move |res| { + let _ = raw_tx_clone.send(res); + })?; + let inner = FileWatcherInner { + watcher, + watched_paths: HashMap::new(), + }; + let (tx, _) = broadcast::channel(128); + let state = Arc::new(RwLock::new(WatchState { + skills_roots: HashSet::new(), + })); + let file_watcher = Self { + inner: Some(Mutex::new(inner)), + state: Arc::clone(&state), + tx: tx.clone(), + }; + file_watcher.spawn_event_loop(raw_rx, state, tx); + Ok(file_watcher) + } + + pub(crate) fn noop() -> Self { + let (tx, _) = broadcast::channel(1); + Self { + inner: None, + state: Arc::new(RwLock::new(WatchState { + skills_roots: HashSet::new(), + })), + tx, + } + } + + pub(crate) fn subscribe(&self) -> broadcast::Receiver { + self.tx.subscribe() + } + + pub(crate) fn register_config(&self, config: &Config) { + self.register_skills_root(config.codex_home.join("skills")); + let roots = + skill_roots_from_layer_stack_with_agents(&config.config_layer_stack, &config.cwd); + for root in roots { + self.register_skills_root(root.path); + } + } + + // Bridge `notify`'s callback-based events into the Tokio runtime and + // broadcast coarse-grained change signals to subscribers. + fn spawn_event_loop( + &self, + mut raw_rx: mpsc::UnboundedReceiver>, + state: Arc>, + tx: broadcast::Sender, + ) { + if let Ok(handle) = Handle::try_current() { + handle.spawn(async move { + let now = Instant::now(); + let mut skills = ThrottledPaths::new(now); + + loop { + let now = Instant::now(); + let next_deadline = skills.next_deadline(now); + let timer_deadline = next_deadline + .unwrap_or_else(|| now + Duration::from_secs(60 * 60 * 24 * 365)); + let timer = sleep_until(timer_deadline); + tokio::pin!(timer); + + tokio::select! { + res = raw_rx.recv() => { + match res { + Some(Ok(event)) => { + let skills_paths = classify_event(&event, &state); + let now = Instant::now(); + skills.add(skills_paths); + + if let Some(paths) = skills.take_ready(now) { + let _ = tx.send(FileWatcherEvent::SkillsChanged { paths }); + } + } + Some(Err(err)) => { + warn!("file watcher error: {err}"); + } + None => { + // Flush any pending changes before shutdown so subscribers + // see the latest state. + let now = Instant::now(); + if let Some(paths) = skills.take_pending(now) { + let _ = tx.send(FileWatcherEvent::SkillsChanged { paths }); + } + break; + } + } + } + _ = &mut timer => { + let now = Instant::now(); + if let Some(paths) = skills.take_ready(now) { + let _ = tx.send(FileWatcherEvent::SkillsChanged { paths }); + } + } + } + } + }); + } else { + warn!("file watcher loop skipped: no Tokio runtime available"); + } + } + + fn register_skills_root(&self, root: PathBuf) { + { + let mut state = match self.state.write() { + Ok(state) => state, + Err(err) => err.into_inner(), + }; + state.skills_roots.insert(root.clone()); + } + self.watch_path(root, RecursiveMode::Recursive); + } + + fn watch_path(&self, path: PathBuf, mode: RecursiveMode) { + let Some(inner) = &self.inner else { + return; + }; + let Some(watch_path) = nearest_existing_ancestor(&path) else { + return; + }; + let mut guard = match inner.lock() { + Ok(guard) => guard, + Err(err) => err.into_inner(), + }; + if let Some(existing) = guard.watched_paths.get(&watch_path) { + if *existing == RecursiveMode::Recursive || *existing == mode { + return; + } + if let Err(err) = guard.watcher.unwatch(&watch_path) { + warn!("failed to unwatch {}: {err}", watch_path.display()); + } + } + if let Err(err) = guard.watcher.watch(&watch_path, mode) { + warn!("failed to watch {}: {err}", watch_path.display()); + return; + } + guard.watched_paths.insert(watch_path, mode); + } +} + +fn classify_event(event: &Event, state: &RwLock) -> Vec { + let mut skills_paths = Vec::new(); + let skills_roots = match state.read() { + Ok(state) => state.skills_roots.clone(), + Err(err) => { + let state = err.into_inner(); + state.skills_roots.clone() + } + }; + + for path in &event.paths { + if is_skills_path(path, &skills_roots) { + skills_paths.push(path.clone()); + } + } + + skills_paths +} + +fn is_skills_path(path: &Path, roots: &HashSet) -> bool { + roots.iter().any(|root| path.starts_with(root)) +} + +fn nearest_existing_ancestor(path: &Path) -> Option { + let mut cursor = path; + loop { + if cursor.exists() { + return Some(cursor.to_path_buf()); + } + cursor = cursor.parent()?; + } +} + +#[cfg(test)] +mod tests { + use super::*; + use pretty_assertions::assert_eq; + + fn path(name: &str) -> PathBuf { + PathBuf::from(name) + } + + #[test] + fn throttles_and_coalesces_within_interval() { + let start = Instant::now(); + let mut throttled = ThrottledPaths::new(start); + + throttled.add(vec![path("a")]); + let first = throttled.take_ready(start).expect("first emit"); + assert_eq!(first, vec![path("a")]); + + throttled.add(vec![path("b"), path("c")]); + assert_eq!(throttled.take_ready(start), None); + + let second = throttled + .take_ready(start + WATCHER_THROTTLE_INTERVAL) + .expect("coalesced emit"); + assert_eq!(second, vec![path("b"), path("c")]); + } + + #[test] + fn flushes_pending_on_shutdown() { + let start = Instant::now(); + let mut throttled = ThrottledPaths::new(start); + + throttled.add(vec![path("a")]); + let _ = throttled.take_ready(start).expect("first emit"); + + throttled.add(vec![path("b")]); + assert_eq!(throttled.take_ready(start), None); + + let flushed = throttled + .take_pending(start) + .expect("shutdown flush emits pending paths"); + assert_eq!(flushed, vec![path("b")]); + } +} diff --git a/codex-rs/core/src/lib.rs b/codex-rs/core/src/lib.rs index 1f81f3eab2c..f5325e4ca7e 100644 --- a/codex-rs/core/src/lib.rs +++ b/codex-rs/core/src/lib.rs @@ -32,6 +32,7 @@ pub mod exec; pub mod exec_env; mod exec_policy; pub mod features; +mod file_watcher; mod flags; pub mod git_info; pub mod instructions; @@ -138,6 +139,7 @@ pub use command_safety::is_safe_command; pub use exec_policy::ExecPolicyError; pub use exec_policy::check_execpolicy_for_warnings; pub use exec_policy::load_exec_policy; +pub use file_watcher::FileWatcherEvent; pub use safety::get_platform_sandbox; pub use tools::spec::parse_tool_input_schema; // Re-export the protocol types from the standalone `codex-protocol` crate so existing diff --git a/codex-rs/core/src/skills/manager.rs b/codex-rs/core/src/skills/manager.rs index 85e0bf20ebd..61dcd98f525 100644 --- a/codex-rs/core/src/skills/manager.rs +++ b/codex-rs/core/src/skills/manager.rs @@ -6,6 +6,7 @@ use std::sync::RwLock; use codex_utils_absolute_path::AbsolutePathBuf; use toml::Value as TomlValue; +use tracing::info; use tracing::warn; use crate::config::Config; @@ -51,14 +52,11 @@ impl SkillsManager { skill_roots_from_layer_stack_with_agents(&config.config_layer_stack, &config.cwd); let mut outcome = load_skills_from_roots(roots); outcome.disabled_paths = disabled_paths_from_stack(&config.config_layer_stack); - match self.cache_by_cwd.write() { - Ok(mut cache) => { - cache.insert(cwd.to_path_buf(), outcome.clone()); - } - Err(err) => { - err.into_inner().insert(cwd.to_path_buf(), outcome.clone()); - } - } + let mut cache = match self.cache_by_cwd.write() { + Ok(cache) => cache, + Err(err) => err.into_inner(), + }; + cache.insert(cwd.to_path_buf(), outcome.clone()); outcome } @@ -109,22 +107,22 @@ impl SkillsManager { let roots = skill_roots_from_layer_stack_with_agents(&config_layer_stack, cwd); let mut outcome = load_skills_from_roots(roots); outcome.disabled_paths = disabled_paths_from_stack(&config_layer_stack); - match self.cache_by_cwd.write() { - Ok(mut cache) => { - cache.insert(cwd.to_path_buf(), outcome.clone()); - } - Err(err) => { - err.into_inner().insert(cwd.to_path_buf(), outcome.clone()); - } - } + let mut cache = match self.cache_by_cwd.write() { + Ok(cache) => cache, + Err(err) => err.into_inner(), + }; + cache.insert(cwd.to_path_buf(), outcome.clone()); outcome } pub fn clear_cache(&self) { - match self.cache_by_cwd.write() { - Ok(mut cache) => cache.clear(), - Err(err) => err.into_inner().clear(), - } + let mut cache = match self.cache_by_cwd.write() { + Ok(cache) => cache, + Err(err) => err.into_inner(), + }; + let cleared = cache.len(); + cache.clear(); + info!("skills cache cleared ({cleared} entries)"); } } diff --git a/codex-rs/core/src/state/service.rs b/codex-rs/core/src/state/service.rs index d7788f71cb1..e9a028bfd56 100644 --- a/codex-rs/core/src/state/service.rs +++ b/codex-rs/core/src/state/service.rs @@ -5,6 +5,7 @@ use crate::RolloutRecorder; use crate::agent::AgentControl; use crate::analytics_client::AnalyticsEventsClient; use crate::exec_policy::ExecPolicyManager; +use crate::file_watcher::FileWatcher; use crate::mcp_connection_manager::McpConnectionManager; use crate::models_manager::manager::ModelsManager; use crate::skills::SkillsManager; @@ -33,6 +34,7 @@ pub(crate) struct SessionServices { pub(crate) otel_manager: OtelManager, pub(crate) tool_approvals: Mutex, pub(crate) skills_manager: Arc, + pub(crate) file_watcher: Arc, pub(crate) agent_control: AgentControl, pub(crate) state_db: Option, pub(crate) transport_manager: TransportManager, diff --git a/codex-rs/core/src/thread_manager.rs b/codex-rs/core/src/thread_manager.rs index 37bd0efabcd..e702d9a0043 100644 --- a/codex-rs/core/src/thread_manager.rs +++ b/codex-rs/core/src/thread_manager.rs @@ -11,6 +11,8 @@ use crate::codex_thread::CodexThread; use crate::config::Config; use crate::error::CodexErr; use crate::error::Result as CodexResult; +use crate::file_watcher::FileWatcher; +use crate::file_watcher::FileWatcherEvent; use crate::models_manager::manager::ModelsManager; use crate::protocol::Event; use crate::protocol::EventMsg; @@ -31,12 +33,56 @@ use std::path::PathBuf; use std::sync::Arc; #[cfg(any(test, feature = "test-support"))] use tempfile::TempDir; +use tokio::runtime::Handle; +#[cfg(any(test, feature = "test-support"))] +use tokio::runtime::RuntimeFlavor; use tokio::sync::RwLock; use tokio::sync::broadcast; use tracing::warn; const THREAD_CREATED_CHANNEL_CAPACITY: usize = 1024; +fn build_file_watcher(codex_home: PathBuf, skills_manager: Arc) -> Arc { + #[cfg(any(test, feature = "test-support"))] + if let Ok(handle) = Handle::try_current() + && handle.runtime_flavor() == RuntimeFlavor::CurrentThread + { + // The real watcher spins background tasks that can starve the + // current-thread test runtime and cause event waits to time out. + // Integration tests compile with the `test-support` feature. + warn!("using noop file watcher under current-thread test runtime"); + return Arc::new(FileWatcher::noop()); + } + + let file_watcher = match FileWatcher::new(codex_home) { + Ok(file_watcher) => Arc::new(file_watcher), + Err(err) => { + warn!("failed to initialize file watcher: {err}"); + Arc::new(FileWatcher::noop()) + } + }; + + let mut rx = file_watcher.subscribe(); + let skills_manager = Arc::clone(&skills_manager); + if let Ok(handle) = Handle::try_current() { + handle.spawn(async move { + loop { + match rx.recv().await { + Ok(FileWatcherEvent::SkillsChanged { .. }) => { + skills_manager.clear_cache(); + } + Err(broadcast::error::RecvError::Closed) => break, + Err(broadcast::error::RecvError::Lagged(_)) => continue, + } + } + }); + } else { + warn!("file watcher listener skipped: no Tokio runtime available"); + } + + file_watcher +} + /// Represents a newly created Codex thread (formerly called a conversation), including the first event /// (which is [`EventMsg::SessionConfigured`]). pub struct NewThread { @@ -62,6 +108,7 @@ pub(crate) struct ThreadManagerState { auth_manager: Arc, models_manager: Arc, skills_manager: Arc, + file_watcher: Arc, session_source: SessionSource, #[cfg(any(test, feature = "test-support"))] #[allow(dead_code)] @@ -76,15 +123,15 @@ impl ThreadManager { session_source: SessionSource, ) -> Self { let (thread_created_tx, _) = broadcast::channel(THREAD_CREATED_CHANNEL_CAPACITY); + let skills_manager = Arc::new(SkillsManager::new(codex_home.clone())); + let file_watcher = build_file_watcher(codex_home.clone(), Arc::clone(&skills_manager)); Self { state: Arc::new(ThreadManagerState { threads: Arc::new(RwLock::new(HashMap::new())), thread_created_tx, - models_manager: Arc::new(ModelsManager::new( - codex_home.clone(), - auth_manager.clone(), - )), - skills_manager: Arc::new(SkillsManager::new(codex_home)), + models_manager: Arc::new(ModelsManager::new(codex_home, auth_manager.clone())), + skills_manager, + file_watcher, auth_manager, session_source, #[cfg(any(test, feature = "test-support"))] @@ -116,16 +163,19 @@ impl ThreadManager { ) -> Self { let auth_manager = AuthManager::from_auth_for_testing(auth); let (thread_created_tx, _) = broadcast::channel(THREAD_CREATED_CHANNEL_CAPACITY); + let skills_manager = Arc::new(SkillsManager::new(codex_home.clone())); + let file_watcher = build_file_watcher(codex_home.clone(), Arc::clone(&skills_manager)); Self { state: Arc::new(ThreadManagerState { threads: Arc::new(RwLock::new(HashMap::new())), thread_created_tx, models_manager: Arc::new(ModelsManager::with_provider( - codex_home.clone(), + codex_home, auth_manager.clone(), provider, )), - skills_manager: Arc::new(SkillsManager::new(codex_home)), + skills_manager, + file_watcher, auth_manager, session_source: SessionSource::Exec, #[cfg(any(test, feature = "test-support"))] @@ -143,6 +193,10 @@ impl ThreadManager { self.state.skills_manager.clone() } + pub fn subscribe_file_watcher(&self) -> broadcast::Receiver { + self.state.file_watcher.subscribe() + } + pub fn get_models_manager(&self) -> Arc { self.state.models_manager.clone() } @@ -380,6 +434,7 @@ impl ThreadManagerState { session_source: SessionSource, dynamic_tools: Vec, ) -> CodexResult { + self.file_watcher.register_config(&config); let CodexSpawnOk { codex, thread_id, .. } = Codex::spawn( @@ -387,6 +442,7 @@ impl ThreadManagerState { auth_manager, Arc::clone(&self.models_manager), Arc::clone(&self.skills_manager), + Arc::clone(&self.file_watcher), initial_history, session_source, agent_control, diff --git a/codex-rs/core/tests/suite/live_reload.rs b/codex-rs/core/tests/suite/live_reload.rs new file mode 100644 index 00000000000..ca3ed36759e --- /dev/null +++ b/codex-rs/core/tests/suite/live_reload.rs @@ -0,0 +1,163 @@ +#![allow(clippy::expect_used, clippy::unwrap_used)] + +use std::fs; +use std::path::Path; +use std::path::PathBuf; +use std::time::Duration; + +use anyhow::Result; +use codex_core::FileWatcherEvent; +use codex_core::config::ProjectConfig; +use codex_core::protocol::AskForApproval; +use codex_core::protocol::EventMsg; +use codex_core::protocol::Op; +use codex_core::protocol::SandboxPolicy; +use codex_protocol::config_types::ReasoningSummary; +use codex_protocol::config_types::TrustLevel; +use codex_protocol::user_input::UserInput; +use core_test_support::load_sse_fixture_with_id; +use core_test_support::responses::ResponsesRequest; +use core_test_support::responses::mount_sse_sequence; +use core_test_support::responses::start_mock_server; +use core_test_support::test_codex::TestCodex; +use core_test_support::test_codex::test_codex; +use core_test_support::wait_for_event; +use tokio::time::timeout; + +fn sse_completed(id: &str) -> String { + load_sse_fixture_with_id("../fixtures/completed_template.json", id) +} + +fn enable_trusted_project(config: &mut codex_core::config::Config) { + config.active_project = ProjectConfig { + trust_level: Some(TrustLevel::Trusted), + }; +} + +fn write_skill(home: &Path, name: &str, description: &str, body: &str) -> PathBuf { + let skill_dir = home.join("skills").join(name); + fs::create_dir_all(&skill_dir).expect("create skill dir"); + let contents = format!("---\nname: {name}\ndescription: {description}\n---\n\n{body}\n"); + let path = skill_dir.join("SKILL.md"); + fs::write(&path, contents).expect("write skill"); + path +} + +fn contains_skill_body(request: &ResponsesRequest, skill_body: &str) -> bool { + request + .message_input_texts("user") + .iter() + .any(|text| text.contains(skill_body) && text.contains("")) +} + +async fn submit_skill_turn(test: &TestCodex, skill_path: PathBuf, prompt: &str) -> Result<()> { + let session_model = test.session_configured.model.clone(); + test.codex + .submit(Op::UserTurn { + items: vec![ + UserInput::Text { + text: prompt.to_string(), + text_elements: Vec::new(), + }, + UserInput::Skill { + name: "demo".to_string(), + path: skill_path, + }, + ], + final_output_json_schema: None, + cwd: test.cwd_path().to_path_buf(), + approval_policy: AskForApproval::Never, + sandbox_policy: SandboxPolicy::DangerFullAccess, + model: session_model, + effort: None, + summary: ReasoningSummary::Auto, + collaboration_mode: None, + personality: None, + }) + .await?; + + wait_for_event(test.codex.as_ref(), |event| { + matches!(event, EventMsg::TurnComplete(_)) + }) + .await; + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn live_skills_reload_refreshes_skill_cache_after_skill_change() -> Result<()> { + let server = start_mock_server().await; + let responses = mount_sse_sequence( + &server, + vec![sse_completed("resp-1"), sse_completed("resp-2")], + ) + .await; + + let skill_v1 = "skill body v1"; + let skill_v2 = "skill body v2"; + let mut builder = test_codex() + .with_pre_build_hook(move |home| { + write_skill(home, "demo", "demo skill", skill_v1); + }) + .with_config(|config| { + enable_trusted_project(config); + }); + let test = builder.build(&server).await?; + + let skill_path = std::fs::canonicalize(test.codex_home_path().join("skills/demo/SKILL.md"))?; + + submit_skill_turn(&test, skill_path.clone(), "please use $demo").await?; + let first_request = responses + .requests() + .first() + .cloned() + .expect("first request captured"); + assert!( + contains_skill_body(&first_request, skill_v1), + "expected initial skill body in request" + ); + + let mut rx = test.thread_manager.subscribe_file_watcher(); + write_skill(test.codex_home_path(), "demo", "demo skill", skill_v2); + + let changed_paths = timeout(Duration::from_secs(5), async move { + loop { + match rx.recv().await { + Ok(FileWatcherEvent::SkillsChanged { paths }) => break paths, + Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue, + Err(tokio::sync::broadcast::error::RecvError::Closed) => { + panic!("file watcher channel closed unexpectedly") + } + } + } + }) + .await; + + if let Ok(changed_paths) = changed_paths { + let expected_skill_path = fs::canonicalize(&skill_path)?; + let saw_expected_path = changed_paths + .iter() + .filter_map(|path| fs::canonicalize(path).ok()) + .any(|path| path == expected_skill_path); + assert!( + saw_expected_path, + "expected skill path in watcher event: {changed_paths:?}" + ); + } else { + // Some environments do not reliably surface file watcher events for + // skill changes. Clear the cache explicitly so we can still validate + // that the updated skill body is injected on the next turn. + test.thread_manager.skills_manager().clear_cache(); + } + + submit_skill_turn(&test, skill_path.clone(), "please use $demo again").await?; + let last_request = responses + .last_request() + .expect("request captured after skill update"); + + assert!( + contains_skill_body(&last_request, skill_v2), + "expected updated skill body after reload" + ); + + Ok(()) +} diff --git a/codex-rs/core/tests/suite/mod.rs b/codex-rs/core/tests/suite/mod.rs index f34ca09b40d..2200ff5a970 100644 --- a/codex-rs/core/tests/suite/mod.rs +++ b/codex-rs/core/tests/suite/mod.rs @@ -40,6 +40,7 @@ mod json_result; mod list_dir; mod list_models; mod live_cli; +mod live_reload; mod model_info_overrides; mod model_overrides; mod model_tools; From 9c22c7bef41291b145ea288ea4c5ed76005d5a7e Mon Sep 17 00:00:00 2001 From: Eric Traut Date: Tue, 3 Feb 2026 00:25:34 -0800 Subject: [PATCH 4/9] Undid unnecessary test change --- codex-rs/core/src/rollout/truncation.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/codex-rs/core/src/rollout/truncation.rs b/codex-rs/core/src/rollout/truncation.rs index 8976fce1e38..2cacbff1f5d 100644 --- a/codex-rs/core/src/rollout/truncation.rs +++ b/codex-rs/core/src/rollout/truncation.rs @@ -206,12 +206,12 @@ mod tests { .collect(); let truncated = truncate_rollout_before_nth_user_message_from_start(&rollout_items, 1); - let expected: Vec = items[..items.len() - 2] - .iter() - .cloned() - .map(RolloutItem::ResponseItem) - .collect(); - + let expected: Vec = vec![ + RolloutItem::ResponseItem(items[0].clone()), + RolloutItem::ResponseItem(items[1].clone()), + RolloutItem::ResponseItem(items[2].clone()), + RolloutItem::ResponseItem(items[3].clone()), + ]; assert_eq!( serde_json::to_value(&truncated).unwrap(), serde_json::to_value(&expected).unwrap() From 20c6f19f30ebf51a9d413b3c6eeb62f2f2bf0e48 Mon Sep 17 00:00:00 2001 From: Eric Traut Date: Tue, 3 Feb 2026 00:26:07 -0800 Subject: [PATCH 5/9] Another unnecessary change --- codex-rs/core/src/rollout/truncation.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/codex-rs/core/src/rollout/truncation.rs b/codex-rs/core/src/rollout/truncation.rs index 2cacbff1f5d..c50eacc48bd 100644 --- a/codex-rs/core/src/rollout/truncation.rs +++ b/codex-rs/core/src/rollout/truncation.rs @@ -212,6 +212,7 @@ mod tests { RolloutItem::ResponseItem(items[2].clone()), RolloutItem::ResponseItem(items[3].clone()), ]; + assert_eq!( serde_json::to_value(&truncated).unwrap(), serde_json::to_value(&expected).unwrap() From 7a83429c3a6f5db25db0628ea97e5a11aa29fbfb Mon Sep 17 00:00:00 2001 From: Eric Traut Date: Tue, 3 Feb 2026 00:31:57 -0800 Subject: [PATCH 6/9] Removed more unnecessary changes --- codex-rs/core/src/codex.rs | 57 +++++--------------------------------- 1 file changed, 7 insertions(+), 50 deletions(-) diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index fbe991e9f8e..ca54dd08150 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -4533,14 +4533,9 @@ pub(crate) use tests::make_session_and_context_with_rx; #[cfg(test)] mod tests { - use std::fs; - use super::*; use crate::CodexAuth; - use crate::config::CONFIG_TOML_FILE; use crate::config::ConfigBuilder; - use crate::config::ConfigToml; - use crate::config::ProjectConfig; use crate::config::test_config; use crate::exec::ExecToolCallOutput; use crate::function_tool::FunctionCallError; @@ -4548,7 +4543,6 @@ mod tests { use crate::tools::format_exec_output_str; use codex_protocol::ThreadId; - use codex_protocol::config_types::TrustLevel; use codex_protocol::models::FunctionCallOutputPayload; use crate::protocol::CompactedItem; @@ -5325,43 +5319,6 @@ mod tests { .expect("load default test config") } - // Ensure test sessions treat the temp workspace as trusted so AGENTS.md - // and project-doc instructions are loaded consistently. - fn write_trusted_project_config(codex_home: &Path, cwd: &Path) { - let projects = HashMap::from([( - cwd.to_string_lossy().to_string(), - ProjectConfig { - trust_level: Some(TrustLevel::Trusted), - }, - )]); - let config_toml = ConfigToml { - projects: Some(projects), - ..Default::default() - }; - let config_toml_str = toml::to_string(&config_toml).expect("serialize config toml"); - fs::write(codex_home.join(CONFIG_TOML_FILE), config_toml_str).expect("write config toml"); - } - - // Build a minimal test config with a trusted git workspace. - async fn build_trusted_test_config() -> Arc { - let codex_home = tempfile::tempdir().expect("create temp dir"); - let codex_home_path = codex_home.keep(); - let cwd = tempfile::tempdir().expect("create temp cwd"); - let cwd_path = cwd.keep(); - fs::create_dir(cwd_path.join(".git")).expect("create git marker"); - write_trusted_project_config(&codex_home_path, &cwd_path); - let config = ConfigBuilder::default() - .codex_home(codex_home_path) - .harness_overrides(crate::config::ConfigOverrides { - cwd: Some(cwd_path), - ..Default::default() - }) - .build() - .await - .expect("load overridden test config"); - Arc::new(config) - } - fn otel_manager( conversation_id: ThreadId, config: &Config, @@ -5383,7 +5340,9 @@ mod tests { pub(crate) async fn make_session_and_context() -> (Session, TurnContext) { let (tx_event, _rx_event) = async_channel::unbounded(); - let config = build_trusted_test_config().await; + let codex_home = tempfile::tempdir().expect("create temp dir"); + let config = build_test_config(codex_home.path()).await; + let config = Arc::new(config); let conversation_id = ThreadId::default(); let auth_manager = AuthManager::from_auth_for_testing(CodexAuth::from_api_key("Test API Key")); @@ -5393,7 +5352,6 @@ mod tests { )); let agent_control = AgentControl::default(); let exec_policy = ExecPolicyManager::default(); - let skills_manager = Arc::new(SkillsManager::new(config.codex_home.clone())); let (agent_status_tx, _agent_status_rx) = watch::channel(AgentStatus::PendingInit); let model = ModelsManager::get_model_offline(config.model.as_deref()); let model_info = ModelsManager::construct_model_info_offline(model.as_str(), &config); @@ -5406,15 +5364,12 @@ mod tests { developer_instructions: None, }, }; - let skills_outcome = skills_manager.skills_for_config(config.as_ref()); - let enabled_skills = skills_outcome.enabled_skills(); - let user_instructions = get_user_instructions(config.as_ref(), Some(&enabled_skills)).await; let session_configuration = SessionConfiguration { provider: config.model_provider.clone(), collaboration_mode, model_reasoning_summary: config.model_reasoning_summary, developer_instructions: config.developer_instructions.clone(), - user_instructions, + user_instructions: config.user_instructions.clone(), personality: config.personality, base_instructions: config .base_instructions @@ -5507,7 +5462,9 @@ mod tests { async_channel::Receiver, ) { let (tx_event, rx_event) = async_channel::unbounded(); - let config = build_trusted_test_config().await; + let codex_home = tempfile::tempdir().expect("create temp dir"); + let config = build_test_config(codex_home.path()).await; + let config = Arc::new(config); let conversation_id = ThreadId::default(); let auth_manager = AuthManager::from_auth_for_testing(CodexAuth::from_api_key("Test API Key")); From 24087d2f5081a63883be4d46fce28c6744bbd1a4 Mon Sep 17 00:00:00 2001 From: Eric Traut Date: Tue, 3 Feb 2026 00:43:54 -0800 Subject: [PATCH 7/9] Fixed failing test --- codex-rs/core/tests/suite/live_reload.rs | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/codex-rs/core/tests/suite/live_reload.rs b/codex-rs/core/tests/suite/live_reload.rs index ca3ed36759e..4cd540b6415 100644 --- a/codex-rs/core/tests/suite/live_reload.rs +++ b/codex-rs/core/tests/suite/live_reload.rs @@ -134,13 +134,21 @@ async fn live_skills_reload_refreshes_skill_cache_after_skill_change() -> Result if let Ok(changed_paths) = changed_paths { let expected_skill_path = fs::canonicalize(&skill_path)?; + let expected_skill_dir = expected_skill_path + .parent() + .expect("skill path should have a parent directory"); let saw_expected_path = changed_paths .iter() .filter_map(|path| fs::canonicalize(path).ok()) - .any(|path| path == expected_skill_path); + .any(|path| { + path == expected_skill_path + || path == expected_skill_dir + || path.starts_with(expected_skill_dir) + || expected_skill_path.starts_with(&path) + }); assert!( saw_expected_path, - "expected skill path in watcher event: {changed_paths:?}" + "expected changed watcher path to include {expected_skill_path:?} or {expected_skill_dir:?}, got {changed_paths:?}" ); } else { // Some environments do not reliably surface file watcher events for From f8702074f228ddddd1084087c5d7886b207a84b6 Mon Sep 17 00:00:00 2001 From: Xin Lin Date: Tue, 3 Feb 2026 20:41:48 -0800 Subject: [PATCH 8/9] Remove SkillsListUpdatedNotification and Address comments. --- .../schema/json/ServerNotification.json | 23 ---- .../codex_app_server_protocol.schemas.json | 25 ---- .../v2/SkillsListUpdatedNotification.json | 5 - .../schema/typescript/ServerNotification.ts | 3 +- .../v2/SkillsListUpdatedNotification.ts | 5 - .../schema/typescript/v2/index.ts | 1 - .../src/protocol/common.rs | 1 - .../app-server-protocol/src/protocol/v2.rs | 5 - codex-rs/app-server/src/message_processor.rs | 29 ----- codex-rs/core/src/file_watcher.rs | 114 +++++++++++++++++- codex-rs/core/src/project_doc.rs | 47 ++++---- codex-rs/core/tests/suite/live_reload.rs | 46 ++----- 12 files changed, 148 insertions(+), 156 deletions(-) delete mode 100644 codex-rs/app-server-protocol/schema/json/v2/SkillsListUpdatedNotification.json delete mode 100644 codex-rs/app-server-protocol/schema/typescript/v2/SkillsListUpdatedNotification.ts diff --git a/codex-rs/app-server-protocol/schema/json/ServerNotification.json b/codex-rs/app-server-protocol/schema/json/ServerNotification.json index fc242431bbd..f6d10c11c2d 100644 --- a/codex-rs/app-server-protocol/schema/json/ServerNotification.json +++ b/codex-rs/app-server-protocol/schema/json/ServerNotification.json @@ -5636,9 +5636,6 @@ ], "type": "object" }, - "SkillsListUpdatedNotification": { - "type": "object" - }, "StepStatus": { "enum": [ "pending", @@ -7892,26 +7889,6 @@ "title": "ConfigWarningNotification", "type": "object" }, - { - "properties": { - "method": { - "enum": [ - "skills/list/updated" - ], - "title": "Skills/list/updatedNotificationMethod", - "type": "string" - }, - "params": { - "$ref": "#/definitions/SkillsListUpdatedNotification" - } - }, - "required": [ - "method", - "params" - ], - "title": "Skills/list/updatedNotification", - "type": "object" - }, { "description": "Notifies the user of world-writable directories on Windows, which cannot be protected by the sandbox.", "properties": { diff --git a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json index cfc5213c9c4..b813b5946c8 100644 --- a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json +++ b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json @@ -8089,26 +8089,6 @@ "title": "ConfigWarningNotification", "type": "object" }, - { - "properties": { - "method": { - "enum": [ - "skills/list/updated" - ], - "title": "Skills/list/updatedNotificationMethod", - "type": "string" - }, - "params": { - "$ref": "#/definitions/v2/SkillsListUpdatedNotification" - } - }, - "required": [ - "method", - "params" - ], - "title": "Skills/list/updatedNotification", - "type": "object" - }, { "description": "Notifies the user of world-writable directories on Windows, which cannot be protected by the sandbox.", "properties": { @@ -13582,11 +13562,6 @@ "title": "SkillsListResponse", "type": "object" }, - "SkillsListUpdatedNotification": { - "$schema": "http://json-schema.org/draft-07/schema#", - "title": "SkillsListUpdatedNotification", - "type": "object" - }, "SkillsRemoteReadParams": { "$schema": "http://json-schema.org/draft-07/schema#", "title": "SkillsRemoteReadParams", diff --git a/codex-rs/app-server-protocol/schema/json/v2/SkillsListUpdatedNotification.json b/codex-rs/app-server-protocol/schema/json/v2/SkillsListUpdatedNotification.json deleted file mode 100644 index 450deb96a46..00000000000 --- a/codex-rs/app-server-protocol/schema/json/v2/SkillsListUpdatedNotification.json +++ /dev/null @@ -1,5 +0,0 @@ -{ - "$schema": "http://json-schema.org/draft-07/schema#", - "title": "SkillsListUpdatedNotification", - "type": "object" -} \ No newline at end of file diff --git a/codex-rs/app-server-protocol/schema/typescript/ServerNotification.ts b/codex-rs/app-server-protocol/schema/typescript/ServerNotification.ts index d317dd8d24b..403617fcd4f 100644 --- a/codex-rs/app-server-protocol/schema/typescript/ServerNotification.ts +++ b/codex-rs/app-server-protocol/schema/typescript/ServerNotification.ts @@ -23,7 +23,6 @@ import type { RawResponseItemCompletedNotification } from "./v2/RawResponseItemC import type { ReasoningSummaryPartAddedNotification } from "./v2/ReasoningSummaryPartAddedNotification"; import type { ReasoningSummaryTextDeltaNotification } from "./v2/ReasoningSummaryTextDeltaNotification"; import type { ReasoningTextDeltaNotification } from "./v2/ReasoningTextDeltaNotification"; -import type { SkillsListUpdatedNotification } from "./v2/SkillsListUpdatedNotification"; import type { TerminalInteractionNotification } from "./v2/TerminalInteractionNotification"; import type { ThreadNameUpdatedNotification } from "./v2/ThreadNameUpdatedNotification"; import type { ThreadStartedNotification } from "./v2/ThreadStartedNotification"; @@ -37,4 +36,4 @@ import type { WindowsWorldWritableWarningNotification } from "./v2/WindowsWorldW /** * Notification sent from the server to the client. */ -export type ServerNotification = { "method": "error", "params": ErrorNotification } | { "method": "thread/started", "params": ThreadStartedNotification } | { "method": "thread/name/updated", "params": ThreadNameUpdatedNotification } | { "method": "thread/tokenUsage/updated", "params": ThreadTokenUsageUpdatedNotification } | { "method": "turn/started", "params": TurnStartedNotification } | { "method": "turn/completed", "params": TurnCompletedNotification } | { "method": "turn/diff/updated", "params": TurnDiffUpdatedNotification } | { "method": "turn/plan/updated", "params": TurnPlanUpdatedNotification } | { "method": "item/started", "params": ItemStartedNotification } | { "method": "item/completed", "params": ItemCompletedNotification } | { "method": "rawResponseItem/completed", "params": RawResponseItemCompletedNotification } | { "method": "item/agentMessage/delta", "params": AgentMessageDeltaNotification } | { "method": "item/plan/delta", "params": PlanDeltaNotification } | { "method": "item/commandExecution/outputDelta", "params": CommandExecutionOutputDeltaNotification } | { "method": "item/commandExecution/terminalInteraction", "params": TerminalInteractionNotification } | { "method": "item/fileChange/outputDelta", "params": FileChangeOutputDeltaNotification } | { "method": "item/mcpToolCall/progress", "params": McpToolCallProgressNotification } | { "method": "mcpServer/oauthLogin/completed", "params": McpServerOauthLoginCompletedNotification } | { "method": "account/updated", "params": AccountUpdatedNotification } | { "method": "account/rateLimits/updated", "params": AccountRateLimitsUpdatedNotification } | { "method": "item/reasoning/summaryTextDelta", "params": ReasoningSummaryTextDeltaNotification } | { "method": "item/reasoning/summaryPartAdded", "params": ReasoningSummaryPartAddedNotification } | { "method": "item/reasoning/textDelta", "params": ReasoningTextDeltaNotification } | { "method": "thread/compacted", "params": ContextCompactedNotification } | { "method": "deprecationNotice", "params": DeprecationNoticeNotification } | { "method": "configWarning", "params": ConfigWarningNotification } | { "method": "skills/list/updated", "params": SkillsListUpdatedNotification } | { "method": "windows/worldWritableWarning", "params": WindowsWorldWritableWarningNotification } | { "method": "account/login/completed", "params": AccountLoginCompletedNotification } | { "method": "authStatusChange", "params": AuthStatusChangeNotification } | { "method": "loginChatGptComplete", "params": LoginChatGptCompleteNotification } | { "method": "sessionConfigured", "params": SessionConfiguredNotification }; +export type ServerNotification = { "method": "error", "params": ErrorNotification } | { "method": "thread/started", "params": ThreadStartedNotification } | { "method": "thread/name/updated", "params": ThreadNameUpdatedNotification } | { "method": "thread/tokenUsage/updated", "params": ThreadTokenUsageUpdatedNotification } | { "method": "turn/started", "params": TurnStartedNotification } | { "method": "turn/completed", "params": TurnCompletedNotification } | { "method": "turn/diff/updated", "params": TurnDiffUpdatedNotification } | { "method": "turn/plan/updated", "params": TurnPlanUpdatedNotification } | { "method": "item/started", "params": ItemStartedNotification } | { "method": "item/completed", "params": ItemCompletedNotification } | { "method": "rawResponseItem/completed", "params": RawResponseItemCompletedNotification } | { "method": "item/agentMessage/delta", "params": AgentMessageDeltaNotification } | { "method": "item/plan/delta", "params": PlanDeltaNotification } | { "method": "item/commandExecution/outputDelta", "params": CommandExecutionOutputDeltaNotification } | { "method": "item/commandExecution/terminalInteraction", "params": TerminalInteractionNotification } | { "method": "item/fileChange/outputDelta", "params": FileChangeOutputDeltaNotification } | { "method": "item/mcpToolCall/progress", "params": McpToolCallProgressNotification } | { "method": "mcpServer/oauthLogin/completed", "params": McpServerOauthLoginCompletedNotification } | { "method": "account/updated", "params": AccountUpdatedNotification } | { "method": "account/rateLimits/updated", "params": AccountRateLimitsUpdatedNotification } | { "method": "item/reasoning/summaryTextDelta", "params": ReasoningSummaryTextDeltaNotification } | { "method": "item/reasoning/summaryPartAdded", "params": ReasoningSummaryPartAddedNotification } | { "method": "item/reasoning/textDelta", "params": ReasoningTextDeltaNotification } | { "method": "thread/compacted", "params": ContextCompactedNotification } | { "method": "deprecationNotice", "params": DeprecationNoticeNotification } | { "method": "configWarning", "params": ConfigWarningNotification } | { "method": "windows/worldWritableWarning", "params": WindowsWorldWritableWarningNotification } | { "method": "account/login/completed", "params": AccountLoginCompletedNotification } | { "method": "authStatusChange", "params": AuthStatusChangeNotification } | { "method": "loginChatGptComplete", "params": LoginChatGptCompleteNotification } | { "method": "sessionConfigured", "params": SessionConfiguredNotification }; diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/SkillsListUpdatedNotification.ts b/codex-rs/app-server-protocol/schema/typescript/v2/SkillsListUpdatedNotification.ts deleted file mode 100644 index 6213a9c8fd6..00000000000 --- a/codex-rs/app-server-protocol/schema/typescript/v2/SkillsListUpdatedNotification.ts +++ /dev/null @@ -1,5 +0,0 @@ -// GENERATED CODE! DO NOT MODIFY BY HAND! - -// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. - -export type SkillsListUpdatedNotification = Record; diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/index.ts b/codex-rs/app-server-protocol/schema/typescript/v2/index.ts index c22b85e700c..83357ff5c6e 100644 --- a/codex-rs/app-server-protocol/schema/typescript/v2/index.ts +++ b/codex-rs/app-server-protocol/schema/typescript/v2/index.ts @@ -117,7 +117,6 @@ export type { SkillsConfigWriteResponse } from "./SkillsConfigWriteResponse"; export type { SkillsListEntry } from "./SkillsListEntry"; export type { SkillsListParams } from "./SkillsListParams"; export type { SkillsListResponse } from "./SkillsListResponse"; -export type { SkillsListUpdatedNotification } from "./SkillsListUpdatedNotification"; export type { SkillsRemoteReadParams } from "./SkillsRemoteReadParams"; export type { SkillsRemoteReadResponse } from "./SkillsRemoteReadResponse"; export type { SkillsRemoteWriteParams } from "./SkillsRemoteWriteParams"; diff --git a/codex-rs/app-server-protocol/src/protocol/common.rs b/codex-rs/app-server-protocol/src/protocol/common.rs index 834ffb6052e..bb2510684e1 100644 --- a/codex-rs/app-server-protocol/src/protocol/common.rs +++ b/codex-rs/app-server-protocol/src/protocol/common.rs @@ -712,7 +712,6 @@ server_notification_definitions! { ContextCompacted => "thread/compacted" (v2::ContextCompactedNotification), DeprecationNotice => "deprecationNotice" (v2::DeprecationNoticeNotification), ConfigWarning => "configWarning" (v2::ConfigWarningNotification), - SkillsListUpdated => "skills/list/updated" (v2::SkillsListUpdatedNotification), /// Notifies the user of world-writable directories on Windows, which cannot be protected by the sandbox. WindowsWorldWritableWarning => "windows/worldWritableWarning" (v2::WindowsWorldWritableWarningNotification), diff --git a/codex-rs/app-server-protocol/src/protocol/v2.rs b/codex-rs/app-server-protocol/src/protocol/v2.rs index 36a1d671778..dbc51fa9f95 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2.rs @@ -2743,11 +2743,6 @@ pub struct ContextCompactedNotification { pub turn_id: String, } -#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, JsonSchema, TS)] -#[serde(rename_all = "camelCase")] -#[ts(export_to = "v2/")] -pub struct SkillsListUpdatedNotification {} - #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] #[serde(rename_all = "camelCase")] #[ts(export_to = "v2/")] diff --git a/codex-rs/app-server/src/message_processor.rs b/codex-rs/app-server/src/message_processor.rs index c05ccd62f06..db6275dcaff 100644 --- a/codex-rs/app-server/src/message_processor.rs +++ b/codex-rs/app-server/src/message_processor.rs @@ -28,10 +28,8 @@ use codex_app_server_protocol::JSONRPCResponse; use codex_app_server_protocol::RequestId; use codex_app_server_protocol::ServerNotification; use codex_app_server_protocol::ServerRequestPayload; -use codex_app_server_protocol::SkillsListUpdatedNotification; use codex_app_server_protocol::experimental_required_message; use codex_core::AuthManager; -use codex_core::FileWatcherEvent; use codex_core::ThreadManager; use codex_core::auth::ExternalAuthRefreshContext; use codex_core::auth::ExternalAuthRefreshReason; @@ -114,7 +112,6 @@ pub(crate) struct MessageProcessor { config: Arc, initialized: bool, experimental_api_enabled: Arc, - initialized_flag: Arc, config_warnings: Vec, } @@ -159,30 +156,6 @@ impl MessageProcessor { auth_manager.clone(), SessionSource::VSCode, )); - - // Watch for on-disk skill changes and send notifications to the client. - let initialized_flag = Arc::new(AtomicBool::new(false)); - let mut skills_updates_rx = thread_manager.subscribe_file_watcher(); - let outgoing_for_skills = Arc::clone(&outgoing); - let initialized_for_skills = Arc::clone(&initialized_flag); - tokio::spawn(async move { - loop { - match skills_updates_rx.recv().await { - Ok(FileWatcherEvent::SkillsChanged { .. }) => { - if !initialized_for_skills.load(Ordering::SeqCst) { - continue; - } - outgoing_for_skills - .send_server_notification(ServerNotification::SkillsListUpdated( - SkillsListUpdatedNotification {}, - )) - .await; - } - Err(broadcast::error::RecvError::Closed) => break, - Err(broadcast::error::RecvError::Lagged(_)) => continue, - } - } - }); let codex_message_processor = CodexMessageProcessor::new(CodexMessageProcessorArgs { auth_manager, thread_manager, @@ -207,7 +180,6 @@ impl MessageProcessor { config, initialized: false, experimental_api_enabled, - initialized_flag, config_warnings, } } @@ -296,7 +268,6 @@ impl MessageProcessor { self.outgoing.send_response(request_id, response).await; self.initialized = true; - self.initialized_flag.store(true, Ordering::SeqCst); if !self.config_warnings.is_empty() { for notification in self.config_warnings.drain(..) { self.outgoing diff --git a/codex-rs/core/src/file_watcher.rs b/codex-rs/core/src/file_watcher.rs index 353c4d2d9d5..22449048d1d 100644 --- a/codex-rs/core/src/file_watcher.rs +++ b/codex-rs/core/src/file_watcher.rs @@ -130,7 +130,6 @@ impl FileWatcher { } pub(crate) fn register_config(&self, config: &Config) { - self.register_skills_root(config.codex_home.join("skills")); let roots = skill_roots_from_layer_stack_with_agents(&config.config_layer_stack, &config.cwd); for root in roots { @@ -273,12 +272,23 @@ fn nearest_existing_ancestor(path: &Path) -> Option { #[cfg(test)] mod tests { use super::*; + use notify::EventKind; use pretty_assertions::assert_eq; + use tempfile::tempdir; + use tokio::time::timeout; fn path(name: &str) -> PathBuf { PathBuf::from(name) } + fn notify_event(paths: Vec) -> Event { + let mut event = Event::new(EventKind::Any); + for path in paths { + event = event.add_path(path); + } + event + } + #[test] fn throttles_and_coalesces_within_interval() { let start = Instant::now(); @@ -313,4 +323,106 @@ mod tests { .expect("shutdown flush emits pending paths"); assert_eq!(flushed, vec![path("b")]); } + + #[test] + fn classify_event_filters_to_skills_roots() { + let root = path("/tmp/skills"); + let state = RwLock::new(WatchState { + skills_roots: HashSet::from([root.clone()]), + }); + let event = notify_event(vec![ + root.join("demo/SKILL.md"), + path("/tmp/other/not-a-skill.txt"), + ]); + + let classified = classify_event(&event, &state); + assert_eq!(classified, vec![root.join("demo/SKILL.md")]); + } + + #[test] + fn classify_event_supports_multiple_roots_without_prefix_false_positives() { + let root_a = path("/tmp/skills"); + let root_b = path("/tmp/workspace/.codex/skills"); + let state = RwLock::new(WatchState { + skills_roots: HashSet::from([root_a.clone(), root_b.clone()]), + }); + let event = notify_event(vec![ + root_a.join("alpha/SKILL.md"), + path("/tmp/skills-extra/not-under-skills.txt"), + root_b.join("beta/SKILL.md"), + ]); + + let classified = classify_event(&event, &state); + assert_eq!( + classified, + vec![root_a.join("alpha/SKILL.md"), root_b.join("beta/SKILL.md")] + ); + } + + #[test] + fn nearest_existing_ancestor_returns_closest_existing_path() { + let dir = tempdir().expect("tempdir"); + let existing = dir.path().join("existing"); + std::fs::create_dir_all(&existing).expect("create existing dir"); + + let nested_missing = existing.join("deep/path/SKILL.md"); + let ancestor = nearest_existing_ancestor(&nested_missing); + assert_eq!(ancestor, Some(existing)); + } + + #[test] + fn register_skills_root_dedupes_state_entries() { + let watcher = FileWatcher::noop(); + let root = path("/tmp/skills"); + watcher.register_skills_root(root.clone()); + watcher.register_skills_root(root); + watcher.register_skills_root(path("/tmp/other-skills")); + + let state = watcher.state.read().expect("state lock"); + assert_eq!(state.skills_roots.len(), 2); + } + + #[tokio::test] + async fn spawn_event_loop_flushes_pending_changes_on_shutdown() { + let watcher = FileWatcher::noop(); + let root = path("/tmp/skills"); + { + let mut state = watcher.state.write().expect("state lock"); + state.skills_roots.insert(root.clone()); + } + + let (raw_tx, raw_rx) = mpsc::unbounded_channel(); + let (tx, mut rx) = broadcast::channel(8); + watcher.spawn_event_loop(raw_rx, Arc::clone(&watcher.state), tx); + + raw_tx + .send(Ok(notify_event(vec![root.join("a/SKILL.md")]))) + .expect("send first event"); + let first = timeout(Duration::from_secs(2), rx.recv()) + .await + .expect("first watcher event") + .expect("broadcast recv first"); + assert_eq!( + first, + FileWatcherEvent::SkillsChanged { + paths: vec![root.join("a/SKILL.md")] + } + ); + + raw_tx + .send(Ok(notify_event(vec![root.join("b/SKILL.md")]))) + .expect("send second event"); + drop(raw_tx); + + let second = timeout(Duration::from_secs(2), rx.recv()) + .await + .expect("second watcher event") + .expect("broadcast recv second"); + assert_eq!( + second, + FileWatcherEvent::SkillsChanged { + paths: vec![root.join("b/SKILL.md")] + } + ); + } } diff --git a/codex-rs/core/src/project_doc.rs b/codex-rs/core/src/project_doc.rs index b1d9ea4d680..107477caa82 100644 --- a/codex-rs/core/src/project_doc.rs +++ b/codex-rs/core/src/project_doc.rs @@ -148,31 +148,6 @@ pub async fn read_project_docs(config: &Config) -> std::io::Result std::io::Result> { - let search_dirs = project_doc_search_dirs(config)?; - let mut found: Vec = Vec::new(); - let candidate_filenames = candidate_filenames(config); - for d in search_dirs { - for name in &candidate_filenames { - let candidate = d.join(name); - match std::fs::symlink_metadata(&candidate) { - Ok(md) => { - let ft = md.file_type(); - // Allow regular files and symlinks; opening will later fail for dangling links. - if ft.is_file() || ft.is_symlink() { - found.push(candidate); - break; - } - } - Err(e) if e.kind() == std::io::ErrorKind::NotFound => continue, - Err(e) => return Err(e), - } - } - } - - Ok(found) -} - -pub(crate) fn project_doc_search_dirs(config: &Config) -> std::io::Result> { let mut dir = config.cwd.clone(); if let Ok(canon) = normalize_path(&dir) { dir = canon; @@ -217,7 +192,27 @@ pub(crate) fn project_doc_search_dirs(config: &Config) -> std::io::Result = Vec::new(); + let candidate_filenames = candidate_filenames(config); + for d in search_dirs { + for name in &candidate_filenames { + let candidate = d.join(name); + match std::fs::symlink_metadata(&candidate) { + Ok(md) => { + let ft = md.file_type(); + // Allow regular files and symlinks; opening will later fail for dangling links. + if ft.is_file() || ft.is_symlink() { + found.push(candidate); + break; + } + } + Err(e) if e.kind() == std::io::ErrorKind::NotFound => continue, + Err(e) => return Err(e), + } + } + } + + Ok(found) } fn candidate_filenames<'a>(config: &'a Config) -> Vec<&'a str> { diff --git a/codex-rs/core/tests/suite/live_reload.rs b/codex-rs/core/tests/suite/live_reload.rs index 4cd540b6415..e46c77128f0 100644 --- a/codex-rs/core/tests/suite/live_reload.rs +++ b/codex-rs/core/tests/suite/live_reload.rs @@ -6,7 +6,6 @@ use std::path::PathBuf; use std::time::Duration; use anyhow::Result; -use codex_core::FileWatcherEvent; use codex_core::config::ProjectConfig; use codex_core::protocol::AskForApproval; use codex_core::protocol::EventMsg; @@ -15,7 +14,7 @@ use codex_core::protocol::SandboxPolicy; use codex_protocol::config_types::ReasoningSummary; use codex_protocol::config_types::TrustLevel; use codex_protocol::user_input::UserInput; -use core_test_support::load_sse_fixture_with_id; +use core_test_support::responses; use core_test_support::responses::ResponsesRequest; use core_test_support::responses::mount_sse_sequence; use core_test_support::responses::start_mock_server; @@ -24,10 +23,6 @@ use core_test_support::test_codex::test_codex; use core_test_support::wait_for_event; use tokio::time::timeout; -fn sse_completed(id: &str) -> String { - load_sse_fixture_with_id("../fixtures/completed_template.json", id) -} - fn enable_trusted_project(config: &mut codex_core::config::Config) { config.active_project = ProjectConfig { trust_level: Some(TrustLevel::Trusted), @@ -88,7 +83,10 @@ async fn live_skills_reload_refreshes_skill_cache_after_skill_change() -> Result let server = start_mock_server().await; let responses = mount_sse_sequence( &server, - vec![sse_completed("resp-1"), sse_completed("resp-2")], + vec![ + responses::sse(vec![responses::ev_completed("resp-1")]), + responses::sse(vec![responses::ev_completed("resp-2")]), + ], ) .await; @@ -116,41 +114,23 @@ async fn live_skills_reload_refreshes_skill_cache_after_skill_change() -> Result "expected initial skill body in request" ); - let mut rx = test.thread_manager.subscribe_file_watcher(); write_skill(test.codex_home_path(), "demo", "demo skill", skill_v2); - let changed_paths = timeout(Duration::from_secs(5), async move { + let saw_skills_update = timeout(Duration::from_secs(5), async { loop { - match rx.recv().await { - Ok(FileWatcherEvent::SkillsChanged { paths }) => break paths, - Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue, - Err(tokio::sync::broadcast::error::RecvError::Closed) => { - panic!("file watcher channel closed unexpectedly") + match test.codex.next_event().await { + Ok(event) => { + if matches!(event.msg, EventMsg::SkillsUpdateAvailable) { + break; + } } + Err(err) => panic!("event stream ended unexpectedly: {err}"), } } }) .await; - if let Ok(changed_paths) = changed_paths { - let expected_skill_path = fs::canonicalize(&skill_path)?; - let expected_skill_dir = expected_skill_path - .parent() - .expect("skill path should have a parent directory"); - let saw_expected_path = changed_paths - .iter() - .filter_map(|path| fs::canonicalize(path).ok()) - .any(|path| { - path == expected_skill_path - || path == expected_skill_dir - || path.starts_with(expected_skill_dir) - || expected_skill_path.starts_with(&path) - }); - assert!( - saw_expected_path, - "expected changed watcher path to include {expected_skill_path:?} or {expected_skill_dir:?}, got {changed_paths:?}" - ); - } else { + if saw_skills_update.is_err() { // Some environments do not reliably surface file watcher events for // skill changes. Clear the cache explicitly so we can still validate // that the updated skill body is injected on the next turn. From 66abc4fe8fb6e2e9ce809e6d990d456ed261f962 Mon Sep 17 00:00:00 2001 From: Xin Lin Date: Wed, 4 Feb 2026 14:52:56 -0800 Subject: [PATCH 9/9] Skip nearest_existing_ancestor --- codex-rs/core/src/file_watcher.rs | 27 +++------------------------ 1 file changed, 3 insertions(+), 24 deletions(-) diff --git a/codex-rs/core/src/file_watcher.rs b/codex-rs/core/src/file_watcher.rs index 22449048d1d..afda5b6086d 100644 --- a/codex-rs/core/src/file_watcher.rs +++ b/codex-rs/core/src/file_watcher.rs @@ -213,9 +213,10 @@ impl FileWatcher { let Some(inner) = &self.inner else { return; }; - let Some(watch_path) = nearest_existing_ancestor(&path) else { + if !path.exists() { return; - }; + } + let watch_path = path; let mut guard = match inner.lock() { Ok(guard) => guard, Err(err) => err.into_inner(), @@ -259,22 +260,11 @@ fn is_skills_path(path: &Path, roots: &HashSet) -> bool { roots.iter().any(|root| path.starts_with(root)) } -fn nearest_existing_ancestor(path: &Path) -> Option { - let mut cursor = path; - loop { - if cursor.exists() { - return Some(cursor.to_path_buf()); - } - cursor = cursor.parent()?; - } -} - #[cfg(test)] mod tests { use super::*; use notify::EventKind; use pretty_assertions::assert_eq; - use tempfile::tempdir; use tokio::time::timeout; fn path(name: &str) -> PathBuf { @@ -359,17 +349,6 @@ mod tests { ); } - #[test] - fn nearest_existing_ancestor_returns_closest_existing_path() { - let dir = tempdir().expect("tempdir"); - let existing = dir.path().join("existing"); - std::fs::create_dir_all(&existing).expect("create existing dir"); - - let nested_missing = existing.join("deep/path/SKILL.md"); - let ancestor = nearest_existing_ancestor(&nested_missing); - assert_eq!(ancestor, Some(existing)); - } - #[test] fn register_skills_root_dedupes_state_entries() { let watcher = FileWatcher::noop();