Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions crates/goose-cli/src/commands/web.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use axum::{
};
use futures::{sink::SinkExt, stream::StreamExt};
use goose::agents::{Agent, AgentEvent};
use goose::context_mgmt::auto_compact::check_and_compact_messages;
use goose::message::Message as GooseMessage;
use goose::session;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -455,6 +456,45 @@ async fn process_message_streaming(
session_msgs.clone()
};

// Check and compact messages if needed before calling reply
let compact_result = check_and_compact_messages(agent, &messages, None).await?;
if compact_result.compacted {
messages = compact_result.messages.clone();

// Update session messages
{
let mut session_msgs = session_messages.lock().await;
*session_msgs = compact_result.messages;
}

// Notify client of compaction
let msg = if let (Some(before), Some(after)) =
(compact_result.tokens_before, compact_result.tokens_after)
{
format!(
"Auto-compacted context: {} → {} tokens ({:.0}% reduction)",
before,
after,
(1.0 - (after as f64 / before as f64)) * 100.0
)
} else {
"Auto-compacted context to prevent overflow".to_string()
};

let mut sender_lock = sender.lock().await;
let _ = sender_lock
.send(Message::Text(
serde_json::to_string(&WebSocketMessage::Response {
content: msg,
role: "system".to_string(),
timestamp: chrono::Utc::now().timestamp_millis(),
})
.unwrap()
.into(),
))
.await;
}

// Persist messages to JSONL file with provider for automatic description generation
let provider = agent.provider().await;
if provider.is_err() {
Expand Down
36 changes: 36 additions & 0 deletions crates/goose-cli/src/session/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub use self::export::message_to_markdown;
pub use builder::{build_session, SessionBuilderConfig, SessionSettings};
use console::Color;
use goose::agents::AgentEvent;
use goose::context_mgmt::auto_compact::check_and_compact_messages;
use goose::message::push_message;
use goose::permission::permission_confirmation::PrincipalType;
use goose::permission::Permission;
Expand Down Expand Up @@ -840,6 +841,41 @@ impl Session {
}

async fn process_agent_response(&mut self, interactive: bool) -> Result<()> {
// Check and compact messages if needed before calling reply
let compact_result = check_and_compact_messages(&self.agent, &self.messages, None).await?;
if compact_result.compacted {
self.messages = compact_result.messages;

// Notify user of compaction
let msg = if let (Some(before), Some(after)) =
(compact_result.tokens_before, compact_result.tokens_after)
{
format!(
"Auto-compacted context: {} → {} tokens ({:.0}% reduction)",
before,
after,
(1.0 - (after as f64 / before as f64)) * 100.0
)
} else {
"Auto-compacted context to prevent overflow".to_string()
};
output::render_text(&msg, Some(Color::Yellow), true);

// Persist the compacted messages
if let Some(session_file) = &self.session_file {
let working_dir = std::env::current_dir().ok();
let provider = self.agent.provider().await.ok();
session::persist_messages_with_schedule_id(
session_file,
&self.messages,
provider,
self.scheduled_job_id.clone(),
working_dir,
)
.await?;
}
}

let cancel_token = CancellationToken::new();
let cancel_token_clone = cancel_token.clone();

Expand Down
38 changes: 37 additions & 1 deletion crates/goose-server/src/routes/reply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use bytes::Bytes;
use futures::{stream::StreamExt, Stream};
use goose::{
agents::{AgentEvent, SessionConfig},
context_mgmt::auto_compact::check_and_compact_messages,
message::{push_message, Message},
permission::permission_confirmation::PrincipalType,
};
Expand Down Expand Up @@ -159,8 +160,43 @@ async fn reply_handler(
retry_config: None,
};

// Check and compact messages if needed before calling reply
let mut messages_to_process = messages.clone();
let compact_result = check_and_compact_messages(&agent, &messages_to_process, None).await;
if let Ok(result) = compact_result {
if result.compacted {
messages_to_process = result.messages;

// Notify client of compaction
let msg = if let (Some(before), Some(after)) =
(result.tokens_before, result.tokens_after)
{
format!(
"Auto-compacted context: {} → {} tokens ({:.0}% reduction)",
before,
after,
(1.0 - (after as f64 / before as f64)) * 100.0
)
} else {
"Auto-compacted context to prevent overflow".to_string()
};

let _ = stream_event(
MessageEvent::Message {
message: Message::assistant().with_text(&msg),
},
&task_tx,
)
.await;
}
}

let mut stream = match agent
- .reply(&messages, Some(session_config), Some(task_cancel.clone()))
+ .reply(
+ &messages_to_process,
+ Some(session_config),
+ Some(task_cancel.clone()),
+ )
.await
{
Ok(stream) => stream,
Expand Down
6 changes: 5 additions & 1 deletion crates/goose/src/agents/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ impl Agent {
&self,
tool_call: mcp_core::tool::ToolCall,
request_id: String,
cancellation_token: Option<CancellationToken>,
) -> (String, Result<ToolCallResult, ToolError>) {
// Check if this tool call should be allowed based on repetition monitoring
if let Some(monitor) = self.tool_monitor.lock().await.as_mut() {
Expand Down Expand Up @@ -345,10 +346,12 @@ impl Agent {

let task_config =
TaskConfig::new(provider, Some(Arc::clone(&self.extension_manager)), mcp_tx);

subagent_execute_task_tool::run_tasks(
tool_call.arguments.clone(),
task_config,
&self.tasks_manager,
cancellation_token,
)
.await
} else if tool_call.name == DYNAMIC_TASK_TOOL_NAME_PREFIX {
Expand Down Expand Up @@ -914,7 +917,7 @@ impl Agent {
for request in &permission_check_result.approved {
if let Ok(tool_call) = request.tool_call.clone() {
let (req_id, tool_result) = self
- .dispatch_tool_call(tool_call, request.id.clone())
+ .dispatch_tool_call(tool_call, request.id.clone(), cancel_token.clone())
.await;

tool_futures.push((
Expand Down Expand Up @@ -951,6 +954,7 @@ impl Agent {
tool_futures_arc.clone(),
&mut permission_manager,
message_tool_response.clone(),
cancel_token.clone(),
);

while let Some(msg) = tool_approval_stream.try_next().await? {
Expand Down
11 changes: 0 additions & 11 deletions crates/goose/src/agents/sub_recipe_execution_tool/mod.rs

This file was deleted.

186 changes: 0 additions & 186 deletions crates/goose/src/agents/sub_recipe_execution_tool/tasks.rs

This file was deleted.

Loading