diff --git a/codex-rs/app-server-test-client/src/main.rs b/codex-rs/app-server-test-client/src/main.rs index d949c3a7346..6d3fc06d81c 100644 --- a/codex-rs/app-server-test-client/src/main.rs +++ b/codex-rs/app-server-test-client/src/main.rs @@ -1,7 +1,9 @@ use std::collections::VecDeque; +use std::fs; use std::io::BufRead; use std::io::BufReader; use std::io::Write; +use std::path::Path; use std::process::Child; use std::process::ChildStdin; use std::process::ChildStdout; @@ -24,6 +26,7 @@ use codex_app_server_protocol::ClientRequest; use codex_app_server_protocol::CommandExecutionApprovalDecision; use codex_app_server_protocol::CommandExecutionRequestApprovalParams; use codex_app_server_protocol::CommandExecutionRequestApprovalResponse; +use codex_app_server_protocol::DynamicToolSpec; use codex_app_server_protocol::FileChangeApprovalDecision; use codex_app_server_protocol::FileChangeRequestApprovalParams; use codex_app_server_protocol::FileChangeRequestApprovalResponse; @@ -83,6 +86,15 @@ struct Cli { )] config_overrides: Vec, + /// JSON array of dynamic tool specs or a single tool object. + /// Prefix a filename with '@' to read from a file. + /// + /// Example: + /// --dynamic-tools '[{"name":"demo","description":"Demo","inputSchema":{"type":"object"}}]' + /// --dynamic-tools @/path/to/tools.json + #[arg(long, value_name = "json-or-@file", global = true)] + dynamic_tools: Option, + #[command(subcommand)] command: CliCommand, } @@ -140,23 +152,29 @@ fn main() -> Result<()> { let Cli { codex_bin, config_overrides, + dynamic_tools, command, } = Cli::parse(); + let dynamic_tools = parse_dynamic_tools_arg(&dynamic_tools)?; + match command { CliCommand::SendMessage { user_message } => { + ensure_dynamic_tools_unused(&dynamic_tools, "send-message")?; send_message(&codex_bin, &config_overrides, user_message) } CliCommand::SendMessageV2 { user_message } => { - send_message_v2(&codex_bin, &config_overrides, user_message) + send_message_v2(&codex_bin, &config_overrides, user_message, &dynamic_tools) } CliCommand::TriggerCmdApproval { user_message } => { - trigger_cmd_approval(&codex_bin, &config_overrides, user_message) + trigger_cmd_approval(&codex_bin, &config_overrides, user_message, &dynamic_tools) } CliCommand::TriggerPatchApproval { user_message } => { - trigger_patch_approval(&codex_bin, &config_overrides, user_message) + trigger_patch_approval(&codex_bin, &config_overrides, user_message, &dynamic_tools) + } + CliCommand::NoTriggerCmdApproval => { + no_trigger_cmd_approval(&codex_bin, &config_overrides, &dynamic_tools) } - CliCommand::NoTriggerCmdApproval => no_trigger_cmd_approval(&codex_bin, &config_overrides), CliCommand::SendFollowUpV2 { first_message, follow_up_message, @@ -165,10 +183,20 @@ fn main() -> Result<()> { &config_overrides, first_message, follow_up_message, + &dynamic_tools, ), - CliCommand::TestLogin => test_login(&codex_bin, &config_overrides), - CliCommand::GetAccountRateLimits => get_account_rate_limits(&codex_bin, &config_overrides), - CliCommand::ModelList => model_list(&codex_bin, &config_overrides), + CliCommand::TestLogin => { + ensure_dynamic_tools_unused(&dynamic_tools, "test-login")?; + test_login(&codex_bin, &config_overrides) + } + CliCommand::GetAccountRateLimits => { + ensure_dynamic_tools_unused(&dynamic_tools, "get-account-rate-limits")?; + get_account_rate_limits(&codex_bin, &config_overrides) + } + CliCommand::ModelList => { + ensure_dynamic_tools_unused(&dynamic_tools, "model-list")?; + model_list(&codex_bin, &config_overrides) + } } } @@ -198,14 +226,23 @@ fn send_message_v2( codex_bin: &str, config_overrides: &[String], user_message: String, + dynamic_tools: &Option>, ) -> Result<()> { - send_message_v2_with_policies(codex_bin, config_overrides, user_message, None, None) + send_message_v2_with_policies( + codex_bin, + config_overrides, + user_message, + None, + None, + dynamic_tools, + ) } fn trigger_cmd_approval( codex_bin: &str, config_overrides: &[String], user_message: Option, + dynamic_tools: &Option>, ) -> Result<()> { let default_prompt = "Run `touch /tmp/should-trigger-approval` so I can confirm the file exists."; @@ -216,6 +253,7 @@ fn trigger_cmd_approval( message, Some(AskForApproval::OnRequest), Some(SandboxPolicy::ReadOnly), + dynamic_tools, ) } @@ -223,6 +261,7 @@ fn trigger_patch_approval( codex_bin: &str, config_overrides: &[String], user_message: Option, + dynamic_tools: &Option>, ) -> Result<()> { let default_prompt = "Create a file named APPROVAL_DEMO.txt containing a short hello message using apply_patch."; @@ -233,12 +272,24 @@ fn trigger_patch_approval( message, Some(AskForApproval::OnRequest), Some(SandboxPolicy::ReadOnly), + dynamic_tools, ) } -fn no_trigger_cmd_approval(codex_bin: &str, config_overrides: &[String]) -> Result<()> { +fn no_trigger_cmd_approval( + codex_bin: &str, + config_overrides: &[String], + dynamic_tools: &Option>, +) -> Result<()> { let prompt = "Run `touch should_not_trigger_approval.txt`"; - send_message_v2_with_policies(codex_bin, config_overrides, prompt.to_string(), None, None) + send_message_v2_with_policies( + codex_bin, + config_overrides, + prompt.to_string(), + None, + None, + dynamic_tools, + ) } fn send_message_v2_with_policies( @@ -247,13 +298,17 @@ fn send_message_v2_with_policies( user_message: String, approval_policy: Option, sandbox_policy: Option, + dynamic_tools: &Option>, ) -> Result<()> { let mut client = CodexClient::spawn(codex_bin, config_overrides)?; let initialize = client.initialize()?; println!("< initialize response: {initialize:?}"); - let thread_response = client.thread_start(ThreadStartParams::default())?; + let thread_response = client.thread_start(ThreadStartParams { + dynamic_tools: dynamic_tools.clone(), + ..Default::default() + })?; println!("< thread/start response: {thread_response:?}"); let mut turn_params = TurnStartParams { thread_id: thread_response.thread.id.clone(), @@ -280,13 +335,17 @@ fn send_follow_up_v2( config_overrides: &[String], first_message: String, follow_up_message: String, + dynamic_tools: &Option>, ) -> Result<()> { let mut client = CodexClient::spawn(codex_bin, config_overrides)?; let initialize = client.initialize()?; println!("< initialize response: {initialize:?}"); - let thread_response = client.thread_start(ThreadStartParams::default())?; + let thread_response = client.thread_start(ThreadStartParams { + dynamic_tools: dynamic_tools.clone(), + ..Default::default() + })?; println!("< thread/start response: {thread_response:?}"); let first_turn_params = TurnStartParams { @@ -372,6 +431,40 @@ fn model_list(codex_bin: &str, config_overrides: &[String]) -> Result<()> { Ok(()) } +fn ensure_dynamic_tools_unused( + dynamic_tools: &Option>, + command: &str, +) -> Result<()> { + if dynamic_tools.is_some() { + bail!( + "dynamic tools are only supported for v2 thread/start; remove --dynamic-tools for {command} or use send-message-v2" + ); + } + Ok(()) +} + +fn parse_dynamic_tools_arg(dynamic_tools: &Option) -> Result>> { + let Some(raw_arg) = dynamic_tools.as_deref() else { + return Ok(None); + }; + + let raw_json = if let Some(path) = raw_arg.strip_prefix('@') { + fs::read_to_string(Path::new(path)) + .with_context(|| format!("read dynamic tools file {path}"))? + } else { + raw_arg.to_string() + }; + + let value: Value = serde_json::from_str(&raw_json).context("parse dynamic tools JSON")?; + let tools = match value { + Value::Array(_) => serde_json::from_value(value).context("decode dynamic tools array")?, + Value::Object(_) => vec![serde_json::from_value(value).context("decode dynamic tool")?], + _ => bail!("dynamic tools JSON must be an object or array"), + }; + + Ok(Some(tools)) +} + struct CodexClient { child: Child, stdin: Option, diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index ba5ee02d692..b4e3859500e 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -333,9 +333,36 @@ impl Codex { .clone() .or_else(|| conversation_history.get_base_instructions().map(|s| s.text)) .unwrap_or_else(|| model_info.get_model_instructions(config.personality)); - // Respect explicit thread-start tools; fall back to persisted tools when resuming a thread. + + // Respect thread-start tools. When missing (resumed/forked threads), read from the db + // first, then fall back to rollout-file tools. + let persisted_tools = if dynamic_tools.is_empty() + && config.features.enabled(Feature::Sqlite) + { + let thread_id = match &conversation_history { + InitialHistory::Resumed(resumed) => Some(resumed.conversation_id), + InitialHistory::Forked(_) => conversation_history.forked_from_id(), + InitialHistory::New => None, + }; + match thread_id { + Some(thread_id) => { + let state_db_ctx = state_db::open_if_present( + config.codex_home.as_path(), + config.model_provider_id.as_str(), + ) + .await; + state_db::get_dynamic_tools(state_db_ctx.as_deref(), thread_id, "codex_spawn") + .await + } + None => None, + } + } else { + None + }; let dynamic_tools = if dynamic_tools.is_empty() { - conversation_history.get_dynamic_tools().unwrap_or_default() + persisted_tools + .or_else(|| conversation_history.get_dynamic_tools()) + .unwrap_or_default() } else { dynamic_tools }; diff --git a/codex-rs/core/src/rollout/metadata.rs b/codex-rs/core/src/rollout/metadata.rs index 2d59d71af37..42e52f78d63 100644 --- a/codex-rs/core/src/rollout/metadata.rs +++ b/codex-rs/core/src/rollout/metadata.rs @@ -187,6 +187,29 @@ pub(crate) async fn backfill_sessions( warn!("failed to upsert rollout {}: {err}", path.display()); } else { stats.upserted = stats.upserted.saturating_add(1); + if let Ok(meta_line) = rollout::list::read_session_meta_line(&path).await { + if let Err(err) = runtime + .persist_dynamic_tools( + meta_line.meta.id, + meta_line.meta.dynamic_tools.as_deref(), + ) + .await + { + if let Some(otel) = otel { + otel.counter( + DB_ERROR_METRIC, + 1, + &[("stage", "backfill_dynamic_tools")], + ); + } + warn!("failed to backfill dynamic tools {}: {err}", path.display()); + } + } else { + warn!( + "failed to read session meta for dynamic tools {}", + path.display() + ); + } } } Err(err) => { diff --git a/codex-rs/core/src/state_db.rs b/codex-rs/core/src/state_db.rs index a52f46593a4..ff95ed946f9 100644 --- a/codex-rs/core/src/state_db.rs +++ b/codex-rs/core/src/state_db.rs @@ -9,6 +9,7 @@ use chrono::Timelike; use chrono::Utc; use codex_otel::OtelManager; use codex_protocol::ThreadId; +use codex_protocol::dynamic_tools::DynamicToolSpec; use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::SessionSource; use codex_state::DB_METRIC_COMPARE_ERROR; @@ -196,6 +197,37 @@ pub async fn find_rollout_path_by_id( }) } +/// Get dynamic tools for a thread id using SQLite. +pub async fn get_dynamic_tools( + context: Option<&codex_state::StateRuntime>, + thread_id: ThreadId, + stage: &str, +) -> Option> { + let ctx = context?; + match ctx.get_dynamic_tools(thread_id).await { + Ok(tools) => tools, + Err(err) => { + warn!("state db get_dynamic_tools failed during {stage}: {err}"); + None + } + } +} + +/// Persist dynamic tools for a thread id using SQLite, if none exist yet. +pub async fn persist_dynamic_tools( + context: Option<&codex_state::StateRuntime>, + thread_id: ThreadId, + tools: Option<&[DynamicToolSpec]>, + stage: &str, +) { + let Some(ctx) = context else { + return; + }; + if let Err(err) = ctx.persist_dynamic_tools(thread_id, tools).await { + warn!("state db persist_dynamic_tools failed during {stage}: {err}"); + } +} + /// Reconcile rollout items into SQLite, falling back to scanning the rollout file. pub async fn reconcile_rollout( context: Option<&codex_state::StateRuntime>, @@ -235,6 +267,21 @@ pub async fn reconcile_rollout( "state db reconcile_rollout upsert failed {}: {err}", rollout_path.display() ); + return; + } + if let Ok(meta_line) = crate::rollout::list::read_session_meta_line(rollout_path).await { + persist_dynamic_tools( + Some(ctx), + meta_line.meta.id, + meta_line.meta.dynamic_tools.as_deref(), + "reconcile_rollout", + ) + .await; + } else { + warn!( + "state db reconcile_rollout missing session meta {}", + rollout_path.display() + ); } } diff --git a/codex-rs/core/tests/suite/sqlite_state.rs b/codex-rs/core/tests/suite/sqlite_state.rs index 2582f90cbec..218da34825c 100644 --- a/codex-rs/core/tests/suite/sqlite_state.rs +++ b/codex-rs/core/tests/suite/sqlite_state.rs @@ -1,6 +1,7 @@ use anyhow::Result; use codex_core::features::Feature; use codex_protocol::ThreadId; +use codex_protocol::dynamic_tools::DynamicToolSpec; use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::RolloutLine; @@ -74,6 +75,28 @@ async fn backfill_scans_existing_rollouts() -> Result<()> { let rollout_rel_path = format!("sessions/2026/01/27/rollout-2026-01-27T12-00-00-{uuid}.jsonl"); let rollout_rel_path_for_hook = rollout_rel_path.clone(); + let dynamic_tools = vec![ + DynamicToolSpec { + name: "geo_lookup".to_string(), + description: "lookup a city".to_string(), + input_schema: json!({ + "type": "object", + "required": ["city"], + "properties": { "city": { "type": "string" } } + }), + }, + DynamicToolSpec { + name: "weather_lookup".to_string(), + description: "lookup weather".to_string(), + input_schema: json!({ + "type": "object", + "required": ["zip"], + "properties": { "zip": { "type": "string" } } + }), + }, + ]; + let dynamic_tools_for_hook = dynamic_tools.clone(); + let mut builder = test_codex() .with_pre_build_hook(move |codex_home| { let rollout_path = codex_home.join(&rollout_rel_path_for_hook); @@ -81,7 +104,6 @@ async fn backfill_scans_existing_rollouts() -> Result<()> { .parent() .expect("rollout path should have parent"); fs::create_dir_all(parent).expect("should create rollout directory"); - let session_meta_line = SessionMetaLine { meta: SessionMeta { id: thread_id, @@ -93,7 +115,7 @@ async fn backfill_scans_existing_rollouts() -> Result<()> { source: SessionSource::default(), model_provider: None, base_instructions: None, - dynamic_tools: None, + dynamic_tools: Some(dynamic_tools_for_hook), }, git: None, }; @@ -155,6 +177,17 @@ async fn backfill_scans_existing_rollouts() -> Result<()> { assert_eq!(metadata.model_provider, default_provider); assert!(metadata.has_user_event); + let mut stored_tools = None; + for _ in 0..40 { + stored_tools = db.get_dynamic_tools(thread_id).await?; + if stored_tools.is_some() { + break; + } + tokio::time::sleep(Duration::from_millis(25)).await; + } + let stored_tools = stored_tools.expect("dynamic tools should be stored"); + assert_eq!(stored_tools, dynamic_tools); + Ok(()) } diff --git a/codex-rs/state/migrations/0004_thread_dynamic_tools.sql b/codex-rs/state/migrations/0004_thread_dynamic_tools.sql new file mode 100644 index 00000000000..0f40b5f8005 --- /dev/null +++ b/codex-rs/state/migrations/0004_thread_dynamic_tools.sql @@ -0,0 +1,11 @@ +CREATE TABLE thread_dynamic_tools ( + thread_id TEXT NOT NULL, + position INTEGER NOT NULL, + name TEXT NOT NULL, + description TEXT NOT NULL, + input_schema TEXT NOT NULL, + PRIMARY KEY(thread_id, position), + FOREIGN KEY(thread_id) REFERENCES threads(id) ON DELETE CASCADE +); + +CREATE INDEX idx_thread_dynamic_tools_thread ON thread_dynamic_tools(thread_id); diff --git a/codex-rs/state/src/runtime.rs b/codex-rs/state/src/runtime.rs index 6f2e1289677..3b37b6d4246 100644 --- a/codex-rs/state/src/runtime.rs +++ b/codex-rs/state/src/runtime.rs @@ -16,8 +16,10 @@ use chrono::DateTime; use chrono::Utc; use codex_otel::OtelManager; use codex_protocol::ThreadId; +use codex_protocol::dynamic_tools::DynamicToolSpec; use codex_protocol::protocol::RolloutItem; use log::LevelFilter; +use serde_json::Value; use sqlx::ConnectOptions; use sqlx::QueryBuilder; use sqlx::Row; @@ -117,6 +119,38 @@ WHERE id = ? .transpose() } + /// Get dynamic tools for a thread, if present. + pub async fn get_dynamic_tools( + &self, + thread_id: ThreadId, + ) -> anyhow::Result>> { + let rows = sqlx::query( + r#" +SELECT name, description, input_schema +FROM thread_dynamic_tools +WHERE thread_id = ? +ORDER BY position ASC + "#, + ) + .bind(thread_id.to_string()) + .fetch_all(self.pool.as_ref()) + .await?; + if rows.is_empty() { + return Ok(None); + } + let mut tools = Vec::with_capacity(rows.len()); + for row in rows { + let input_schema: String = row.try_get("input_schema")?; + let input_schema = serde_json::from_str::(input_schema.as_str())?; + tools.push(DynamicToolSpec { + name: row.try_get("name")?, + description: row.try_get("description")?, + input_schema, + }); + } + Ok(Some(tools)) + } + /// Find a rollout path by thread id using the underlying database. pub async fn find_rollout_path_by_id( &self, @@ -369,6 +403,58 @@ ON CONFLICT(id) DO UPDATE SET Ok(()) } + /// Persist dynamic tools for a thread if none have been stored yet. + /// + /// Dynamic tools are defined at thread start and should not change afterward. + /// This only writes the first time we see tools for a given thread. + pub async fn persist_dynamic_tools( + &self, + thread_id: ThreadId, + tools: Option<&[DynamicToolSpec]>, + ) -> anyhow::Result<()> { + let Some(tools) = tools else { + return Ok(()); + }; + if tools.is_empty() { + return Ok(()); + } + let mut tx = self.pool.begin().await?; + let thread_id = thread_id.to_string(); + let existing: Option = + sqlx::query_scalar("SELECT 1 FROM thread_dynamic_tools WHERE thread_id = ? LIMIT 1") + .bind(thread_id.as_str()) + .fetch_optional(&mut *tx) + .await?; + if existing.is_some() { + tx.commit().await?; + return Ok(()); + } + for (idx, tool) in tools.iter().enumerate() { + let position = i64::try_from(idx).unwrap_or(i64::MAX); + let input_schema = serde_json::to_string(&tool.input_schema)?; + sqlx::query( + r#" +INSERT INTO thread_dynamic_tools ( + thread_id, + position, + name, + description, + input_schema +) VALUES (?, ?, ?, ?, ?) + "#, + ) + .bind(thread_id.as_str()) + .bind(position) + .bind(tool.name.as_str()) + .bind(tool.description.as_str()) + .bind(input_schema) + .execute(&mut *tx) + .await?; + } + tx.commit().await?; + Ok(()) + } + /// Apply rollout items incrementally using the underlying database. pub async fn apply_rollout_items( &self, @@ -390,12 +476,25 @@ ON CONFLICT(id) DO UPDATE SET if let Some(updated_at) = file_modified_time_utc(builder.rollout_path.as_path()).await { metadata.updated_at = updated_at; } + // Keep the thread upsert before dynamic tools to satisfy the foreign key constraint: + // thread_dynamic_tools.thread_id -> threads.id. if let Err(err) = self.upsert_thread(&metadata).await { if let Some(otel) = otel { otel.counter(DB_ERROR_METRIC, 1, &[("stage", "apply_rollout_items")]); } return Err(err); } + let dynamic_tools = extract_dynamic_tools(items); + if let Some(dynamic_tools) = dynamic_tools + && let Err(err) = self + .persist_dynamic_tools(builder.id, dynamic_tools.as_deref()) + .await + { + if let Some(otel) = otel { + otel.counter(DB_ERROR_METRIC, 1, &[("stage", "persist_dynamic_tools")]); + } + return Err(err); + } Ok(()) } @@ -507,6 +606,16 @@ fn push_like_filters<'a>( builder.push(")"); } +fn extract_dynamic_tools(items: &[RolloutItem]) -> Option>> { + items.iter().find_map(|item| match item { + RolloutItem::SessionMeta(meta_line) => Some(meta_line.meta.dynamic_tools.clone()), + RolloutItem::ResponseItem(_) + | RolloutItem::Compacted(_) + | RolloutItem::TurnContext(_) + | RolloutItem::EventMsg(_) => None, + }) +} + async fn open_sqlite(path: &Path) -> anyhow::Result { let options = SqliteConnectOptions::new() .filename(path)