Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 10 additions & 9 deletions crates/zeph-core/src/agent/streaming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -661,7 +661,7 @@ impl<C: Channel, T: ToolExecutor> Agent<C, T> {
// Process results sequentially (metrics, channel sends, message parts)
let mut result_parts: Vec<MessagePart> = Vec::new();
for (tc, tool_result) in tool_calls.iter().zip(tool_results) {
let (output, is_error, inline_stats) = match tool_result {
let (output, is_error, diff, inline_stats) = match tool_result {
Ok(Some(out)) => {
if let Some(ref fs) = out.filter_stats {
let saved = fs.estimated_tokens_saved() as u64;
Expand Down Expand Up @@ -691,27 +691,28 @@ impl<C: Channel, T: ToolExecutor> Agent<C, T> {
}
});
}
if let Some(diff) = out.diff {
let _ = self.channel.send_diff(diff).await;
}
let inline_stats = out.filter_stats.as_ref().and_then(|fs| {
(fs.filtered_chars < fs.raw_chars).then(|| fs.format_inline(&tc.name))
});
(out.summary, false, inline_stats)
(out.summary, false, out.diff, inline_stats)
}
Ok(None) => ("(no output)".to_owned(), false, None),
Err(e) => (format!("[error] {e}"), true, None),
Ok(None) => ("(no output)".to_owned(), false, None, None),
Err(e) => (format!("[error] {e}"), true, None, None),
};

let processed = self.maybe_summarize_tool_output(&output).await;
let body = if let Some(stats) = inline_stats {
let body = if let Some(ref stats) = inline_stats {
format!("{stats}\n{processed}")
} else {
processed.clone()
};
let formatted = format_tool_output(&tc.name, &body);
let display = self.maybe_redact(&formatted);
self.channel.send(&display).await?;
// Bundle diff and filter stats into a single atomic send so TUI can attach
// them to the tool message without a race between DiffReady and FullMessage.
self.channel
.send_tool_output(&tc.name, &display, diff, inline_stats)
.await?;

result_parts.push(MessagePart::ToolResult {
tool_use_id: tc.id.clone(),
Expand Down
19 changes: 19 additions & 0 deletions crates/zeph-core/src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,25 @@ pub trait Channel: Send {
async { Ok(()) }
}

/// Send a complete tool output with optional diff and filter stats atomically.
///
/// The default implementation calls [`Self::send`] with the pre-formatted display text.
/// TUI overrides this to emit a single event that creates the Tool message and attaches
/// diff/filter data without a race between separate sends.
///
/// # Errors
///
/// Returns an error if the underlying I/O fails.
fn send_tool_output(
&mut self,
_tool_name: &str,
display: &str,
_diff: Option<crate::DiffData>,
_filter_stats: Option<String>,
) -> impl Future<Output = Result<(), ChannelError>> + Send {
self.send(display)
}

/// Request user confirmation for a destructive action. Returns `true` if confirmed.
/// Default: auto-confirm (for headless/test scenarios).
///
Expand Down
68 changes: 61 additions & 7 deletions crates/zeph-tui/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,23 +326,37 @@ impl App {
self.scroll_offset = 0;
}
AgentEvent::ToolOutput {
diff, filter_stats, ..
tool_name,
output,
diff,
filter_stats,
..
} => {
if let Some(msg) = self
.messages
.iter_mut()
.rev()
.find(|m| m.role == MessageRole::Tool && m.streaming)
{
// Shell streaming path: finalize existing streaming tool message.
msg.streaming = false;
msg.diff_data = diff;
msg.filter_stats = filter_stats;
} else if filter_stats.is_some()
&& let Some(msg) = self
.messages
.iter_mut()
.rev()
.find(|m| m.role == MessageRole::Tool)
} else if diff.is_some() || filter_stats.is_some() {
// Native tool_use path: no prior ToolStart, create the message now.
self.messages.push(ChatMessage {
role: MessageRole::Tool,
content: output,
streaming: false,
tool_name: Some(tool_name),
diff_data: diff,
filter_stats,
});
} else if let Some(msg) = self
.messages
.iter_mut()
.rev()
.find(|m| m.role == MessageRole::Tool)
{
msg.filter_stats = filter_stats;
}
Expand Down Expand Up @@ -1137,4 +1151,44 @@ mod tests {
assert_eq!(app.messages()[0].tool_name.as_deref(), Some("bash"));
assert_eq!(app.messages()[0].content, "$ echo hello\nhello");
}

#[test]
fn tool_output_without_prior_tool_start_creates_tool_message_with_diff() {
let (mut app, _rx, _tx) = make_app();
let diff = zeph_core::DiffData {
file_path: "src/lib.rs".into(),
old_content: "fn old() {}".into(),
new_content: "fn new() {}".into(),
};
app.handle_agent_event(AgentEvent::ToolOutput {
tool_name: "edit".into(),
command: "[tool output: edit]\n```\nok\n```".into(),
output: "[tool output: edit]\n```\nok\n```".into(),
success: true,
diff: Some(diff),
filter_stats: None,
});

assert_eq!(app.messages().len(), 1);
let msg = &app.messages()[0];
assert_eq!(msg.role, MessageRole::Tool);
assert!(!msg.streaming);
assert!(msg.diff_data.is_some());
}

#[test]
fn tool_output_without_diff_does_not_create_spurious_message() {
let (mut app, _rx, _tx) = make_app();
app.handle_agent_event(AgentEvent::ToolOutput {
tool_name: "read".into(),
command: "[tool output: read]\n```\ncontent\n```".into(),
output: "[tool output: read]\n```\ncontent\n```".into(),
success: true,
diff: None,
filter_stats: None,
});

// No prior ToolStart and no diff/filter_stats: nothing to display.
assert!(app.messages().is_empty());
}
}
59 changes: 59 additions & 0 deletions crates/zeph-tui/src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,27 @@ impl Channel for TuiChannel {
Ok(())
}

async fn send_tool_output(
&mut self,
tool_name: &str,
display: &str,
diff: Option<zeph_core::DiffData>,
filter_stats: Option<String>,
) -> Result<(), ChannelError> {
self.agent_event_tx
.send(AgentEvent::ToolOutput {
tool_name: tool_name.to_owned(),
command: display.to_owned(),
output: display.to_owned(),
success: true,
diff,
filter_stats,
})
.await
.map_err(|_| ChannelError::ChannelClosed)?;
Ok(())
}

async fn confirm(&mut self, prompt: &str) -> Result<bool, ChannelError> {
let (tx, rx) = tokio::sync::oneshot::channel();
self.agent_event_tx
Expand Down Expand Up @@ -273,4 +294,42 @@ mod tests {
let debug = format!("{ch:?}");
assert!(debug.contains("TuiChannel"));
}

#[tokio::test]
async fn send_tool_output_bundles_diff_atomically() {
let (mut ch, _user_tx, mut agent_rx) = make_channel();
let diff = zeph_core::DiffData {
file_path: "src/main.rs".into(),
old_content: "old".into(),
new_content: "new".into(),
};
ch.send_tool_output(
"bash",
"[tool output: bash]\n```\nok\n```",
Some(diff),
None,
)
.await
.unwrap();

let evt = agent_rx.recv().await.unwrap();
assert!(
matches!(evt, AgentEvent::ToolOutput { ref tool_name, ref diff, .. } if tool_name == "bash" && diff.is_some()),
"expected ToolOutput with diff"
);
}

#[tokio::test]
async fn send_tool_output_without_diff_sends_tool_event() {
let (mut ch, _user_tx, mut agent_rx) = make_channel();
ch.send_tool_output("read", "[tool output: read]\n```\ncontent\n```", None, None)
.await
.unwrap();

let evt = agent_rx.recv().await.unwrap();
assert!(
matches!(evt, AgentEvent::ToolOutput { ref tool_name, .. } if tool_name == "read"),
"expected ToolOutput"
);
}
}
Loading