Skip to content
66 changes: 64 additions & 2 deletions codex-rs/core/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::OnceLock;
use std::sync::RwLock;

use crate::api_bridge::CoreAuthProvider;
use crate::api_bridge::auth_provider_from_auth;
use crate::api_bridge::map_api_error;
use crate::auth::UnauthorizedRecovery;
use crate::turn_metadata::build_turn_metadata_header;
use codex_api::AggregateStreamExt;
use codex_api::ChatClient as ApiChatClient;
use codex_api::CompactClient as ApiCompactClient;
Expand Down Expand Up @@ -72,6 +75,13 @@ use crate::transport_manager::TransportManager;

pub const WEB_SEARCH_ELIGIBLE_HEADER: &str = "x-oai-web-search-eligible";
pub const X_CODEX_TURN_STATE_HEADER: &str = "x-codex-turn-state";
pub const X_CODEX_TURN_METADATA_HEADER: &str = "x-codex-turn-metadata";

#[derive(Debug, Default)]
struct TurnMetadataCache {
cwd: Option<PathBuf>,
header: Option<HeaderValue>,
}

#[derive(Debug)]
struct ModelClientState {
Expand All @@ -85,6 +95,7 @@ struct ModelClientState {
summary: ReasoningSummaryConfig,
session_source: SessionSource,
transport_manager: TransportManager,
turn_metadata_cache: Arc<RwLock<TurnMetadataCache>>,
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -136,11 +147,13 @@ impl ModelClient {
summary,
session_source,
transport_manager,
turn_metadata_cache: Arc::new(RwLock::new(TurnMetadataCache::default())),
}),
}
}

