Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
## [Unreleased]

### Added
- Anthropic prompt caching with structured system content blocks (#337)
- Configurable summary provider for tool output summarization via local model (#338)
- Aggressive inline pruning of stale tool outputs in tool loops (#339)
- Cache usage metrics (cache_read_tokens, cache_creation_tokens) in MetricsSnapshot (#340)
- Native tool_use support for Claude provider (Anthropic API format) (#256)
- Native function calling support for OpenAI provider (#257)
- `ToolDefinition`, `ChatResponse`, `ToolUseRequest` types in zeph-llm (#254)
Expand Down
3 changes: 3 additions & 0 deletions config/default.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
name = "Zeph"
# Maximum tool execution iterations per user message (doom-loop protection)
max_tool_iterations = 10
# Optional local model for tool output summarization and context compaction.
# Format: "ollama/<model>". Falls back to primary provider if unset.
# summary_model = "ollama/llama3.2"

[llm]
# LLM provider: "ollama" for local models or "claude" for Claude API
Expand Down
163 changes: 162 additions & 1 deletion crates/zeph-core/src/agent/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ impl<P: LlmProvider + Clone + 'static, C: Channel, T: ToolExecutor> Agent<P, C,
);

let summary = self
.provider
.summary_or_primary_provider()
.chat(&[Message {
role: Role::User,
content: compaction_prompt,
Expand Down Expand Up @@ -183,6 +183,58 @@ impl<P: LlmProvider + Clone + 'static, C: Channel, T: ToolExecutor> Agent<P, C,
freed
}

/// Inline pruning for tool loops: clear tool output bodies from messages
/// older than the last `keep_recent` messages. Called after each tool iteration
/// to prevent context growth during long tool loops.
///
/// Returns the estimated number of tokens reclaimed.
pub(crate) fn prune_stale_tool_outputs(&mut self, keep_recent: usize) -> usize {
    // Nothing is stale when the history is just the system prompt plus the
    // keep_recent window (or less).
    if self.messages.len() <= keep_recent + 1 {
        return 0;
    }
    // Everything before `boundary` is old enough to prune.
    let boundary = self.messages.len().saturating_sub(keep_recent);

    // Unix timestamp (seconds, i64) recorded on each pruned ToolOutput.
    let stamp = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs()
        .cast_signed();

    let mut reclaimed = 0usize;
    // Index 0 is the system prompt — never pruned.
    for message in &mut self.messages[1..boundary] {
        let mut touched = false;
        for part in &mut message.parts {
            match part {
                // Legacy tool outputs: drop the body entirely and mark when.
                // Assigning a fresh String (not clear()) releases the buffer.
                MessagePart::ToolOutput {
                    body, compacted_at, ..
                } if compacted_at.is_none() && !body.is_empty() => {
                    reclaimed += estimate_tokens(body);
                    *compacted_at = Some(stamp);
                    *body = String::new();
                    touched = true;
                }
                // Native tool results: replace large payloads with a stub,
                // crediting the old size minus ~1 token for the placeholder.
                MessagePart::ToolResult { content, .. } if estimate_tokens(content) > 20 => {
                    reclaimed += estimate_tokens(content) - 1;
                    "[pruned]".clone_into(content);
                    touched = true;
                }
                _ => {}
            }
        }
        // Keep the flattened `content` field in sync with the edited parts.
        if touched {
            message.rebuild_content();
        }
    }

    if reclaimed > 0 {
        self.update_metrics(|m| m.tool_output_prunes += 1);
        tracing::debug!(
            freed = reclaimed,
            boundary,
            keep_recent,
            "inline pruned stale tool outputs"
        );
    }
    reclaimed
}

/// Two-tier compaction: Tier 1 prunes tool outputs, Tier 2 falls back to full LLM compaction.
#[allow(
clippy::cast_precision_loss,
Expand Down Expand Up @@ -674,6 +726,9 @@ impl<P: LlmProvider + Clone + 'static, C: Channel, T: ToolExecutor> Agent<P, C,
system_prompt.push_str(&catalog_prompt);
}

system_prompt.push_str("\n<!-- cache:stable -->");
system_prompt.push_str("\n<!-- cache:volatile -->");

#[cfg(feature = "mcp")]
self.append_mcp_prompt(query, &mut system_prompt).await;

Expand Down Expand Up @@ -1505,4 +1560,110 @@ mod tests {
assert!(800 >= threshold); // at threshold → should stop
assert!(799 < threshold); // below threshold → should continue
}

#[test]
fn prune_stale_tool_outputs_clears_old() {
    let provider = MockProvider::new(vec![]);
    let channel = MockChannel::new(vec![]);
    let registry = create_test_registry();
    let executor = MockToolExecutor::no_tools();
    let (tx, rx) = watch::channel(crate::metrics::MetricsSnapshot::default());

    let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
        .with_context_budget(10000, 0.20, 0.75, 4, 0)
        .with_metrics(tx);

    // Add 6 messages with un-compacted tool outputs.
    for i in 0..6 {
        agent.messages.push(Message::from_parts(
            Role::User,
            vec![MessagePart::ToolOutput {
                tool_name: format!("tool_{i}"),
                body: "x".repeat(200),
                compacted_at: None,
            }],
        ));
    }
    // 7 messages total (1 system + 6 user)

    let freed = agent.prune_stale_tool_outputs(4);
    assert!(freed > 0);
    assert_eq!(rx.borrow().tool_output_prunes, 1);

    // boundary = 7 - 4 = 3, so indices 1 and 2 must be pruned.
    // `let … else` panics on a shape mismatch instead of silently skipping
    // the assertions (the old `if let` made them vacuous).
    for i in 1..3 {
        let MessagePart::ToolOutput {
            body, compacted_at, ..
        } = &agent.messages[i].parts[0]
        else {
            panic!("message {i}: expected ToolOutput part");
        };
        assert!(body.is_empty(), "message {i} should be pruned");
        assert!(compacted_at.is_some());
    }
    // Indices 3..=6 (the 4 most recent) must be untouched.
    for i in 3..7 {
        let MessagePart::ToolOutput { body, .. } = &agent.messages[i].parts[0] else {
            panic!("message {i}: expected ToolOutput part");
        };
        assert!(!body.is_empty(), "message {i} should be kept");
    }
}

#[test]
fn prune_stale_tool_outputs_noop_when_few_messages() {
    let provider = MockProvider::new(vec![]);
    let channel = MockChannel::new(vec![]);
    let registry = create_test_registry();
    let executor = MockToolExecutor::no_tools();

    let mut agent = Agent::new(provider, channel, registry, None, 5, executor);

    // A single un-compacted tool output, well inside the keep_recent window.
    let part = MessagePart::ToolOutput {
        tool_name: String::from("bash"),
        body: String::from("output"),
        compacted_at: None,
    };
    agent.messages.push(Message::from_parts(Role::User, vec![part]));

    // Only 2 messages total (system + one user): nothing is old enough to prune.
    assert_eq!(agent.prune_stale_tool_outputs(4), 0);
}

#[test]
fn prune_stale_prunes_tool_result_too() {
    let provider = MockProvider::new(vec![]);
    let channel = MockChannel::new(vec![]);
    let registry = create_test_registry();
    let executor = MockToolExecutor::no_tools();

    let mut agent = Agent::new(provider, channel, registry, None, 5, executor);

    // One stale message carrying a large native ToolResult payload...
    let stale = MessagePart::ToolResult {
        tool_use_id: String::from("t1"),
        content: "x".repeat(500),
        is_error: false,
    };
    agent.messages.push(Message::from_parts(Role::User, vec![stale]));

    // ...followed by enough recent messages to push it past the boundary.
    for _ in 0..4 {
        agent.messages.push(Message {
            role: Role::User,
            content: String::from("recent"),
            parts: Vec::new(),
        });
    }

    let freed = agent.prune_stale_tool_outputs(4);
    assert!(freed > 0);

    // The large content must have been replaced by the stub marker.
    match &agent.messages[1].parts[0] {
        MessagePart::ToolResult { content, .. } => assert_eq!(content, "[pruned]"),
        _ => panic!("expected ToolResult"),
    }
}
}
22 changes: 22 additions & 0 deletions crates/zeph-core/src/agent/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use crate::config_watcher::ConfigEvent;
use crate::context::{ContextBudget, EnvironmentContext, build_system_prompt};

const DOOM_LOOP_WINDOW: usize = 3;
const TOOL_LOOP_KEEP_RECENT: usize = 4;
const MAX_QUEUE_SIZE: usize = 10;
const MESSAGE_MERGE_WINDOW: Duration = Duration::from_millis(500);
const RECALL_PREFIX: &str = "[semantic recall]\n";
Expand Down Expand Up @@ -120,6 +121,7 @@ pub struct Agent<P: LlmProvider + Clone + 'static, C: Channel, T: ToolExecutor>
start_time: Instant,
message_queue: VecDeque<QueuedMessage>,
summarize_tool_output_enabled: bool,
summary_provider: Option<P>,
permission_policy: zeph_tools::PermissionPolicy,
warmup_ready: Option<watch::Receiver<bool>>,
max_tool_iterations: usize,
Expand Down Expand Up @@ -209,6 +211,7 @@ impl<P: LlmProvider + Clone + 'static, C: Channel, T: ToolExecutor> Agent<P, C,
start_time: Instant::now(),
message_queue: VecDeque::new(),
summarize_tool_output_enabled: false,
summary_provider: None,
permission_policy: zeph_tools::PermissionPolicy::default(),
warmup_ready: None,
max_tool_iterations: 10,
Expand Down Expand Up @@ -313,6 +316,16 @@ impl<P: LlmProvider + Clone + 'static, C: Channel, T: ToolExecutor> Agent<P, C,
self
}

/// Install a dedicated provider for summarization/compaction calls
/// (e.g. a small local model), instead of the primary chat provider.
#[must_use]
pub fn with_summary_provider(mut self, provider: P) -> Self {
    self.summary_provider = Some(provider);
    self
}

/// Provider used for summaries: the configured summary provider when set,
/// otherwise falls back to the primary chat provider.
fn summary_or_primary_provider(&self) -> &P {
    self.summary_provider.as_ref().unwrap_or(&self.provider)
}

#[must_use]
pub fn with_permission_policy(mut self, policy: zeph_tools::PermissionPolicy) -> Self {
self.permission_policy = policy;
Expand Down Expand Up @@ -417,6 +430,15 @@ impl<P: LlmProvider + Clone + 'static, C: Channel, T: ToolExecutor> Agent<P, C,
}
}

/// Fold the primary provider's most recent prompt-cache usage
/// (creation/read token counts) into the metrics snapshot, if reported.
/// No-op for providers that return `None` from `last_cache_usage`.
pub(crate) fn record_cache_usage(&self) {
    if let Some((creation, read)) = self.provider.last_cache_usage() {
        self.update_metrics(|m| {
            m.cache_creation_tokens += creation;
            m.cache_read_tokens += read;
        });
    }
}

/// Inject pre-formatted code context into the message list.
/// The caller is responsible for retrieving and formatting the text.
pub fn inject_code_context(&mut self, text: &str) {
Expand Down
13 changes: 11 additions & 2 deletions crates/zeph-core/src/agent/streaming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::channel::Channel;
use crate::redact::redact_secrets;
use zeph_memory::semantic::estimate_tokens;

use super::{Agent, DOOM_LOOP_WINDOW, format_tool_output};
use super::{Agent, DOOM_LOOP_WINDOW, TOOL_LOOP_KEEP_RECENT, format_tool_output};

impl<P: LlmProvider + Clone + 'static, C: Channel, T: ToolExecutor> Agent<P, C, T> {
pub(crate) async fn process_response(&mut self) -> Result<(), super::error::AgentError> {
Expand Down Expand Up @@ -84,6 +84,9 @@ impl<P: LlmProvider + Clone + 'static, C: Channel, T: ToolExecutor> Agent<P, C,
return Ok(());
}

// Prune tool output bodies from older iterations to reduce context growth
self.prune_stale_tool_outputs(TOOL_LOOP_KEEP_RECENT);

// Doom-loop detection: compare last N outputs by string equality
if let Some(last_msg) = self.messages.last() {
self.doom_loop_history.push(last_msg.content.clone());
Expand Down Expand Up @@ -130,6 +133,7 @@ impl<P: LlmProvider + Clone + 'static, C: Channel, T: ToolExecutor> Agent<P, C,
m.prompt_tokens += prompt_estimate;
m.total_tokens = m.prompt_tokens + m.completion_tokens;
});
self.record_cache_usage();
Ok(Some(r?))
} else {
self.channel
Expand All @@ -150,6 +154,7 @@ impl<P: LlmProvider + Clone + 'static, C: Channel, T: ToolExecutor> Agent<P, C,
m.completion_tokens += completion_estimate;
m.total_tokens = m.prompt_tokens + m.completion_tokens;
});
self.record_cache_usage();
let display = self.maybe_redact(&resp);
self.channel.send(&display).await?;
Ok(Some(resp))
Expand Down Expand Up @@ -191,7 +196,7 @@ impl<P: LlmProvider + Clone + 'static, C: Channel, T: ToolExecutor> Agent<P, C,
parts: vec![],
}];

