diff --git a/Cargo.lock b/Cargo.lock index 55d3e248c182..9e0bfa5c8889 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -30,12 +30,13 @@ dependencies = [ [[package]] name = "agent-client-protocol" -version = "0.1.1" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6b91e5ec3ce05e8effb2a7a3b7b1a587daa6699b9f98bbde6a35e44b8c6c773a" +checksum = "cc2526e80463b9742afed4829aedd6ae5632d6db778c6cc1fecb80c960c3521b" dependencies = [ "anyhow", "async-broadcast", + "async-trait", "futures", "log", "parking_lot", diff --git a/crates/goose-cli/Cargo.toml b/crates/goose-cli/Cargo.toml index cd76ec44cd19..e7e3e0e20a74 100644 --- a/crates/goose-cli/Cargo.toml +++ b/crates/goose-cli/Cargo.toml @@ -22,7 +22,7 @@ mcp-client = { path = "../mcp-client" } mcp-server = { path = "../mcp-server" } mcp-core = { path = "../mcp-core" } rmcp = { workspace = true } -agent-client-protocol = "0.1.1" +agent-client-protocol = "0.4.0" clap = { version = "4.4", features = ["derive"] } cliclack = "0.3.5" console = "0.15.8" diff --git a/crates/goose-cli/src/commands/acp.rs b/crates/goose-cli/src/commands/acp.rs index 8e0cac5a50c9..157d074b9511 100644 --- a/crates/goose-cli/src/commands/acp.rs +++ b/crates/goose-cli/src/commands/acp.rs @@ -1,10 +1,14 @@ -use agent_client_protocol::{self as acp, Client, SessionNotification}; +use agent_client_protocol::{ + self as acp, Client, EmbeddedResource, ImageContent, SessionNotification, TextContent, + ToolCallContent, +}; use anyhow::Result; use goose::agents::Agent; use goose::config::{Config, ExtensionConfigManager}; use goose::conversation::message::{Message, MessageContent}; use goose::conversation::Conversation; use goose::providers::create; +use rmcp::model::{RawContent, ResourceContents}; use std::collections::{HashMap, HashSet}; use std::fs; use std::sync::Arc; @@ -17,7 +21,6 @@ use url::Url; /// Represents a single Goose session for ACP struct GooseSession { - agent: Agent, messages: Conversation, tool_call_ids: HashMap, // Maps internal tool IDs to ACP tool call IDs cancel_token: Option, // Active cancellation token for prompt processing @@ -27,7 +30,7 @@ struct GooseSession { struct GooseAcpAgent { session_update_tx: mpsc::UnboundedSender<(acp::SessionNotification, oneshot::Sender<()>)>, sessions: Arc>>, - provider: Arc, + agent: Agent, // Shared agent instance } fn read_resource_link(link: acp::ResourceLink) -> Option { @@ -72,66 +75,13 @@ impl GooseAcpAgent { }; let provider = create(&provider_name, model_config)?; - Ok(Self { - session_update_tx, - sessions: Arc::new(Mutex::new(HashMap::new())), - provider, - }) - } -} - -impl acp::Agent for GooseAcpAgent { - async fn initialize( - &self, - arguments: acp::InitializeRequest, - ) -> Result { - info!("ACP: Received initialize request {:?}", arguments); - - // Advertise Goose's capabilities - let agent_capabilities = acp::AgentCapabilities { - load_session: false, // TODO: Implement session persistence - prompt_capabilities: acp::PromptCapabilities { - image: true, // Goose supports image inputs via providers - audio: false, // TODO: Add audio support when providers support it - embedded_context: true, // Goose can handle embedded context resources - }, - }; - - Ok(acp::InitializeResponse { - protocol_version: acp::V1, - agent_capabilities, - auth_methods: Vec::new(), - }) - } - - async fn authenticate(&self, arguments: acp::AuthenticateRequest) -> Result<(), acp::Error> { - info!("ACP: Received authenticate request {:?}", arguments); - Ok(()) - } - - async fn new_session( - &self, - arguments: acp::NewSessionRequest, - ) -> Result { - info!("ACP: Received new session request {:?}", arguments); - - // Generate a unique session ID - let session_id = uuid::Uuid::new_v4().to_string(); - - // Create a new Agent and session for this ACP session + // Create a shared agent instance let agent = Agent::new(); - agent - .update_provider(self.provider.clone()) - .await - .map_err(|_| acp::Error::internal_error())?; + agent.update_provider(provider.clone()).await?; // Load and add extensions just like the normal CLI - // Get all enabled extensions from configuration let extensions_to_run: Vec<_> = ExtensionConfigManager::get_all() - .map_err(|e| { - error!("Failed to load extensions: {}", e); - acp::Error::internal_error() - })? + .map_err(|e| anyhow::anyhow!("Failed to load extensions: {}", e))? .into_iter() .filter(|ext| ext.enabled) .map(|ext| ext.config) @@ -171,65 +121,21 @@ impl acp::Agent for GooseAcpAgent { } // Unwrap the Arc to get the agent back - let agent = Arc::try_unwrap(agent_ptr).map_err(|_| { - error!("Failed to unwrap agent Arc"); - acp::Error::internal_error() - })?; + let agent = Arc::try_unwrap(agent_ptr) + .map_err(|_| anyhow::anyhow!("Failed to unwrap agent Arc"))?; - let session = GooseSession { + Ok(Self { + session_update_tx, + sessions: Arc::new(Mutex::new(HashMap::new())), agent, - messages: Conversation::new_unvalidated(Vec::new()), - tool_call_ids: HashMap::new(), - cancel_token: None, - }; - - // Store the session - let mut sessions = self.sessions.lock().await; - sessions.insert(session_id.clone(), session); - - info!("Created new session with ID: {}", session_id); - - Ok(acp::NewSessionResponse { - session_id: acp::SessionId(session_id.into()), }) } - async fn load_session(&self, arguments: acp::LoadSessionRequest) -> Result<(), acp::Error> { - info!("ACP: Received load session request {:?}", arguments); - // For now, will start a new session. We could use goose session storage as an enhancement - // we would need to map ACP session IDs to goose session ids (which by default are auto generated) - // normal goose session restore in CLI doesn't load conversation visually. - // - // Example flow: - // - Load session file by session_id (might need to map ACP session IDs to Goose session paths) - // - For each message in history: - // - If user message: send user_message_chunk notification - // - If assistant message: send agent_message_chunk notification - // - If tool calls/responses: send appropriate notifications - - // For now, we don't support loading previous sessions - Err(acp::Error::method_not_found()) - } - - #[allow(clippy::too_many_lines)] - async fn prompt( - &self, - arguments: acp::PromptRequest, - ) -> Result { - info!("ACP: Received prompt request {:?}", arguments); - - // Get the session - let session_id = arguments.session_id.0.to_string(); - let mut sessions = self.sessions.lock().await; - let session = sessions - .get_mut(&session_id) - .ok_or_else(acp::Error::invalid_params)?; - - // Convert ACP prompt to Goose message + fn convert_acp_prompt_to_message(&self, prompt: Vec) -> Message { let mut user_message = Message::user(); // Process all content blocks from the prompt - for block in arguments.prompt { + for block in prompt { match block { acp::ContentBlock::Text(text) => { user_message = user_message.with_text(&text.text); @@ -261,17 +167,358 @@ impl acp::Agent for GooseAcpAgent { } } - // Add message to conversation - session.messages.push(user_message); + user_message + } + + async fn handle_message_content( + &self, + content_item: &MessageContent, + session_id: &acp::SessionId, + session: &mut GooseSession, + ) -> Result<(), acp::Error> { + match content_item { + MessageContent::Text(text) => { + // Stream text to the client + let (tx, rx) = oneshot::channel(); + self.session_update_tx + .send(( + SessionNotification { + session_id: session_id.clone(), + update: acp::SessionUpdate::AgentMessageChunk { + content: text.text.clone().into(), + }, + meta: None, + }, + tx, + )) + .map_err(|_| acp::Error::internal_error())?; + rx.await.map_err(|_| acp::Error::internal_error())?; + } + MessageContent::ToolRequest(tool_request) => { + self.handle_tool_request(tool_request, session_id, session) + .await?; + } + MessageContent::ToolResponse(tool_response) => { + self.handle_tool_response(tool_response, session_id, session) + .await?; + } + MessageContent::Thinking(thinking) => { + // Stream thinking/reasoning content as thought chunks + let (tx, rx) = oneshot::channel(); + self.session_update_tx + .send(( + SessionNotification { + session_id: session_id.clone(), + update: acp::SessionUpdate::AgentThoughtChunk { + content: thinking.thinking.clone().into(), + }, + meta: None, + }, + tx, + )) + .map_err(|_| acp::Error::internal_error())?; + rx.await.map_err(|_| acp::Error::internal_error())?; + } + _ => { + // Ignore other content types for now + } + } + Ok(()) + } + + async fn handle_tool_request( + &self, + tool_request: &goose::conversation::message::ToolRequest, + session_id: &acp::SessionId, + session: &mut GooseSession, + ) -> Result<(), acp::Error> { + // Generate ACP tool call ID and track mapping + let acp_tool_id = format!("tool_{}", uuid::Uuid::new_v4()); + session + .tool_call_ids + .insert(tool_request.id.clone(), acp_tool_id.clone()); + + // Extract tool name and parameters from the ToolCall if successful + let (tool_name, locations) = match &tool_request.tool_call { + Ok(tool_call) => { + let name = tool_call.name.clone(); + + // Extract file locations from certain tools for client tracking + let mut locs = Vec::new(); + if name == "developer__text_editor" { + // Try to extract the path from the arguments + let args = &tool_call.arguments; + if let Some(path_str) = args.get("path").and_then(|p| p.as_str()) { + locs.push(acp::ToolCallLocation { + path: path_str.into(), + line: Some(1), + meta: None, + }); + } + } + (name, locs) + } + Err(_) => ("unknown".to_string(), Vec::new()), + }; + + // Send tool call notification + let (tx, rx) = oneshot::channel(); + self.session_update_tx + .send(( + SessionNotification { + session_id: session_id.clone(), + update: acp::SessionUpdate::ToolCall(acp::ToolCall { + id: acp::ToolCallId(acp_tool_id.clone().into()), + title: format!("Calling tool: {}", tool_name), + kind: acp::ToolKind::default(), + status: acp::ToolCallStatus::Pending, + content: Vec::new(), + locations, + raw_input: None, + raw_output: None, + meta: None, + }), + meta: None, + }, + tx, + )) + .map_err(|_| acp::Error::internal_error())?; + rx.await.map_err(|_| acp::Error::internal_error())?; + + Ok(()) + } + + async fn handle_tool_response( + &self, + tool_response: &goose::conversation::message::ToolResponse, + session_id: &acp::SessionId, + session: &mut GooseSession, + ) -> Result<(), acp::Error> { + // Look up the ACP tool call ID + if let Some(acp_tool_id) = session.tool_call_ids.get(&tool_response.id) { + // Determine if the tool call succeeded or failed + let status = if tool_response.tool_result.is_ok() { + acp::ToolCallStatus::Completed + } else { + acp::ToolCallStatus::Failed + }; + + let content: Vec = match &tool_response.tool_result { + Ok(content_items) => content_items + .iter() + .filter_map(|content| match &content.raw { + RawContent::Text(val) => Some(ToolCallContent::Content { + content: acp::ContentBlock::Text(TextContent { + annotations: None, + text: val.text.clone(), + meta: None, + }), + }), + RawContent::Image(val) => Some(ToolCallContent::Content { + content: acp::ContentBlock::Image(ImageContent { + annotations: None, + data: val.data.clone(), + mime_type: val.mime_type.clone(), + uri: None, + meta: None, + }), + }), + RawContent::Resource(val) => Some(ToolCallContent::Content { + content: acp::ContentBlock::Resource(EmbeddedResource { + annotations: None, + resource: match &val.resource { + ResourceContents::TextResourceContents { + mime_type, + text, + uri, + .. + } => acp::EmbeddedResourceResource::TextResourceContents( + acp::TextResourceContents { + mime_type: mime_type.clone(), + text: text.clone(), + uri: uri.clone(), + meta: None, + }, + ), + ResourceContents::BlobResourceContents { + mime_type, + blob, + uri, + .. + } => acp::EmbeddedResourceResource::BlobResourceContents( + acp::BlobResourceContents { + mime_type: mime_type.clone(), + blob: blob.clone(), + uri: uri.clone(), + meta: None, + }, + ), + }, + meta: None, + }), + }), + RawContent::Audio(_) => { + // Audio content is not supported in ACP ContentBlock, skip it + None + } + RawContent::ResourceLink(_) => { + // ResourceLink content is not supported in ACP ContentBlock, skip it + None + } + }) + .collect(), + Err(_) => Vec::new(), + }; + + // Send status update (completed or failed) + let (tx, rx) = oneshot::channel(); + self.session_update_tx + .send(( + SessionNotification { + session_id: session_id.clone(), + update: acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate { + id: acp::ToolCallId(acp_tool_id.clone().into()), + fields: acp::ToolCallUpdateFields { + status: Some(status), + content: Some(content), + ..Default::default() + }, + meta: None, + }), + meta: None, + }, + tx, + )) + .map_err(|_| acp::Error::internal_error())?; + rx.await.map_err(|_| acp::Error::internal_error())?; + } + + Ok(()) + } +} + +#[async_trait::async_trait(?Send)] +impl acp::Agent for GooseAcpAgent { + async fn initialize( + &self, + args: acp::InitializeRequest, + ) -> Result { + info!("ACP: Received initialize request {:?}", args); + + // Advertise Goose's capabilities + let agent_capabilities = acp::AgentCapabilities { + load_session: false, // TODO: Implement session persistence + prompt_capabilities: acp::PromptCapabilities { + image: true, // Goose supports image inputs via providers + audio: false, // TODO: Add audio support when providers support it + embedded_context: true, // Goose can handle embedded context resources + meta: None, + }, + mcp_capabilities: acp::McpCapabilities { + http: false, // TODO: Add MCP HTTP support if needed + sse: false, // TODO: Add MCP SSE support if needed + meta: None, + }, + meta: None, + }; + + Ok(acp::InitializeResponse { + protocol_version: acp::V1, + agent_capabilities, + auth_methods: Vec::new(), + meta: None, + }) + } + + async fn authenticate( + &self, + args: acp::AuthenticateRequest, + ) -> Result { + info!("ACP: Received authenticate request {:?}", args); + Ok(acp::AuthenticateResponse { meta: None }) + } + + async fn new_session( + &self, + args: acp::NewSessionRequest, + ) -> Result { + info!("ACP: Received new session request {:?}", args); + + // Generate a unique session ID + let session_id = uuid::Uuid::new_v4().to_string(); + + let session = GooseSession { + messages: Conversation::new_unvalidated(Vec::new()), + tool_call_ids: HashMap::new(), + cancel_token: None, + }; + + // Store the session + let mut sessions = self.sessions.lock().await; + sessions.insert(session_id.clone(), session); + + info!("Created new session with ID: {}", session_id); + + Ok(acp::NewSessionResponse { + session_id: acp::SessionId(session_id.into()), + modes: None, // TODO: Implement session modes if needed + meta: None, + }) + } + + async fn load_session( + &self, + args: acp::LoadSessionRequest, + ) -> Result { + info!("ACP: Received load session request {:?}", args); + // For now, will start a new session. We could use goose session storage as an enhancement + // we would need to map ACP session IDs to goose session ids (which by default are auto generated) + // normal goose session restore in CLI doesn't load conversation visually. + // + // Example flow: + // - Load session file by session_id (might need to map ACP session IDs to Goose session paths) + // - For each message in history: + // - If user message: send user_message_chunk notification + // - If assistant message: send agent_message_chunk notification + // - If tool calls/responses: send appropriate notifications + + // For now, we don't support loading previous sessions + Err(acp::Error::method_not_found()) + } + + async fn prompt(&self, args: acp::PromptRequest) -> Result { + info!("ACP: Received prompt request {:?}", args); + + // Get the session + let session_id = args.session_id.0.to_string(); // Create and store cancellation token for this prompt let cancel_token = CancellationToken::new(); - session.cancel_token = Some(cancel_token.clone()); + + // Convert ACP prompt to Goose message + let user_message = self.convert_acp_prompt_to_message(args.prompt); + + // Prepare for agent reply + let messages = { + let mut sessions = self.sessions.lock().await; + let session = sessions + .get_mut(&session_id) + .ok_or_else(acp::Error::invalid_params)?; + + // Add message to conversation + session.messages.push(user_message); + + // Store cancellation token + session.cancel_token = Some(cancel_token.clone()); + + // Clone what we need for the reply call + session.messages.clone() + }; // Get agent's reply through the Goose agent - let mut stream = session + let mut stream = self .agent - .reply(session.messages.clone(), None, Some(cancel_token.clone())) + .reply(messages, None, Some(cancel_token.clone())) .await .map_err(|e| { error!("Error getting agent reply: {}", e); @@ -293,139 +540,19 @@ impl acp::Agent for GooseAcpAgent { match event { Ok(goose::agents::AgentEvent::Message(message)) => { + // Re-acquire the lock to add message to conversation + let mut sessions = self.sessions.lock().await; + let session = sessions + .get_mut(&session_id) + .ok_or_else(acp::Error::invalid_params)?; + // Add to conversation session.messages.push(message.clone()); // Process message content, including tool calls for content_item in &message.content { - match content_item { - MessageContent::Text(text) => { - // Stream text to the client - let (tx, rx) = oneshot::channel(); - self.session_update_tx - .send(( - SessionNotification { - session_id: arguments.session_id.clone(), - update: acp::SessionUpdate::AgentMessageChunk { - content: text.text.clone().into(), - }, - }, - tx, - )) - .map_err(|_| acp::Error::internal_error())?; - rx.await.map_err(|_| acp::Error::internal_error())?; - } - MessageContent::ToolRequest(tool_request) => { - // Generate ACP tool call ID and track mapping - let acp_tool_id = format!("tool_{}", uuid::Uuid::new_v4()); - session - .tool_call_ids - .insert(tool_request.id.clone(), acp_tool_id.clone()); - - // Extract tool name and parameters from the ToolCall if successful - let (tool_name, locations) = match &tool_request.tool_call { - Ok(tool_call) => { - let name = tool_call.name.clone(); - - // Extract file locations from certain tools for client tracking - let mut locs = Vec::new(); - if name == "developer__text_editor" { - // Try to extract the path from the arguments - let args = &tool_call.arguments; - if let Some(path_str) = - args.get("path").and_then(|p| p.as_str()) - { - locs.push(acp::ToolCallLocation { - path: path_str.into(), - line: Some(1), - }); - } - } - (name, locs) - } - Err(_) => ("unknown".to_string(), Vec::new()), - }; - - // Send tool call notification - let (tx, rx) = oneshot::channel(); - self.session_update_tx - .send(( - SessionNotification { - session_id: arguments.session_id.clone(), - update: acp::SessionUpdate::ToolCall(acp::ToolCall { - id: acp::ToolCallId(acp_tool_id.clone().into()), - title: format!("Calling tool: {}", tool_name), - kind: acp::ToolKind::default(), - status: acp::ToolCallStatus::Pending, - content: Vec::new(), - locations, - raw_input: None, - raw_output: None, - }), - }, - tx, - )) - .map_err(|_| acp::Error::internal_error())?; - rx.await.map_err(|_| acp::Error::internal_error())?; - - // No need for separate update - status is already set to Pending - } - MessageContent::ToolResponse(tool_response) => { - // Look up the ACP tool call ID - if let Some(acp_tool_id) = - session.tool_call_ids.get(&tool_response.id) - { - // Determine if the tool call succeeded or failed - let status = if tool_response.tool_result.is_ok() { - acp::ToolCallStatus::Completed - } else { - acp::ToolCallStatus::Failed - }; - - // Send status update (completed or failed) - let (tx, rx) = oneshot::channel(); - self.session_update_tx - .send(( - SessionNotification { - session_id: arguments.session_id.clone(), - update: acp::SessionUpdate::ToolCallUpdate( - acp::ToolCallUpdate { - id: acp::ToolCallId( - acp_tool_id.clone().into(), - ), - fields: acp::ToolCallUpdateFields { - status: Some(status), - ..Default::default() - }, - }, - ), - }, - tx, - )) - .map_err(|_| acp::Error::internal_error())?; - rx.await.map_err(|_| acp::Error::internal_error())?; - } - } - MessageContent::Thinking(thinking) => { - // Stream thinking/reasoning content as thought chunks - let (tx, rx) = oneshot::channel(); - self.session_update_tx - .send(( - SessionNotification { - session_id: arguments.session_id.clone(), - update: acp::SessionUpdate::AgentThoughtChunk { - content: thinking.thinking.clone().into(), - }, - }, - tx, - )) - .map_err(|_| acp::Error::internal_error())?; - rx.await.map_err(|_| acp::Error::internal_error())?; - } - _ => { - // Ignore other content types for now - } - } + self.handle_message_content(content_item, &args.session_id, session) + .await?; } } Ok(_) => { @@ -439,7 +566,10 @@ impl acp::Agent for GooseAcpAgent { } // Clear the cancel token since we're done - session.cancel_token = None; + let mut sessions = self.sessions.lock().await; + if let Some(session) = sessions.get_mut(&session_id) { + session.cancel_token = None; + } Ok(acp::PromptResponse { stop_reason: if was_cancelled { @@ -447,6 +577,7 @@ impl acp::Agent for GooseAcpAgent { } else { acp::StopReason::EndTurn }, + meta: None, }) } @@ -468,6 +599,27 @@ impl acp::Agent for GooseAcpAgent { Ok(()) } + + async fn set_session_mode( + &self, + _args: acp::SetSessionModeRequest, + ) -> Result { + // TODO: Implement session modes if needed + Err(acp::Error::method_not_found()) + } + + async fn ext_method( + &self, + _args: acp::ExtRequest, + ) -> Result, acp::Error> { + // TODO: Implement extension methods if needed + Err(acp::Error::method_not_found()) + } + + async fn ext_notification(&self, _args: acp::ExtNotification) -> Result<(), acp::Error> { + // TODO: Implement extension notifications if needed + Ok(()) + } } /// Run the ACP agent server @@ -540,6 +692,7 @@ mod tests { size: None, title: None, uri: format!("file://{}", file.path().to_str().unwrap()), + meta: None, }; Ok((link, file)) }