pub fn new_session(&self) -> ModelClientSession {
pub fn new_session(&self, turn_metadata_cwd: Option<PathBuf>) -> ModelClientSession {
self.prewarm_turn_metadata_header(turn_metadata_cwd);
ModelClientSession {
state: Arc::clone(&self.state),
connection: None,
Expand All @@ -149,6 +162,38 @@ impl ModelClient {
turn_state: Arc::new(OnceLock::new()),
}
}

/// Refresh turn metadata in the background and update a cached header that request
/// builders can read without blocking.
fn prewarm_turn_metadata_header(&self, turn_metadata_cwd: Option<PathBuf>) {
let turn_metadata_cwd =
turn_metadata_cwd.map(|cwd| std::fs::canonicalize(&cwd).unwrap_or(cwd));

if let Ok(mut cache) = self.state.turn_metadata_cache.write()
&& cache.cwd != turn_metadata_cwd
{
cache.cwd = turn_metadata_cwd.clone();
cache.header = None;
}

let Some(cwd) = turn_metadata_cwd else {
return;
};
let turn_metadata_cache = Arc::clone(&self.state.turn_metadata_cache);
if let Ok(handle) = tokio::runtime::Handle::try_current() {
let _task = handle.spawn(async move {
let header = build_turn_metadata_header(cwd.as_path())
.await
.and_then(|value| HeaderValue::from_str(value.as_str()).ok());

if let Ok(mut cache) = turn_metadata_cache.write()
&& cache.cwd.as_ref() == Some(&cwd)
{
cache.header = header;
}
});
}
}
}

impl ModelClient {
Expand Down Expand Up @@ -257,6 +302,14 @@ impl ModelClient {
}

impl ModelClientSession {
fn turn_metadata_header(&self) -> Option<HeaderValue> {
self.state
.turn_metadata_cache
.try_read()
.ok()
.and_then(|cache| cache.header.clone())
}

/// Streams a single model turn using either the Responses or Chat
/// Completions wire API, depending on the configured provider.
///
Expand Down Expand Up @@ -332,6 +385,7 @@ impl ModelClientSession {
prompt: &Prompt,
compression: Compression,
) -> ApiResponsesOptions {
let turn_metadata_header = self.turn_metadata_header();
let model_info = &self.state.model_info;

let default_reasoning_effort = model_info.default_reasoning_level;
Expand Down Expand Up @@ -380,7 +434,11 @@ impl ModelClientSession {
store_override: None,
conversation_id: Some(conversation_id),
session_source: Some(self.state.session_source.clone()),
extra_headers: build_responses_headers(&self.state.config, Some(&self.turn_state)),
extra_headers: build_responses_headers(
&self.state.config,
Some(&self.turn_state),
turn_metadata_header.as_ref(),
),
compression,
turn_state: Some(Arc::clone(&self.turn_state)),
}
Expand Down Expand Up @@ -713,6 +771,7 @@ fn experimental_feature_headers(config: &Config) -> ApiHeaderMap {
fn build_responses_headers(
config: &Config,
turn_state: Option<&Arc<OnceLock<String>>>,
turn_metadata_header: Option<&HeaderValue>,
) -> ApiHeaderMap {
let mut headers = experimental_feature_headers(config);
headers.insert(
Expand All @@ -731,6 +790,9 @@ fn build_responses_headers(
{
headers.insert(X_CODEX_TURN_STATE_HEADER, header_value);
}
if let Some(header_value) = turn_metadata_header {
headers.insert(X_CODEX_TURN_METADATA_HEADER, header_value.clone());
}
headers
}

Expand Down
8 changes: 4 additions & 4 deletions codex-rs/core/src/codex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ use crate::error::Result as CodexResult;
use crate::exec::StreamOutput;
use crate::exec_policy::ExecPolicyUpdateError;
use crate::feedback_tags;
use crate::git_info::get_git_repo_root;
use crate::instructions::UserInstructions;
use crate::mcp::CODEX_APPS_MCP_SERVER_NAME;
use crate::mcp::auth::compute_auth_statuses;
Expand Down Expand Up @@ -531,7 +532,6 @@ pub(crate) struct TurnContext {
pub(crate) truncation_policy: TruncationPolicy,
pub(crate) dynamic_tools: Vec<DynamicToolSpec>,
}

impl TurnContext {
pub(crate) fn resolve_path(&self, path: Option<String>) -> PathBuf {
path.as_ref()
Expand Down Expand Up @@ -3406,7 +3406,9 @@ pub(crate) async fn run_turn(
// many turns, from the perspective of the user, it is a single turn.
let turn_diff_tracker = Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::new()));

let mut client_session = turn_context.client.new_session();
let mut client_session = turn_context
.client
.new_session(Some(turn_context.cwd.clone()));

loop {
// Note that pending_input would be something like a message the user
Expand Down Expand Up @@ -4469,8 +4471,6 @@ pub(super) fn get_last_assistant_message_from_turn(responses: &[ResponseItem]) -

#[cfg(test)]
pub(crate) use tests::make_session_and_context;

use crate::git_info::get_git_repo_root;
#[cfg(test)]
pub(crate) use tests::make_session_and_context_with_rx;

Expand Down
4 changes: 3 additions & 1 deletion codex-rs/core/src/compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,9 @@ async fn drain_to_completed(
turn_context: &TurnContext,
prompt: &Prompt,
) -> CodexResult<()> {
let mut client_session = turn_context.client.new_session();
let mut client_session = turn_context
.client
.new_session(Some(turn_context.cwd.clone()));
let mut stream = client_session.stream(prompt).await?;
loop {
let maybe_event = stream.next().await;
Expand Down
1 change: 1 addition & 0 deletions codex-rs/core/src/environment_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ impl EnvironmentContext {
cwd,
// should compare all fields except shell
shell: _,
..
} = other;

self.cwd == *cwd
Expand Down
76 changes: 71 additions & 5 deletions codex-rs/core/src/git_info.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::collections::BTreeMap;
use std::collections::HashSet;
use std::path::Path;
use std::path::PathBuf;
Expand Down Expand Up @@ -109,6 +110,73 @@ pub async fn collect_git_info(cwd: &Path) -> Option<GitInfo> {
Some(git_info)
}

/// Collect fetch remotes in a multi-root-friendly format: {"origin": "https://..."}.
pub async fn get_git_remote_urls(cwd: &Path) -> Option<BTreeMap<String, String>> {
let is_git_repo = run_git_command_with_timeout(&["rev-parse", "--git-dir"], cwd)
.await?
.status
.success();
if !is_git_repo {
return None;
}

get_git_remote_urls_assume_git_repo(cwd).await
}

/// Collect fetch remotes without checking whether `cwd` is in a git repo.
pub async fn get_git_remote_urls_assume_git_repo(cwd: &Path) -> Option<BTreeMap<String, String>> {
let output = run_git_command_with_timeout(&["remote", "-v"], cwd).await?;
if !output.status.success() {
return None;
}

let stdout = String::from_utf8(output.stdout).ok()?;
parse_git_remote_urls(stdout.as_str())
}

/// Return the current HEAD commit hash without checking whether `cwd` is in a git repo.
pub async fn get_head_commit_hash(cwd: &Path) -> Option<String> {
let output = run_git_command_with_timeout(&["rev-parse", "HEAD"], cwd).await?;
if !output.status.success() {
return None;
}

let stdout = String::from_utf8(output.stdout).ok()?;
let hash = stdout.trim();
if hash.is_empty() {
None
} else {
Some(hash.to_string())
}
}

fn parse_git_remote_urls(stdout: &str) -> Option<BTreeMap<String, String>> {
let mut remotes = BTreeMap::new();
for line in stdout.lines() {
let Some(fetch_line) = line.strip_suffix(" (fetch)") else {
continue;
};

let Some((name, url_part)) = fetch_line
.split_once('\t')
.or_else(|| fetch_line.split_once(' '))
else {
continue;
};

let url = url_part.trim_start();
if !url.is_empty() {
remotes.insert(name.to_string(), url.to_string());
}
}

if remotes.is_empty() {
None
} else {
Some(remotes)
}
}

/// A minimal commit summary entry used for pickers (subject + timestamp + sha).
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CommitLogEntry {
Expand Down Expand Up @@ -185,11 +253,9 @@ pub async fn git_diff_to_remote(cwd: &Path) -> Option<GitDiffToRemote> {

/// Run a git command with a timeout to prevent blocking on large repositories
async fn run_git_command_with_timeout(args: &[&str], cwd: &Path) -> Option<std::process::Output> {
let result = timeout(
GIT_COMMAND_TIMEOUT,
Command::new("git").args(args).current_dir(cwd).output(),
)
.await;
let mut command = Command::new("git");
command.args(args).current_dir(cwd).kill_on_drop(true);
let result = timeout(GIT_COMMAND_TIMEOUT, command.output()).await;

match result {
Ok(Ok(output)) => Some(output),
Expand Down
2 changes: 2 additions & 0 deletions codex-rs/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ pub mod state_db;
pub mod terminal;
mod tools;
pub mod turn_diff_tracker;
mod turn_metadata;
pub use rollout::ARCHIVED_SESSIONS_SUBDIR;
pub use rollout::INTERACTIVE_SESSION_SOURCES;
pub use rollout::RolloutRecorder;
Expand Down Expand Up @@ -131,6 +132,7 @@ pub mod util;

pub use apply_patch::CODEX_APPLY_PATCH_ARG1;
pub use client::WEB_SEARCH_ELIGIBLE_HEADER;
pub use client::X_CODEX_TURN_METADATA_HEADER;
pub use command_safety::is_dangerous_command;
pub use command_safety::is_safe_command;
pub use exec_policy::ExecPolicyError;
Expand Down
43 changes: 43 additions & 0 deletions codex-rs/core/src/turn_metadata.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
use std::collections::BTreeMap;
use std::path::Path;

use serde::Serialize;

use crate::git_info::get_git_remote_urls_assume_git_repo;
use crate::git_info::get_git_repo_root;
use crate::git_info::get_head_commit_hash;

#[derive(Serialize)]
struct TurnMetadataWorkspace {
#[serde(skip_serializing_if = "Option::is_none")]
associated_remote_urls: Option<BTreeMap<String, String>>,
#[serde(skip_serializing_if = "Option::is_none")]
latest_git_commit_hash: Option<String>,
}

#[derive(Serialize)]
struct TurnMetadata {
workspaces: BTreeMap<String, TurnMetadataWorkspace>,
}

pub(crate) async fn build_turn_metadata_header(cwd: &Path) -> Option<String> {
let repo_root = get_git_repo_root(cwd)?;

let (latest_git_commit_hash, associated_remote_urls) = tokio::join!(
get_head_commit_hash(cwd),
get_git_remote_urls_assume_git_repo(cwd)
);
if latest_git_commit_hash.is_none() && associated_remote_urls.is_none() {
return None;
}

let mut workspaces = BTreeMap::new();
workspaces.insert(
repo_root.to_string_lossy().into_owned(),
TurnMetadataWorkspace {
associated_remote_urls,
latest_git_commit_hash,
},
);
serde_json::to_string(&TurnMetadata { workspaces }).ok()
}
2 changes: 1 addition & 1 deletion codex-rs/core/tests/chat_completions_payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ async fn run_request(input: Vec<ResponseItem>) -> Value {
SessionSource::Exec,
TransportManager::new(),
)
.new_session();
.new_session(None);

let mut prompt = Prompt::default();
prompt.input = input;
Expand Down
2 changes: 1 addition & 1 deletion codex-rs/core/tests/chat_completions_sse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ async fn run_stream_with_bytes(sse_body: &[u8]) -> Vec<ResponseEvent> {
SessionSource::Exec,
TransportManager::new(),
)
.new_session();
.new_session(None);

let mut prompt = Prompt::default();
prompt.input = vec![ResponseItem::Message {
Expand Down
Loading
Loading