match self.provider.chat(&messages).await {
match self.summary_or_primary_provider().chat(&messages).await {
Ok(summary) => format!("[tool output summary]\n```\n{summary}\n```"),
Err(e) => {
tracing::warn!(
Expand Down Expand Up @@ -428,6 +433,9 @@ impl<P: LlmProvider + Clone + 'static, C: Channel, T: ToolExecutor> Agent<P, C,
self.handle_native_tool_calls(text.as_deref(), &tool_calls)
.await?;

// Prune tool output bodies from older iterations to reduce context growth
self.prune_stale_tool_outputs(TOOL_LOOP_KEEP_RECENT);

if self.check_doom_loop(iteration).await? {
break;
}
Expand Down Expand Up @@ -467,6 +475,7 @@ impl<P: LlmProvider + Clone + 'static, C: Channel, T: ToolExecutor> Agent<P, C,
m.api_calls += 1;
m.last_llm_latency_ms = latency;
});
self.record_cache_usage();

Ok(Some(result))
}
Expand Down
3 changes: 3 additions & 0 deletions crates/zeph-core/src/config/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ pub struct AgentConfig {
pub name: String,
#[serde(default = "default_max_tool_iterations")]
pub max_tool_iterations: usize,
#[serde(default)]
pub summary_model: Option<String>,
}

#[derive(Debug, Deserialize)]
Expand Down Expand Up @@ -612,6 +614,7 @@ impl Config {
agent: AgentConfig {
name: "Zeph".into(),
max_tool_iterations: 10,
summary_model: None,
},
llm: LlmConfig {
provider: "ollama".into(),
Expand Down
2 changes: 2 additions & 0 deletions crates/zeph-core/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ pub struct MetricsSnapshot {
pub summaries_count: u64,
pub context_compactions: u64,
pub tool_output_prunes: u64,
pub cache_read_tokens: u64,
pub cache_creation_tokens: u64,
}

pub struct MetricsCollector {
Expand Down
4 changes: 4 additions & 0 deletions crates/zeph-llm/src/any.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ impl LlmProvider for AnyProvider {
) -> Result<ChatResponse, crate::LlmError> {
delegate_provider!(self, |p| p.chat_with_tools(messages, tools).await)
}

// Forward prompt-cache usage reporting to whichever provider variant is active.
fn last_cache_usage(&self) -> Option<(u64, u64)> {
    delegate_provider!(self, |p| p.last_cache_usage())
}
}

#[cfg(test)]
Expand Down
Loading
Loading