Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,15 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
- Blocked command matching extracts basename from absolute paths (`/usr/bin/sudo` now correctly blocked)
- Transparent wrapper commands (`env`, `command`, `exec`, `nice`, `nohup`, `time`, `xargs`) are skipped to detect the actual command
- Default confirm patterns now include `$(` and backtick subshell expressions
- Enable SQLite WAL mode with SYNCHRONOUS=NORMAL for 2-5x write throughput (#639)
- Replace O(n*iterations) token scan with cached_prompt_tokens in budget checks (#640)
- Defer maybe_redact to stream completion boundary instead of per-chunk (#641)
- Replace format_tool_output string allocation with Write-into-buffer (#642)
- Change ToolCall.params from HashMap to serde_json::Map, eliminating clone (#643)
- Pre-join static system prompt sections into LazyLock<String> (#644)
- Replace doom-loop string history with content hash comparison (#645)
- Return &'static str from detect_image_mime with case-insensitive matching (#646)
- Replace block_on in history persist with fire-and-forget async spawn (#647)

### Fixed
- False positive: "sudoku" no longer matched by "sudo" blocked pattern (word-boundary matching)
Expand Down
91 changes: 70 additions & 21 deletions crates/zeph-core/src/agent/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,22 +51,34 @@ const MAX_AUDIO_BYTES: usize = 25 * 1024 * 1024;
const MAX_IMAGE_BYTES: usize = 20 * 1024 * 1024;

/// Render a tool invocation's output as a fenced block for conversation history.
///
/// Produces `[tool output: <tool_name>]` followed by a ``` fence opening,
/// the raw `body`, and `TOOL_OUTPUT_SUFFIX`. The buffer is pre-sized to the
/// exact final length so the string is built with a single allocation.
fn format_tool_output(tool_name: &str, body: &str) -> String {
    // Named fragments keep the capacity computation and the emitted text
    // in sync (previously the literals were duplicated between the capacity
    // expression and a `write!` format string whose Result was discarded).
    const HEADER_PREFIX: &str = "[tool output: ";
    const HEADER_SUFFIX: &str = "]\n```\n";
    let capacity = HEADER_PREFIX.len()
        + tool_name.len()
        + HEADER_SUFFIX.len()
        + body.len()
        + TOOL_OUTPUT_SUFFIX.len();
    let mut buf = String::with_capacity(capacity);
    // push_str is infallible — no fmt machinery, no ignored Result.
    buf.push_str(HEADER_PREFIX);
    buf.push_str(tool_name);
    buf.push_str(HEADER_SUFFIX);
    buf.push_str(body);
    buf.push_str(TOOL_OUTPUT_SUFFIX);
    buf
}

/// Map an optional filename to a static MIME type based on its extension.
///
/// Recognizes `jpg`/`jpeg`, `gif`, and `webp` case-insensitively; anything
/// else — including a missing filename or a file with no extension — falls
/// back to `image/png`. Returns a `&'static str`, so no allocation occurs.
fn detect_image_mime(filename: Option<&str>) -> &'static str {
    let ext = filename
        .map(std::path::Path::new)
        .and_then(std::path::Path::extension)
        .and_then(std::ffi::OsStr::to_str);
    match ext {
        Some(e) if e.eq_ignore_ascii_case("jpg") || e.eq_ignore_ascii_case("jpeg") => "image/jpeg",
        Some(e) if e.eq_ignore_ascii_case("gif") => "image/gif",
        Some(e) if e.eq_ignore_ascii_case("webp") => "image/webp",
        _ => "image/png",
    }
}

struct QueuedMessage {
Expand Down Expand Up @@ -153,7 +165,7 @@ pub struct Agent<C: Channel, T: ToolExecutor> {
message_queue: VecDeque<QueuedMessage>,
summary_provider: Option<AnyProvider>,
warmup_ready: Option<watch::Receiver<bool>>,
doom_loop_history: Vec<String>,
doom_loop_history: Vec<u64>,
cost_tracker: Option<CostTracker>,
cached_prompt_tokens: u64,
stt: Option<Box<dyn SpeechToText>>,
Expand Down Expand Up @@ -787,7 +799,7 @@ impl<C: Channel, T: ToolExecutor> Agent<C, T> {
);
continue;
}
let mime_type = detect_image_mime(attachment.filename.as_deref());
let mime_type = detect_image_mime(attachment.filename.as_deref()).to_string();
image_parts.push(MessagePart::Image {
data: attachment.data,
mime_type,
Expand Down Expand Up @@ -918,7 +930,7 @@ impl<C: Channel, T: ToolExecutor> Agent<C, T> {
.await?;
return Ok(());
}
let mime_type = detect_image_mime(Some(path));
let mime_type = detect_image_mime(Some(path)).to_string();
extra_parts.push(MessagePart::Image { data, mime_type });
self.channel
.send(&format!("Image loaded: {path}. Send your message."))
Expand Down Expand Up @@ -2190,23 +2202,60 @@ pub(super) mod agent_tests {

#[test]
fn doom_loop_detection_triggers_on_identical_outputs() {
    // doom_loop_history stores u64 content hashes — identical content
    // produces equal hashes, so a window of equal values must trigger.
    let history = [42u64; 3];
    let recent = &history[history.len() - DOOM_LOOP_WINDOW..];
    assert!(recent.windows(2).all(|pair| pair[0] == pair[1]));
}

#[test]
fn doom_loop_detection_no_trigger_on_different_outputs() {
    // Any differing adjacent hashes inside the window prevent the trigger.
    let history: [u64; 3] = [1, 2, 3];
    let recent = &history[history.len() - DOOM_LOOP_WINDOW..];
    assert!(!recent.windows(2).all(|pair| pair[0] == pair[1]));
}

#[test]
fn format_tool_output_structure() {
    // Header, body, and suffix must all be present, in that order.
    let rendered = format_tool_output("bash", "hello world");
    assert!(rendered.starts_with("[tool output: bash]\n```\n"));
    assert!(rendered.contains("hello world"));
    assert!(rendered.ends_with(TOOL_OUTPUT_SUFFIX));
}

#[test]
fn format_tool_output_empty_body() {
    // An empty body still yields a complete, well-formed fenced block.
    let rendered = format_tool_output("grep", "");
    assert_eq!(rendered, "[tool output: grep]\n```\n\n```");
}

#[test]
fn detect_image_mime_standard() {
    // Lowercase extensions map to their MIME type; unknown/missing → png.
    let cases = [
        ("photo.jpg", "image/jpeg"),
        ("photo.jpeg", "image/jpeg"),
        ("anim.gif", "image/gif"),
        ("img.webp", "image/webp"),
        ("img.png", "image/png"),
    ];
    for (name, expected) in cases {
        assert_eq!(detect_image_mime(Some(name)), expected, "file: {name}");
    }
    assert_eq!(detect_image_mime(None), "image/png");
}

#[test]
fn detect_image_mime_uppercase() {
    // Fully uppercase extensions are matched case-insensitively.
    let cases = [
        ("photo.JPG", "image/jpeg"),
        ("photo.JPEG", "image/jpeg"),
        ("anim.GIF", "image/gif"),
        ("img.WEBP", "image/webp"),
    ];
    for (name, expected) in cases {
        assert_eq!(detect_image_mime(Some(name)), expected, "file: {name}");
    }
}

#[test]
fn detect_image_mime_mixed_case() {
    // Mixed-case extensions are matched case-insensitively too.
    let cases = [
        ("photo.Jpg", "image/jpeg"),
        ("photo.JpEg", "image/jpeg"),
        ("anim.Gif", "image/gif"),
        ("img.WebP", "image/webp"),
    ];
    for (name, expected) in cases {
        assert_eq!(detect_image_mime(Some(name)), expected, "file: {name}");
    }
}

#[tokio::test]
async fn cancel_signal_propagates_to_fresh_token() {
use tokio_util::sync::CancellationToken;
Expand Down
51 changes: 27 additions & 24 deletions crates/zeph-core/src/agent/streaming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,23 @@ use tokio_stream::StreamExt;
use zeph_llm::provider::{ChatResponse, LlmProvider, Message, MessagePart, Role, ToolDefinition};
use zeph_tools::executor::{ToolCall, ToolError, ToolExecutor, ToolOutput};

use super::{Agent, DOOM_LOOP_WINDOW, TOOL_LOOP_KEEP_RECENT, format_tool_output};
use crate::channel::Channel;
use crate::redact::redact_secrets;
use zeph_memory::semantic::estimate_tokens;

use super::{Agent, DOOM_LOOP_WINDOW, TOOL_LOOP_KEEP_RECENT, format_tool_output};
use tracing::Instrument;

/// Hash message content for doom-loop comparison, after stripping volatile IDs.
///
/// Normalizes `[tool_result: <id>]` and `[tool_use: <name>(<id>)]` markers away
/// first, so two otherwise-identical outputs hash to the same value.
// DefaultHasher output is not stable across Rust versions — do not persist or
// serialize these hashes. They are used only for within-session equality checks.
fn doom_loop_hash(content: &str) -> u64 {
    use std::hash::{DefaultHasher, Hash, Hasher};
    let mut hasher = DefaultHasher::new();
    normalize_for_doom_loop(content).hash(&mut hasher);
    hasher.finish()
}

fn normalize_for_doom_loop(content: &str) -> String {
let mut out = String::with_capacity(content.len());
let mut rest = content;
Expand Down Expand Up @@ -86,11 +94,7 @@ impl<C: Channel, T: ToolExecutor> Agent<C, T> {

// Context budget check at 80% threshold
if let Some(ref budget) = self.context_state.budget {
let used: usize = self
.messages
.iter()
.map(|m| estimate_tokens(&m.content))
.sum();
let used = usize::try_from(self.cached_prompt_tokens).unwrap_or(usize::MAX);
let threshold = budget.max_tokens() * 4 / 5;
if used >= threshold {
tracing::warn!(
Expand Down Expand Up @@ -147,10 +151,10 @@ impl<C: Channel, T: ToolExecutor> Agent<C, T> {
// Prune tool output bodies from older iterations to reduce context growth
self.prune_stale_tool_outputs(TOOL_LOOP_KEEP_RECENT);

// Doom-loop detection: compare last N outputs by string equality
// Doom-loop detection: compare last N outputs by content hash
if let Some(last_msg) = self.messages.last() {
self.doom_loop_history
.push(normalize_for_doom_loop(&last_msg.content));
.push(doom_loop_hash(&last_msg.content));
if self.doom_loop_history.len() >= DOOM_LOOP_WINDOW {
let recent =
&self.doom_loop_history[self.doom_loop_history.len() - DOOM_LOOP_WINDOW..];
Expand Down Expand Up @@ -218,7 +222,12 @@ impl<C: Channel, T: ToolExecutor> Agent<C, T> {
});
self.record_cache_usage();
self.record_cost(prompt_estimate, completion_estimate_for_cost);
Ok(Some(r?))
let raw = r?;
// Redact secrets from the full response before it is persisted to history.
// Streaming chunks were already sent to the channel without per-chunk redaction
// (acceptable trade-off: ephemeral display vs allocation per chunk).
let redacted = self.maybe_redact(&raw).into_owned();
Ok(Some(redacted))
} else {
self.channel
.send("LLM request timed out. Please try again.")
Expand Down Expand Up @@ -492,8 +501,7 @@ impl<C: Channel, T: ToolExecutor> Agent<C, T> {
};
let chunk: String = chunk_result?;
response.push_str(&chunk);
let display = self.maybe_redact(&chunk);
self.channel.send_chunk(&display).await?;
self.channel.send_chunk(&chunk).await?;
}

self.channel.flush_chunks().await?;
Expand Down Expand Up @@ -549,11 +557,7 @@ impl<C: Channel, T: ToolExecutor> Agent<C, T> {
self.channel.send_typing().await?;

if let Some(ref budget) = self.context_state.budget {
let used: usize = self
.messages
.iter()
.map(|m| estimate_tokens(&m.content))
.sum();
let used = usize::try_from(self.cached_prompt_tokens).unwrap_or(usize::MAX);
let threshold = budget.max_tokens() * 4 / 5;
if used >= threshold {
tracing::warn!(
Expand Down Expand Up @@ -716,11 +720,11 @@ impl<C: Channel, T: ToolExecutor> Agent<C, T> {
let calls: Vec<ToolCall> = tool_calls
.iter()
.map(|tc| {
let params: std::collections::HashMap<String, serde_json::Value> =
let params: serde_json::Map<String, serde_json::Value> =
if let serde_json::Value::Object(map) = &tc.input {
map.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
map.clone()
} else {
std::collections::HashMap::new()
serde_json::Map::new()
};
ToolCall {
tool_id: tc.name.clone(),
Expand Down Expand Up @@ -844,7 +848,7 @@ impl<C: Channel, T: ToolExecutor> Agent<C, T> {
) -> Result<bool, super::error::AgentError> {
if let Some(last_msg) = self.messages.last() {
self.doom_loop_history
.push(normalize_for_doom_loop(&last_msg.content));
.push(doom_loop_hash(&last_msg.content));
if self.doom_loop_history.len() >= DOOM_LOOP_WINDOW {
let recent =
&self.doom_loop_history[self.doom_loop_history.len() - DOOM_LOOP_WINDOW..];
Expand Down Expand Up @@ -879,7 +883,6 @@ fn tool_def_to_definition(def: &zeph_tools::registry::ToolDef) -> ToolDefinition

#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::{Duration, Instant};
Expand Down Expand Up @@ -1044,7 +1047,7 @@ mod tests {
(0..n)
.map(|i| ToolCall {
tool_id: format!("tool-{i}"),
params: HashMap::new(),
params: serde_json::Map::new(),
})
.collect()
}
Expand Down
43 changes: 35 additions & 8 deletions crates/zeph-core/src/context.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::LazyLock;

use zeph_memory::semantic::estimate_tokens;

const BASE_PROMPT_HEADER: &str = "\
Expand Down Expand Up @@ -44,22 +46,47 @@ the user explicitly asks about a skill by name.\n\
- Do not force-push to main/master branches.\n\
- Do not execute commands that could cause data loss without confirmation.";

/// Join the three static prompt sections into one pre-sized `String`.
///
/// Shared by both prompt variants so the concatenation logic (and its
/// capacity computation) exists in exactly one place.
fn join_prompt_sections(tool_use: &str) -> String {
    let mut s = String::with_capacity(
        BASE_PROMPT_HEADER.len() + tool_use.len() + BASE_PROMPT_TAIL.len(),
    );
    s.push_str(BASE_PROMPT_HEADER);
    s.push_str(tool_use);
    s.push_str(BASE_PROMPT_TAIL);
    s
}

/// Base system prompt for providers using the legacy text tool-call protocol.
static PROMPT_LEGACY: LazyLock<String> =
    LazyLock::new(|| join_prompt_sections(TOOL_USE_LEGACY));

/// Base system prompt for providers with native tool-calling support.
static PROMPT_NATIVE: LazyLock<String> =
    LazyLock::new(|| join_prompt_sections(TOOL_USE_NATIVE));

#[must_use]
pub fn build_system_prompt(
skills_prompt: &str,
env: Option<&EnvironmentContext>,
tool_catalog: Option<&str>,
native_tools: bool,
) -> String {
let mut prompt = BASE_PROMPT_HEADER.to_string();

if native_tools {
prompt.push_str(TOOL_USE_NATIVE);
let base = if native_tools {
&*PROMPT_NATIVE
} else {
prompt.push_str(TOOL_USE_LEGACY);
}

prompt.push_str(BASE_PROMPT_TAIL);
&*PROMPT_LEGACY
};
let dynamic_len = env.map_or(0, |e| e.format().len() + 2)
+ tool_catalog.map_or(0, |c| if c.is_empty() { 0 } else { c.len() + 2 })
+ if skills_prompt.is_empty() {
0
} else {
skills_prompt.len() + 2
};
let mut prompt = String::with_capacity(base.len() + dynamic_len);
prompt.push_str(base);

if let Some(env) = env {
prompt.push_str("\n\n");
Expand Down
25 changes: 24 additions & 1 deletion crates/zeph-memory/src/sqlite/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ impl SqliteStore {

let opts = SqliteConnectOptions::from_str(&url)?
.create_if_missing(true)
.foreign_keys(true);
.foreign_keys(true)
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
.synchronous(sqlx::sqlite::SqliteSynchronous::Normal);

let pool = SqlitePoolOptions::new()
.max_connections(5)
Expand Down Expand Up @@ -65,3 +67,24 @@ impl SqliteStore {
Ok(())
}
}

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::NamedTempFile;

    /// Verifies that `SqliteStore::new` applies both connection options it
    /// sets on file-backed databases: WAL journaling and synchronous=NORMAL.
    #[tokio::test]
    async fn wal_journal_mode_enabled_on_file_db() {
        let file = NamedTempFile::new().expect("tempfile");
        let path = file.path().to_str().expect("valid path");

        let store = SqliteStore::new(path).await.expect("SqliteStore::new");

        // SQLite reports the mode as a string; compare case-insensitively
        // rather than assuming a particular casing.
        let mode: String = sqlx::query_scalar("PRAGMA journal_mode")
            .fetch_one(store.pool())
            .await
            .expect("PRAGMA journal_mode query");
        assert!(
            mode.eq_ignore_ascii_case("wal"),
            "expected WAL journal mode, got: {mode}"
        );

        // PRAGMA synchronous levels: 0=OFF, 1=NORMAL, 2=FULL, 3=EXTRA.
        // Previously untested — the store also sets SYNCHRONOUS=NORMAL.
        let sync_level: i64 = sqlx::query_scalar("PRAGMA synchronous")
            .fetch_one(store.pool())
            .await
            .expect("PRAGMA synchronous query");
        assert_eq!(sync_level, 1, "expected synchronous=NORMAL (1)");
    }
}
Loading
Loading