37 changes: 2 additions & 35 deletions .github/workflows/shell-tool-mcp.yml
@@ -248,37 +248,8 @@ jobs:
run: |
set -euo pipefail
if command -v apt-get >/dev/null 2>&1; then
# On Ubuntu ARM containers, apt metadata for universe/multiverse is large and
# can make `apt-get update` very slow. We only need packages from `main`, so
# trim sources to `main` for faster, more reliable updates.
if [[ "${{ matrix.target }}" == "aarch64-unknown-linux-musl" ]] && [[ -f /etc/os-release ]]; then
. /etc/os-release
if [[ "${ID-}" == "ubuntu" ]]; then
if [[ -f /etc/apt/sources.list.d/ubuntu.sources ]]; then
# Ubuntu 24.04+ uses deb822 sources; trim components to main.
sed -i -E 's/^Components:.*/Components: main/' /etc/apt/sources.list.d/ubuntu.sources
else
codename="${VERSION_CODENAME:-}"
if [[ -n "$codename" ]]; then
printf '%s\n' \
"deb http://ports.ubuntu.com/ubuntu-ports ${codename} main" \
"deb http://ports.ubuntu.com/ubuntu-ports ${codename}-updates main" \
"deb http://ports.ubuntu.com/ubuntu-ports ${codename}-backports main" \
"deb http://ports.ubuntu.com/ubuntu-ports ${codename}-security main" \
>/etc/apt/sources.list
fi
fi
fi
fi

apt_opts=(
-o Acquire::Retries=3
-o Acquire::http::Timeout=30
-o Acquire::https::Timeout=30
-o Acquire::Languages=none
)
apt-get "${apt_opts[@]}" update
DEBIAN_FRONTEND=noninteractive apt-get "${apt_opts[@]}" install -y git build-essential bison autoconf gettext
apt-get update
DEBIAN_FRONTEND=noninteractive apt-get install -y git build-essential bison autoconf gettext
elif command -v dnf >/dev/null 2>&1; then
dnf install -y git gcc gcc-c++ make bison autoconf gettext
elif command -v yum >/dev/null 2>&1; then
@@ -480,10 +451,6 @@ jobs:
registry-url: https://registry.npmjs.org
scope: "@openai"

# Trusted publishing requires npm CLI version 11.5.1 or later.
- name: Update npm
run: npm install -g npm@latest

- name: Download npm tarball
uses: actions/download-artifact@v7
with:
80 changes: 80 additions & 0 deletions codex-rs/core/tests/suite/sqlite_state.rs
@@ -10,12 +10,18 @@ use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::UserMessageEvent;
use codex_state::STATE_DB_FILENAME;
use core_test_support::load_sse_fixture_with_id;
use core_test_support::responses;
use core_test_support::responses::ev_completed;
use core_test_support::responses::ev_function_call;
use core_test_support::responses::ev_response_created;
use core_test_support::responses::mount_sse_sequence;
use core_test_support::responses::start_mock_server;
use core_test_support::test_codex::test_codex;
use pretty_assertions::assert_eq;
use serde_json::json;
use std::fs;
use tokio::time::Duration;
use tracing_subscriber::prelude::*;
use uuid::Uuid;

fn sse_completed(id: &str) -> String {
@@ -197,3 +203,77 @@ async fn user_messages_persist_in_state_db() -> Result<()> {

Ok(())
}

#[tokio::test(flavor = "current_thread")]
async fn tool_call_logs_include_thread_id() -> Result<()> {
let server = start_mock_server().await;
let call_id = "call-1";
let args = json!({
"command": "echo hello",
"timeout_ms": 1_000,
"login": false,
});
let args_json = serde_json::to_string(&args)?;
mount_sse_sequence(
&server,
vec![
responses::sse(vec![
ev_response_created("resp-1"),
ev_function_call(call_id, "shell_command", &args_json),
ev_completed("resp-1"),
]),
responses::sse(vec![ev_completed("resp-2")]),
],
)
.await;

let mut builder = test_codex().with_config(|config| {
config.features.enable(Feature::Sqlite);
});
let test = builder.build(&server).await?;
let db = test.codex.state_db().expect("state db enabled");
let expected_thread_id = test.session_configured.session_id.to_string();

let subscriber = tracing_subscriber::registry().with(codex_state::log_db::start(db.clone()));
let dispatch = tracing::Dispatch::new(subscriber);
let _guard = tracing::dispatcher::set_default(&dispatch);

test.submit_turn("run a shell command").await?;
{
let span = tracing::info_span!("test_log_span", thread_id = %expected_thread_id);
let _entered = span.enter();
tracing::info!("ToolCall: shell_command {{\"command\":\"echo hello\"}}");
}

let mut found = None;
for _ in 0..80 {
let query = codex_state::LogQuery {
descending: true,
limit: Some(20),
..Default::default()
};
let rows = db.query_logs(&query).await?;
if let Some(row) = rows.into_iter().find(|row| {
row.message
.as_deref()
.is_some_and(|m| m.starts_with("ToolCall:"))
}) {
let thread_id = row.thread_id;
let message = row.message;
found = Some((thread_id, message));
break;
}
tokio::time::sleep(Duration::from_millis(25)).await;
}

let (thread_id, message) = found.expect("expected ToolCall log row");
assert_eq!(thread_id, Some(expected_thread_id));
assert!(
message
.as_deref()
.is_some_and(|text| text.starts_with("ToolCall:")),
"expected ToolCall message, got {message:?}"
);

Ok(())
}
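
> Note for readers: the test above relies on `codex_state::log_db::start` to copy the `thread_id` span field into the new `logs.thread_id` column, but that implementation is not part of this diff. Below is a minimal sketch of the general `tracing_subscriber::Layer` mechanism such a layer would typically use; `ThreadIdLayer` and `ThreadIdVisitor` are hypothetical names for illustration, not code from this PR.

```rust
use tracing::field::Field;
use tracing::field::Visit;
use tracing_subscriber::layer::Context;
use tracing_subscriber::layer::Layer;
use tracing_subscriber::registry::LookupSpan;

// Hypothetical visitor: captures only the `thread_id` field of a span.
#[derive(Default)]
struct ThreadIdVisitor(Option<String>);

impl Visit for ThreadIdVisitor {
    fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
        if field.name() == "thread_id" {
            // Fields recorded with `%` arrive here Display-formatted.
            self.0 = Some(format!("{value:?}").trim_matches('"').to_string());
        }
    }
}

// Hypothetical layer: stashes the captured value in the span's
// extensions, where an event handler writing log rows can read it back.
struct ThreadIdLayer;

impl<S> Layer<S> for ThreadIdLayer
where
    S: tracing::Subscriber + for<'a> LookupSpan<'a>,
{
    fn on_new_span(
        &self,
        attrs: &tracing::span::Attributes<'_>,
        id: &tracing::span::Id,
        ctx: Context<'_, S>,
    ) {
        let mut visitor = ThreadIdVisitor::default();
        attrs.record(&mut visitor);
        if let (Some(thread_id), Some(span)) = (visitor.0, ctx.span(id)) {
            span.extensions_mut().insert(thread_id);
        }
    }
}
```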
2 changes: 0 additions & 2 deletions codex-rs/state/migrations/0002_logs.sql
@@ -5,11 +5,9 @@ CREATE TABLE logs (
level TEXT NOT NULL,
target TEXT NOT NULL,
message TEXT,
thread_id TEXT,
module_path TEXT,
file TEXT,
line INTEGER
);

CREATE INDEX idx_logs_ts ON logs(ts DESC, ts_nanos DESC, id DESC);
CREATE INDEX idx_logs_thread_id ON logs(thread_id);
3 changes: 3 additions & 0 deletions codex-rs/state/migrations/0003_logs_thread_id.sql
@@ -0,0 +1,3 @@
ALTER TABLE logs ADD COLUMN thread_id TEXT;

CREATE INDEX idx_logs_thread_id ON logs(thread_id);
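
> With the backfilled column and index in place, log rows can be filtered per conversation thread through the query API this PR introduces. A minimal usage sketch, assuming an initialized `StateRuntime` named `runtime` and anyhow-style error handling as in `logs_client.rs`; unset fields keep their `Default` values, as in the test above.

```rust
use codex_state::LogQuery;
use codex_state::LogRow;
use codex_state::StateRuntime;

// Fetch the 50 most recent log rows recorded for a single thread.
async fn logs_for_thread(
    runtime: &StateRuntime,
    thread_id: &str,
) -> anyhow::Result<Vec<LogRow>> {
    let query = LogQuery {
        thread_id: Some(thread_id.to_string()),
        descending: true,
        limit: Some(50),
        ..Default::default()
    };
    Ok(runtime.query_logs(&query).await?)
}
```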
144 changes: 42 additions & 102 deletions codex-rs/state/src/bin/logs_client.rs
@@ -1,4 +1,3 @@
use std::path::Path;
use std::path::PathBuf;
use std::time::Duration;

@@ -7,15 +6,12 @@ use chrono::DateTime;
use chrono::SecondsFormat;
use chrono::Utc;
use clap::Parser;
use codex_state::LogQuery;
use codex_state::LogRow;
use codex_state::STATE_DB_FILENAME;
use codex_state::StateRuntime;
use dirs::home_dir;
use owo_colors::OwoColorize;
use sqlx::QueryBuilder;
use sqlx::Row;
use sqlx::Sqlite;
use sqlx::SqlitePool;
use sqlx::sqlite::SqliteConnectOptions;
use sqlx::sqlite::SqlitePoolOptions;

#[derive(Debug, Parser)]
#[command(name = "codex-state-logs")]
@@ -62,18 +58,6 @@ struct Args {
poll_ms: u64,
}

#[derive(Debug, Clone, sqlx::FromRow)]
struct LogRow {
id: i64,
ts: i64,
ts_nanos: i64,
level: String,
message: Option<String>,
thread_id: Option<String>,
file: Option<String>,
line: Option<i64>,
}

#[derive(Debug, Clone)]
struct LogFilter {
level_upper: Option<String>,
@@ -89,16 +73,20 @@ async fn main() -> anyhow::Result<()> {
let args = Args::parse();
let db_path = resolve_db_path(&args)?;
let filter = build_filter(&args)?;
let pool = open_read_only_pool(db_path.as_path()).await?;
let codex_home = db_path
.parent()
.map(ToOwned::to_owned)
.unwrap_or_else(|| PathBuf::from("."));
let runtime = StateRuntime::init(codex_home, "logs-client".to_string(), None).await?;

let mut last_id = print_backfill(&pool, &filter, args.backfill).await?;
let mut last_id = print_backfill(runtime.as_ref(), &filter, args.backfill).await?;
if last_id == 0 {
last_id = fetch_max_id(&pool, &filter).await?;
last_id = fetch_max_id(runtime.as_ref(), &filter).await?;
}

let poll_interval = Duration::from_millis(args.poll_ms);
loop {
let rows = fetch_new_rows(&pool, &filter, last_id).await?;
let rows = fetch_new_rows(runtime.as_ref(), &filter, last_id).await?;
for row in rows {
last_id = last_id.max(row.id);
println!("{}", format_row(&row));
@@ -159,31 +147,16 @@ fn parse_timestamp(value: &str) -> anyhow::Result<i64> {
Ok(dt.timestamp())
}

async fn open_read_only_pool(path: &Path) -> anyhow::Result<SqlitePool> {
let options = SqliteConnectOptions::new()
.filename(path)
.create_if_missing(false)
.read_only(true)
.busy_timeout(Duration::from_secs(5));

let display = path.display();
SqlitePoolOptions::new()
.max_connections(1)
.connect_with(options)
.await
.with_context(|| format!("failed to open sqlite db at {display}"))
}

async fn print_backfill(
pool: &SqlitePool,
runtime: &StateRuntime,
filter: &LogFilter,
backfill: usize,
) -> anyhow::Result<i64> {
if backfill == 0 {
return Ok(0);
}

let mut rows = fetch_backfill(pool, filter, backfill).await?;
let mut rows = fetch_backfill(runtime, filter, backfill).await?;
rows.reverse();

let mut last_id = 0;
@@ -195,86 +168,53 @@ async fn print_backfill(
}

async fn fetch_backfill(
pool: &SqlitePool,
runtime: &StateRuntime,
filter: &LogFilter,
backfill: usize,
) -> anyhow::Result<Vec<LogRow>> {
let mut builder = base_select_builder();
push_filters(&mut builder, filter);
builder.push(" ORDER BY id DESC");
builder.push(" LIMIT ").push_bind(backfill as i64);

builder
.build_query_as::<LogRow>()
.fetch_all(pool)
let query = to_log_query(filter, Some(backfill), None, true);
runtime
.query_logs(&query)
.await
.context("failed to fetch backfill logs")
}

async fn fetch_new_rows(
pool: &SqlitePool,
runtime: &StateRuntime,
filter: &LogFilter,
last_id: i64,
) -> anyhow::Result<Vec<LogRow>> {
let mut builder = base_select_builder();
push_filters(&mut builder, filter);
builder.push(" AND id > ").push_bind(last_id);
builder.push(" ORDER BY id ASC");

builder
.build_query_as::<LogRow>()
.fetch_all(pool)
let query = to_log_query(filter, None, Some(last_id), false);
runtime
.query_logs(&query)
.await
.context("failed to fetch new logs")
}

async fn fetch_max_id(pool: &SqlitePool, filter: &LogFilter) -> anyhow::Result<i64> {
let mut builder = QueryBuilder::<Sqlite>::new("SELECT MAX(id) AS max_id FROM logs WHERE 1 = 1");
push_filters(&mut builder, filter);

let row = builder
.build()
.fetch_one(pool)
async fn fetch_max_id(runtime: &StateRuntime, filter: &LogFilter) -> anyhow::Result<i64> {
let query = to_log_query(filter, None, None, false);
runtime
.max_log_id(&query)
.await
.context("failed to fetch max log id")?;
let max_id: Option<i64> = row.try_get("max_id")?;
Ok(max_id.unwrap_or(0))
.context("failed to fetch max log id")
}

fn base_select_builder<'a>() -> QueryBuilder<'a, Sqlite> {
QueryBuilder::<Sqlite>::new(
"SELECT id, ts, ts_nanos, level, message, thread_id, file, line FROM logs WHERE 1 = 1",
)
}

fn push_filters<'a>(builder: &mut QueryBuilder<'a, Sqlite>, filter: &'a LogFilter) {
if let Some(level_upper) = filter.level_upper.as_ref() {
builder
.push(" AND UPPER(level) = ")
.push_bind(level_upper.as_str());
}
if let Some(from_ts) = filter.from_ts {
builder.push(" AND ts >= ").push_bind(from_ts);
}
if let Some(to_ts) = filter.to_ts {
builder.push(" AND ts <= ").push_bind(to_ts);
}
if let Some(module_like) = filter.module_like.as_ref() {
builder
.push(" AND module_path LIKE '%' || ")
.push_bind(module_like.as_str())
.push(" || '%'");
}
if let Some(file_like) = filter.file_like.as_ref() {
builder
.push(" AND file LIKE '%' || ")
.push_bind(file_like.as_str())
.push(" || '%'");
}
if let Some(thread_id) = filter.thread_id.as_ref() {
builder
.push(" AND thread_id = ")
.push_bind(thread_id.as_str());
fn to_log_query(
filter: &LogFilter,
limit: Option<usize>,
after_id: Option<i64>,
descending: bool,
) -> LogQuery {
LogQuery {
level_upper: filter.level_upper.clone(),
from_ts: filter.from_ts,
to_ts: filter.to_ts,
module_like: filter.module_like.clone(),
file_like: filter.file_like.clone(),
thread_id: filter.thread_id.clone(),
after_id,
limit,
descending,
}
}

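
> For reviewers checking equivalence: here is how the new `LogQuery` fields line up with the hand-built SQL this change deletes. The mapping is inferred from the removed `push_filters`/`base_select_builder` above, not verified against `codex_state` internals.

```rust
// LogQuery field       -> SQL clause previously built by push_filters
// level_upper: Some(l) -> AND UPPER(level) = ?
// from_ts: Some(t)     -> AND ts >= ?
// to_ts: Some(t)       -> AND ts <= ?
// module_like: Some(m) -> AND module_path LIKE '%' || ? || '%'
// file_like: Some(f)   -> AND file LIKE '%' || ? || '%'
// thread_id: Some(id)  -> AND thread_id = ?
// after_id: Some(id)   -> AND id > ?
// descending: true     -> ORDER BY id DESC (false: ORDER BY id ASC)
// limit: Some(n)       -> LIMIT ?
```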
2 changes: 2 additions & 0 deletions codex-rs/state/src/lib.rs
@@ -12,6 +12,8 @@ mod paths;
mod runtime;

pub use model::LogEntry;
pub use model::LogQuery;
pub use model::LogRow;
/// Preferred entrypoint: owns configuration and metrics.
pub use runtime::StateRuntime;
