Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 36 additions & 13 deletions crates/zeph-core/src/agent/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,7 @@ impl<C: Channel> Agent<C> {
let Some(ref budget) = self.context_state.budget else {
return Ok(());
};
let _ = self.channel.send_status("building context...").await;

let system_prompt = self.messages.first().map_or("", |m| m.content.as_str());
let alloc = budget.allocate(system_prompt, &self.skill_state.last_skills_prompt);
Expand All @@ -593,19 +594,37 @@ impl<C: Channel> Agent<C> {

// Fetch all context sources concurrently
#[cfg(not(feature = "index"))]
let (summaries_msg, cross_session_msg, recall_msg) = tokio::try_join!(
Self::fetch_summaries(&self.memory_state, alloc.summaries),
Self::fetch_cross_session(&self.memory_state, &query, alloc.cross_session),
Self::fetch_semantic_recall(&self.memory_state, &query, alloc.semantic_recall),
)?;
let (summaries_msg, cross_session_msg, recall_msg) = {
let result = tokio::try_join!(
Self::fetch_summaries(&self.memory_state, alloc.summaries),
Self::fetch_cross_session(&self.memory_state, &query, alloc.cross_session),
Self::fetch_semantic_recall(&self.memory_state, &query, alloc.semantic_recall),
);
match result {
Ok(v) => v,
Err(e) => {
let _ = self.channel.send_status("").await;
return Err(e);
}
}
};

#[cfg(feature = "index")]
let (summaries_msg, cross_session_msg, recall_msg, code_rag_text) = tokio::try_join!(
Self::fetch_summaries(&self.memory_state, alloc.summaries),
Self::fetch_cross_session(&self.memory_state, &query, alloc.cross_session),
Self::fetch_semantic_recall(&self.memory_state, &query, alloc.semantic_recall),
Self::fetch_code_rag(&self.index, &query, alloc.code_context),
)?;
let (summaries_msg, cross_session_msg, recall_msg, code_rag_text) = {
let result = tokio::try_join!(
Self::fetch_summaries(&self.memory_state, alloc.summaries),
Self::fetch_cross_session(&self.memory_state, &query, alloc.cross_session),
Self::fetch_semantic_recall(&self.memory_state, &query, alloc.semantic_recall),
Self::fetch_code_rag(&self.index, &query, alloc.code_context),
);
match result {
Ok(v) => v,
Err(e) => {
let _ = self.channel.send_status("").await;
return Err(e);
}
}
};

// Insert fetched messages (order: recall, cross-session, summaries at position 1)
if let Some(msg) = recall_msg.filter(|_| self.messages.len() > 1) {
Expand All @@ -626,6 +645,7 @@ impl<C: Channel> Agent<C> {

self.trim_messages_to_budget(alloc.recent_history);
self.recompute_prompt_tokens();
let _ = self.channel.send_status("").await;

Ok(())
}
Expand Down Expand Up @@ -688,6 +708,7 @@ impl<C: Channel> Agent<C> {
let all_meta = self.skill_state.registry.all_meta();
let matched_indices: Vec<usize> = if let Some(matcher) = &self.skill_state.matcher {
let provider = self.provider.clone();
let _ = self.channel.send_status("matching skills...").await;
let scored = matcher
.match_skills(
&all_meta,
Expand All @@ -701,7 +722,7 @@ impl<C: Channel> Agent<C> {
)
.await;

if scored.len() >= 2
let indices: Vec<usize> = if scored.len() >= 2
&& (scored[0].score - scored[1].score) < self.skill_state.disambiguation_threshold
{
match self.disambiguate_skills(query, &all_meta, &scored).await {
Expand All @@ -710,7 +731,9 @@ impl<C: Channel> Agent<C> {
}
} else {
scored.iter().map(|s| s.index).collect()
}
};
let _ = self.channel.send_status("").await;
indices
} else {
(0..all_meta.len()).collect()
};
Expand Down
7 changes: 6 additions & 1 deletion crates/zeph-core/src/agent/learning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,13 @@ impl<C: Channel> Agent<C> {
});

let messages_before = self.messages.len();
let _ = self.channel.send_status("reflecting...").await;
// Box::pin to break async recursion cycle (process_response -> attempt_self_reflection -> process_response)
Box::pin(self.process_response()).await?;
if let Err(e) = Box::pin(self.process_response()).await {
let _ = self.channel.send_status("").await;
return Err(e);
}
let _ = self.channel.send_status("").await;
let retry_succeeded = self.messages.len() > messages_before;

if retry_succeeded {
Expand Down
3 changes: 3 additions & 0 deletions crates/zeph-core/src/agent/mcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,10 @@ impl<C: Channel> Agent<C> {
timeout: std::time::Duration::from_secs(30),
};

let _ = self.channel.send_status("connecting to mcp...").await;
match manager.add_server(&entry).await {
Ok(tools) => {
let _ = self.channel.send_status("").await;
let count = tools.len();
self.mcp.tools.extend(tools);
self.sync_mcp_registry().await;
Expand All @@ -106,6 +108,7 @@ impl<C: Channel> Agent<C> {
Ok(())
}
Err(e) => {
let _ = self.channel.send_status("").await;
tracing::warn!(server_id = entry.id, "MCP add failed: {e:#}");
self.channel
.send(&format!("Failed to connect server '{}': {e}", entry.id))
Expand Down
2 changes: 2 additions & 0 deletions crates/zeph-core/src/agent/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,7 @@ impl<C: Channel> Agent<C> {
if new_registry.fingerprint() == self.skill_state.registry.fingerprint() {
return;
}
let _ = self.channel.send_status("reloading skills...").await;
self.skill_state.registry = new_registry;

let all_meta = self.skill_state.registry.all_meta();
Expand Down Expand Up @@ -711,6 +712,7 @@ impl<C: Channel> Agent<C> {
msg.content = system_prompt;
}

let _ = self.channel.send_status("").await;
tracing::info!(
"reloaded {} skill(s)",
self.skill_state.registry.all_meta().len()
Expand Down
3 changes: 3 additions & 0 deletions crates/zeph-core/src/agent/persistence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,19 @@ impl<C: Channel> Agent<C> {
.and_then(|m| serde_json::to_string(&m.parts).ok())
.unwrap_or_else(|| "[]".to_string());

let _ = self.channel.send_status("saving...").await;
let (_message_id, embedding_stored) = match memory
.remember_with_parts(cid, role_str(role), content, &parts_json)
.await
{
Ok(result) => result,
Err(e) => {
let _ = self.channel.send_status("").await;
tracing::error!("failed to persist message: {e:#}");
return;
}
};
let _ = self.channel.send_status("").await;

self.update_metrics(|m| {
m.sqlite_message_count += 1;
Expand Down
7 changes: 7 additions & 0 deletions crates/zeph-core/src/agent/tool_execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,12 @@ impl<C: Channel> Agent<C> {
}
}

let _ = self.channel.send_status("thinking...").await;
let Some(response) = self.call_llm_with_timeout().await? else {
let _ = self.channel.send_status("").await;
return Ok(());
};
let _ = self.channel.send_status("").await;

if response.trim().is_empty() {
tracing::warn!("received empty response from LLM, skipping");
Expand Down Expand Up @@ -140,11 +143,13 @@ impl<C: Channel> Agent<C> {
self.persist_message(Role::Assistant, &response).await;

self.inject_active_skill_env();
let _ = self.channel.send_status("running tool...").await;
let result = self
.tool_executor
.execute_erased(&response)
.instrument(tracing::info_span!("tool_exec"))
.await;
let _ = self.channel.send_status("").await;
self.tool_executor.set_skill_env(None);
if !self.handle_tool_result(&response, result).await? {
return Ok(());
Expand Down Expand Up @@ -577,7 +582,9 @@ impl<C: Channel> Agent<C> {
}
}

let _ = self.channel.send_status("thinking...").await;
let chat_result = self.call_chat_with_tools(&tool_defs).await?;
let _ = self.channel.send_status("").await;

let Some(chat_result) = chat_result else {
tracing::debug!("chat_with_tools returned None (timeout)");
Expand Down
Loading