Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions crates/zeph-core/src/agent/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,17 @@ impl<P: LlmProvider + Clone + 'static, C: Channel, T: ToolExecutor> Agent<P, C,
count
}

pub async fn shutdown(&mut self) {
self.channel.send("Shutting down...").await.ok();

#[cfg(feature = "mcp")]
if let Some(ref manager) = self.mcp.manager {
manager.shutdown_all_shared().await;
}

tracing::info!("agent shutdown complete");
}

/// Run the chat loop, receiving messages via the channel until EOF or shutdown.
///
/// # Errors
Expand Down
17 changes: 16 additions & 1 deletion crates/zeph-core/src/agent/streaming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,17 @@ impl<P: LlmProvider + Clone + 'static, C: Channel, T: ToolExecutor> Agent<P, C,
let mut stream = self.provider.chat_stream(&self.messages).await?;
let mut response = String::with_capacity(2048);

while let Some(chunk_result) = stream.next().await {
loop {
let chunk_result = tokio::select! {
item = stream.next() => match item {
Some(r) => r,
None => break,
},
() = super::shutdown_signal(&mut self.shutdown) => {
tracing::info!("streaming interrupted by shutdown");
break;
}
};
let chunk: String = chunk_result?;
response.push_str(&chunk);
let display = self.maybe_redact(&chunk);
Expand Down Expand Up @@ -382,6 +392,11 @@ impl<P: LlmProvider + Clone + 'static, C: Channel, T: ToolExecutor> Agent<P, C,
);

for iteration in 0..self.max_tool_iterations {
if *self.shutdown.borrow() {
tracing::info!("native tool loop interrupted by shutdown");
break;
}

self.channel.send_typing().await?;

if let Some(ref budget) = self.context_state.budget {
Expand Down
14 changes: 12 additions & 2 deletions crates/zeph-mcp/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,13 +187,23 @@ impl McpManager {
ids
}

/// Graceful shutdown of all connections.
/// Graceful shutdown of all connections (takes ownership).
pub async fn shutdown_all(self) {
self.shutdown_all_shared().await;
}

/// Graceful shutdown of all connections via shared reference.
pub async fn shutdown_all_shared(&self) {
let mut clients = self.clients.write().await;
let drained: Vec<(String, McpClient)> = clients.drain().collect();
for (id, client) in drained {
tracing::info!(server_id = id, "shutting down MCP client");
client.shutdown().await;
if tokio::time::timeout(Duration::from_secs(5), client.shutdown())
.await
.is_err()
{
tracing::warn!(server_id = id, "MCP client shutdown timed out");
}
}
}
}
Expand Down
6 changes: 5 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -565,9 +565,11 @@ async fn main() -> anyhow::Result<()> {

tokio::select! {
result = tui_task => {
agent.shutdown().await;
return result?;
}
result = agent_future => {
agent.shutdown().await;
return result;
}
}
Expand All @@ -576,7 +578,9 @@ async fn main() -> anyhow::Result<()> {
warmup_provider(&warmup_provider_clone).await;
tokio::spawn(forward_status_to_stderr(status_rx));
// Box::pin avoids clippy::large_futures on non-TUI path
Box::pin(agent.run()).await
let result = Box::pin(agent.run()).await;
agent.shutdown().await;
result
}

async fn forward_status_to_stderr(mut rx: tokio::sync::mpsc::UnboundedReceiver<String>) {
Expand Down
Loading