From cb27b4a7eae7e4a510f5421f785721b4b372bbf6 Mon Sep 17 00:00:00 2001 From: Douwe Osinga Date: Sun, 28 Sep 2025 21:23:26 -0400 Subject: [PATCH 1/4] Don't set agent props twice --- crates/goose/src/execution/manager.rs | 49 +++++++++++++-------------- 1 file changed, 23 insertions(+), 26 deletions(-) diff --git a/crates/goose/src/execution/manager.rs b/crates/goose/src/execution/manager.rs index 3ef38237e0d5..22b8bb319e89 100644 --- a/crates/goose/src/execution/manager.rs +++ b/crates/goose/src/execution/manager.rs @@ -91,43 +91,40 @@ impl AgentManager { session_id: String, mode: SessionExecutionMode, ) -> Result> { - let agent = { + let (agent, is_new) = { let mut sessions = self.sessions.write().await; - if let Some(agent) = sessions.get(&session_id) { - debug!("Found existing agent for session {}", session_id); - return Ok(Arc::clone(agent)); + if let Some(existing) = sessions.get(&session_id) { + (Arc::clone(existing), false) + } else { + let agent = Arc::new(Agent::new()); + sessions.put(session_id.clone(), agent.clone()); + (agent, true) } - - info!( - "Creating new agent for session {} with mode {}", - session_id, mode - ); - let agent = Arc::new(Agent::new()); - sessions.put(session_id.clone(), Arc::clone(&agent)); - agent }; - match &mode { - SessionExecutionMode::Interactive | SessionExecutionMode::Background => { - debug!("Setting scheduler on agent for session {}", session_id); - agent.set_scheduler(Arc::clone(&self.scheduler)).await; + if is_new { + match &mode { + SessionExecutionMode::Interactive | SessionExecutionMode::Background => { + debug!("Setting scheduler on agent for session {}", session_id); + agent.set_scheduler(Arc::clone(&self.scheduler)).await; + } + SessionExecutionMode::SubTask { .. } => { + debug!( + "SubTask mode for session {}, skipping scheduler setup", + session_id + ); + } } - SessionExecutionMode::SubTask { .. } => { + + if let Some(provider) = &*self.default_provider.read().await { debug!( - "SubTask mode for session {}, skipping scheduler setup", + "Setting default provider on agent for session {}", session_id ); + let _ = agent.update_provider(Arc::clone(provider)).await; } } - if let Some(provider) = &*self.default_provider.read().await { - debug!( - "Setting default provider on agent for session {}", - session_id - ); - let _ = agent.update_provider(Arc::clone(provider)).await; - } - Ok(agent) } From 17bc6cc7937dcbc04489abec0478fd9f97016cf8 Mon Sep 17 00:00:00 2001 From: Douwe Osinga Date: Tue, 30 Sep 2025 08:37:52 -0400 Subject: [PATCH 2/4] What Tyler said --- crates/goose-server/src/routes/reply.rs | 5 +- crates/goose-server/src/state.rs | 21 ++-- crates/goose/src/execution/manager.rs | 49 +++------ crates/goose/tests/execution_tests.rs | 136 +++--------------------- 4 files changed, 34 insertions(+), 177 deletions(-) diff --git a/crates/goose-server/src/routes/reply.rs b/crates/goose-server/src/routes/reply.rs index 09c1c41bff63..fca3f568170a 100644 --- a/crates/goose-server/src/routes/reply.rs +++ b/crates/goose-server/src/routes/reply.rs @@ -207,10 +207,7 @@ async fn reply_handler( let task_tx = tx.clone(); drop(tokio::spawn(async move { - let agent = match state - .get_agent(session_id.clone(), SessionExecutionMode::Interactive) - .await - { + let agent = match state.get_agent(session_id.clone()).await { Ok(agent) => agent, Err(e) => { tracing::error!("Failed to get session agent: {}", e); diff --git a/crates/goose-server/src/state.rs b/crates/goose-server/src/state.rs index 8d5f5d24cb06..b1b07517ff87 100644 --- a/crates/goose-server/src/state.rs +++ b/crates/goose-server/src/state.rs @@ -1,6 +1,5 @@ use axum::http::StatusCode; use goose::execution::manager::AgentManager; -use goose::execution::SessionExecutionMode; use goose::scheduler_trait::SchedulerTrait; use std::collections::{HashMap, HashSet}; use std::path::PathBuf; @@ -46,14 +45,8 @@ impl AppState { } } - pub async fn get_agent( - &self, - session_id: String, - mode: SessionExecutionMode, - ) -> anyhow::Result> { - self.agent_manager - .get_or_create_agent(session_id, mode) - .await + pub async fn get_agent(&self, session_id: String) -> anyhow::Result> { + self.agent_manager.get_or_create_agent(session_id).await } /// Get agent for route handlers - always uses Interactive mode and converts any error to 500 @@ -61,11 +54,9 @@ impl AppState { &self, session_id: String, ) -> Result, StatusCode> { - self.get_agent(session_id, SessionExecutionMode::Interactive) - .await - .map_err(|e| { - tracing::error!("Failed to get agent: {}", e); - StatusCode::INTERNAL_SERVER_ERROR - }) + self.get_agent(session_id).await.map_err(|e| { + tracing::error!("Failed to get agent: {}", e); + StatusCode::INTERNAL_SERVER_ERROR + }) } } diff --git a/crates/goose/src/execution/manager.rs b/crates/goose/src/execution/manager.rs index 22b8bb319e89..1ac53348f80d 100644 --- a/crates/goose/src/execution/manager.rs +++ b/crates/goose/src/execution/manager.rs @@ -86,46 +86,27 @@ impl AgentManager { Ok(()) } - pub async fn get_or_create_agent( - &self, - session_id: String, - mode: SessionExecutionMode, - ) -> Result> { - let (agent, is_new) = { + pub async fn get_or_create_agent(&self, session_id: String) -> Result> { + { let mut sessions = self.sessions.write().await; if let Some(existing) = sessions.get(&session_id) { - (Arc::clone(existing), false) - } else { - let agent = Arc::new(Agent::new()); - sessions.put(session_id.clone(), agent.clone()); - (agent, true) - } - }; - - if is_new { - match &mode { - SessionExecutionMode::Interactive | SessionExecutionMode::Background => { - debug!("Setting scheduler on agent for session {}", session_id); - agent.set_scheduler(Arc::clone(&self.scheduler)).await; - } - SessionExecutionMode::SubTask { .. } => { - debug!( - "SubTask mode for session {}, skipping scheduler setup", - session_id - ); - } + return Ok(Arc::clone(existing)); } + } - if let Some(provider) = &*self.default_provider.read().await { - debug!( - "Setting default provider on agent for session {}", - session_id - ); - let _ = agent.update_provider(Arc::clone(provider)).await; - } + let agent = Arc::new(Agent::new()); + agent.set_scheduler(Arc::clone(&self.scheduler)).await; + if let Some(provider) = &*self.default_provider.read().await { + agent.update_provider(Arc::clone(provider)).await?; } - Ok(agent) + let mut sessions = self.sessions.write().await; + if let Some(existing) = sessions.get(&session_id) { + Ok(Arc::clone(existing)) + } else { + sessions.put(session_id, agent.clone()); + Ok(agent) + } } pub async fn remove_session(&self, session_id: &str) -> Result<()> { diff --git a/crates/goose/tests/execution_tests.rs b/crates/goose/tests/execution_tests.rs index bb918f573782..1084103baa4b 100644 --- a/crates/goose/tests/execution_tests.rs +++ b/crates/goose/tests/execution_tests.rs @@ -31,24 +31,15 @@ mod execution_tests { let session1 = uuid::Uuid::new_v4().to_string(); let session2 = uuid::Uuid::new_v4().to_string(); - let agent1 = manager - .get_or_create_agent(session1.clone(), SessionExecutionMode::Interactive) - .await - .unwrap(); + let agent1 = manager.get_or_create_agent(session1.clone()).await.unwrap(); - let agent2 = manager - .get_or_create_agent(session2.clone(), SessionExecutionMode::Interactive) - .await - .unwrap(); + let agent2 = manager.get_or_create_agent(session2.clone()).await.unwrap(); // Different sessions should have different agents assert!(!Arc::ptr_eq(&agent1, &agent2)); // Getting the same session should return the same agent - let agent1_again = manager - .get_or_create_agent(session1, SessionExecutionMode::chat()) - .await - .unwrap(); + let agent1_again = manager.get_or_create_agent(session1).await.unwrap(); assert!(Arc::ptr_eq(&agent1, &agent1_again)); } @@ -60,18 +51,12 @@ mod execution_tests { let sessions: Vec<_> = (0..3).map(|i| format!("session-{}", i)).collect(); for session in &sessions { - manager - .get_or_create_agent(session.clone(), SessionExecutionMode::chat()) - .await - .unwrap(); + manager.get_or_create_agent(session.clone()).await.unwrap(); } // Create a new session after cleanup let new_session = "new-session".to_string(); - let _new_agent = manager - .get_or_create_agent(new_session, SessionExecutionMode::chat()) - .await - .unwrap(); + let _new_agent = manager.get_or_create_agent(new_session).await.unwrap(); assert_eq!(manager.session_count().await, 3); assert!(!manager.has_session(&sessions[0]).await); @@ -82,10 +67,7 @@ mod execution_tests { let manager = AgentManager::new(None).await.unwrap(); let session = String::from("remove-test"); - manager - .get_or_create_agent(session.clone(), SessionExecutionMode::chat()) - .await - .unwrap(); + manager.get_or_create_agent(session.clone()).await.unwrap(); assert!(manager.has_session(&session).await); manager.remove_session(&session).await.unwrap(); @@ -104,9 +86,7 @@ mod execution_tests { let mgr = Arc::clone(&manager); let sess = session.clone(); handles.push(tokio::spawn(async move { - mgr.get_or_create_agent(sess, SessionExecutionMode::chat()) - .await - .unwrap() + mgr.get_or_create_agent(sess).await.unwrap() })); } @@ -130,14 +110,14 @@ mod execution_tests { // Create initial agent let agent1 = manager - .get_or_create_agent(session_id.clone(), SessionExecutionMode::chat()) + .get_or_create_agent(session_id.clone()) .await .unwrap(); // Get same session with different mode - should return same agent // (mode is stored but agent is reused) let agent2 = manager - .get_or_create_agent(session_id.clone(), SessionExecutionMode::Background) + .get_or_create_agent(session_id.clone()) .await .unwrap(); @@ -157,10 +137,7 @@ mod execution_tests { let sess = session_id.clone(); let mgr_clone = Arc::clone(&manager); handles.push(tokio::spawn(async move { - mgr_clone - .get_or_create_agent(sess, SessionExecutionMode::Interactive) - .await - .unwrap() + mgr_clone.get_or_create_agent(sess).await.unwrap() })); } @@ -188,19 +165,13 @@ mod execution_tests { let manager = AgentManager::new(Some(1)).await.unwrap(); let session1 = String::from("only-session"); - manager - .get_or_create_agent(session1.clone(), SessionExecutionMode::Interactive) - .await - .unwrap(); + manager.get_or_create_agent(session1.clone()).await.unwrap(); assert_eq!(manager.session_count().await, 1); // Creating second session should evict the first let session2 = String::from("new-session"); - manager - .get_or_create_agent(session2.clone(), SessionExecutionMode::Interactive) - .await - .unwrap(); + manager.get_or_create_agent(session2.clone()).await.unwrap(); assert!(!manager.has_session(&session1).await); assert!(manager.has_session(&session2).await); @@ -235,87 +206,4 @@ mod execution_tests { env::remove_var("GOOSE_DEFAULT_MODEL"); } } - - #[tokio::test] - async fn test_set_default_provider() { - use goose::providers::testprovider::TestProvider; - use std::sync::Arc; - - let manager = AgentManager::new(None).await.unwrap(); - - // Create a test provider for replaying (doesn't need inner provider) - let temp_file = format!( - "{}/test_provider_{}.json", - std::env::temp_dir().display(), - std::process::id() - ); - - // Create an empty test provider (will fail on actual use but that's ok for this test) - let test_provider = TestProvider::new_replaying(&temp_file) - .unwrap_or_else(|_| TestProvider::new_replaying("/tmp/dummy.json").unwrap()); - - manager.set_default_provider(Arc::new(test_provider)).await; - - let session = String::from("provider-test"); - let _agent = manager - .get_or_create_agent(session.clone(), SessionExecutionMode::Interactive) - .await - .unwrap(); - - assert!(manager.has_session(&session).await); - } - - #[tokio::test] - async fn test_eviction_updates_last_used() { - // Test that accessing a session updates its last_used timestamp - // and affects eviction order - let manager = AgentManager::new(Some(2)).await.unwrap(); - - let session1 = String::from("session-1"); - let session2 = String::from("session-2"); - - manager - .get_or_create_agent(session1.clone(), SessionExecutionMode::Interactive) - .await - .unwrap(); - - // Small delay to ensure different timestamps - tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; - - manager - .get_or_create_agent(session2.clone(), SessionExecutionMode::Interactive) - .await - .unwrap(); - - // Access session1 again to update its last_used - tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; - manager - .get_or_create_agent(session1.clone(), SessionExecutionMode::Interactive) - .await - .unwrap(); - - // Now create a third session - should evict session2 (least recently used) - let session3 = String::from("session-3"); - manager - .get_or_create_agent(session3.clone(), SessionExecutionMode::Interactive) - .await - .unwrap(); - - // session1 should still exist (recently accessed) - // session2 should be evicted (least recently used) - assert!(manager.has_session(&session1).await); - assert!(!manager.has_session(&session2).await); - assert!(manager.has_session(&session3).await); - } - - #[tokio::test] - async fn test_remove_nonexistent_session_error() { - // Test that removing a non-existent session returns an error - let manager = AgentManager::new(None).await.unwrap(); - let session = String::from("never-created"); - - let result = manager.remove_session(&session).await; - assert!(result.is_err()); - assert!(result.unwrap_err().to_string().contains("not found")); - } } From bd12068d243a2f6fb582484781f796c31c0cc48e Mon Sep 17 00:00:00 2001 From: Douwe Osinga Date: Tue, 30 Sep 2025 08:39:12 -0400 Subject: [PATCH 3/4] remove --- crates/goose/tests/execution_tests.rs | 21 --------------------- 1 file changed, 21 deletions(-) diff --git a/crates/goose/tests/execution_tests.rs b/crates/goose/tests/execution_tests.rs index 1084103baa4b..b94e39b1a2ee 100644 --- a/crates/goose/tests/execution_tests.rs +++ b/crates/goose/tests/execution_tests.rs @@ -103,27 +103,6 @@ mod execution_tests { assert_eq!(manager.session_count().await, 1); } - #[tokio::test] - async fn test_different_modes_same_session() { - let manager = AgentManager::new(None).await.unwrap(); - let session_id = String::from("mode-test"); - - // Create initial agent - let agent1 = manager - .get_or_create_agent(session_id.clone()) - .await - .unwrap(); - - // Get same session with different mode - should return same agent - // (mode is stored but agent is reused) - let agent2 = manager - .get_or_create_agent(session_id.clone()) - .await - .unwrap(); - - assert!(Arc::ptr_eq(&agent1, &agent2)); - } - #[tokio::test] async fn test_concurrent_session_creation_race_condition() { // Test that concurrent attempts to create the same new session ID From 21f0a49e36fe91f759646f022baa171dbf20ac1c Mon Sep 17 00:00:00 2001 From: Douwe Osinga Date: Tue, 30 Sep 2025 22:01:52 -0400 Subject: [PATCH 4/4] Cleaning up --- crates/goose-server/src/routes/reply.rs | 1 - crates/goose/src/execution/manager.rs | 3 - crates/goose/tests/execution_tests.rs | 184 +++++++++++------------- 3 files changed, 86 insertions(+), 102 deletions(-) diff --git a/crates/goose-server/src/routes/reply.rs b/crates/goose-server/src/routes/reply.rs index 20699ac92f05..3f2b983efbe1 100644 --- a/crates/goose-server/src/routes/reply.rs +++ b/crates/goose-server/src/routes/reply.rs @@ -10,7 +10,6 @@ use bytes::Bytes; use futures::{stream::StreamExt, Stream}; use goose::conversation::message::{Message, MessageContent}; use goose::conversation::Conversation; -use goose::execution::SessionExecutionMode; use goose::mcp_utils::ToolResult; use goose::permission::{Permission, PermissionConfirmation}; use goose::session::SessionManager; diff --git a/crates/goose/src/execution/manager.rs b/crates/goose/src/execution/manager.rs index 84d1a3bea73e..25eb8bdc4a6d 100644 --- a/crates/goose/src/execution/manager.rs +++ b/crates/goose/src/execution/manager.rs @@ -1,6 +1,3 @@ -//! Agent lifecycle management with session isolation - -use super::SessionExecutionMode; use crate::agents::Agent; use crate::config::APP_STRATEGY; use crate::model::ModelConfig; diff --git a/crates/goose/tests/execution_tests.rs b/crates/goose/tests/execution_tests.rs index 677a6476ae61..07e8932563b7 100644 --- a/crates/goose/tests/execution_tests.rs +++ b/crates/goose/tests/execution_tests.rs @@ -81,8 +81,6 @@ mod execution_tests { assert!(!manager.has_session(&session).await); assert!(manager.remove_session(&session).await.is_err()); - - AgentManager::reset_for_test(); } #[tokio::test] @@ -112,10 +110,10 @@ mod execution_tests { } assert_eq!(manager.session_count().await, 1); - - AgentManager::reset_for_test(); } + #[tokio::test] + #[serial] async fn test_concurrent_session_creation_race_condition() { // Test that concurrent attempts to create the same new session ID // result in only one agent being created (tests double-check pattern) @@ -140,129 +138,119 @@ mod execution_tests { .map(|r| r.unwrap()) .collect(); - // All should be the same agent (double-check pattern should prevent duplicates) for agent in &agents[1..] { assert!( Arc::ptr_eq(&agents[0], agent), "All concurrent requests should get the same agent" ); } - - // Only one session should exist assert_eq!(manager.session_count().await, 1); + } - #[tokio::test] - #[serial] - async fn test_configure_default_provider() { - use std::env; - - AgentManager::reset_for_test(); + #[tokio::test] + #[serial] + async fn test_configure_default_provider() { + use std::env; - let original_provider = env::var("GOOSE_DEFAULT_PROVIDER").ok(); - let original_model = env::var("GOOSE_DEFAULT_MODEL").ok(); + AgentManager::reset_for_test(); - env::set_var("GOOSE_DEFAULT_PROVIDER", "openai"); - env::set_var("GOOSE_DEFAULT_MODEL", "gpt-4o-mini"); + let original_provider = env::var("GOOSE_DEFAULT_PROVIDER").ok(); + let original_model = env::var("GOOSE_DEFAULT_MODEL").ok(); - let manager = AgentManager::instance().await.unwrap(); - let result = manager.configure_default_provider().await; + env::set_var("GOOSE_DEFAULT_PROVIDER", "openai"); + env::set_var("GOOSE_DEFAULT_MODEL", "gpt-4o-mini"); - assert!(result.is_ok()); + let manager = AgentManager::instance().await.unwrap(); + let result = manager.configure_default_provider().await; - // Restore original env vars - if let Some(val) = original_provider { - env::set_var("GOOSE_DEFAULT_PROVIDER", val); - } else { - env::remove_var("GOOSE_DEFAULT_PROVIDER"); - } - if let Some(val) = original_model { - env::set_var("GOOSE_DEFAULT_MODEL", val); - } else { - env::remove_var("GOOSE_DEFAULT_MODEL"); - } + assert!(result.is_ok()); - AgentManager::reset_for_test(); + // Restore original env vars + if let Some(val) = original_provider { + env::set_var("GOOSE_DEFAULT_PROVIDER", val); + } else { + env::remove_var("GOOSE_DEFAULT_PROVIDER"); } + if let Some(val) = original_model { + env::set_var("GOOSE_DEFAULT_MODEL", val); + } else { + env::remove_var("GOOSE_DEFAULT_MODEL"); + } + } - #[tokio::test] - #[serial] - async fn test_set_default_provider() { - use goose::providers::testprovider::TestProvider; - use std::sync::Arc; - - AgentManager::reset_for_test(); - let manager = AgentManager::instance().await.unwrap(); - - // Create a test provider for replaying (doesn't need inner provider) - let temp_file = format!( - "{}/test_provider_{}.json", - std::env::temp_dir().display(), - std::process::id() - ); + #[tokio::test] + #[serial] + async fn test_set_default_provider() { + use goose::providers::testprovider::TestProvider; + use std::sync::Arc; - // Create an empty test provider (will fail on actual use but that's ok for this test) - let test_provider = TestProvider::new_replaying(&temp_file) - .unwrap_or_else(|_| TestProvider::new_replaying("/tmp/dummy.json").unwrap()); + AgentManager::reset_for_test(); + let manager = AgentManager::instance().await.unwrap(); - manager.set_default_provider(Arc::new(test_provider)).await; + // Create a test provider for replaying (doesn't need inner provider) + let temp_file = format!( + "{}/test_provider_{}.json", + std::env::temp_dir().display(), + std::process::id() + ); - let session = String::from("provider-test"); - let _agent = manager.get_or_create_agent(session.clone()).await.unwrap(); + // Create an empty test provider (will fail on actual use but that's ok for this test) + let test_provider = TestProvider::new_replaying(&temp_file) + .unwrap_or_else(|_| TestProvider::new_replaying("/tmp/dummy.json").unwrap()); - assert!(manager.has_session(&session).await); + manager.set_default_provider(Arc::new(test_provider)).await; - AgentManager::reset_for_test(); - } + let session = String::from("provider-test"); + let _agent = manager.get_or_create_agent(session.clone()).await.unwrap(); - #[tokio::test] - #[serial] - async fn test_eviction_updates_last_used() { - AgentManager::reset_for_test(); - // Test that accessing a session updates its last_used timestamp - // and affects eviction order - let manager = AgentManager::instance().await.unwrap(); + assert!(manager.has_session(&session).await); + } - let sessions: Vec<_> = (0..100).map(|i| format!("session-{}", i)).collect(); + #[tokio::test] + #[serial] + async fn test_eviction_updates_last_used() { + AgentManager::reset_for_test(); + // Test that accessing a session updates its last_used timestamp + // and affects eviction order + let manager = AgentManager::instance().await.unwrap(); - for session in &sessions { - manager.get_or_create_agent(session.clone()).await.unwrap(); - // Small delay to ensure different timestamps - tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; - } + let sessions: Vec<_> = (0..100).map(|i| format!("session-{}", i)).collect(); - // Access the first session again to update its last_used + for session in &sessions { + manager.get_or_create_agent(session.clone()).await.unwrap(); + // Small delay to ensure different timestamps tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; - manager - .get_or_create_agent(sessions[0].clone()) - .await - .unwrap(); - - // Now create a 101st session - should evict session2 (least recently used) - let session101 = String::from("session-101"); - manager - .get_or_create_agent(session101.clone()) - .await - .unwrap(); - - assert!(manager.has_session(&sessions[0]).await); - assert!(!manager.has_session(&sessions[1]).await); - assert!(manager.has_session(&session101).await); - AgentManager::reset_for_test(); } - #[tokio::test] - #[serial] - async fn test_remove_nonexistent_session_error() { - // Test that removing a non-existent session returns an error - AgentManager::reset_for_test(); - let manager = AgentManager::instance().await.unwrap(); - let session = String::from("never-created"); + // Access the first session again to update its last_used + tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; + manager + .get_or_create_agent(sessions[0].clone()) + .await + .unwrap(); + + // Now create a 101st session - should evict session2 (least recently used) + let session101 = String::from("session-101"); + manager + .get_or_create_agent(session101.clone()) + .await + .unwrap(); - let result = manager.remove_session(&session).await; - assert!(result.is_err()); - assert!(result.unwrap_err().to_string().contains("not found")); + assert!(manager.has_session(&sessions[0]).await); + assert!(!manager.has_session(&sessions[1]).await); + assert!(manager.has_session(&session101).await); + } - AgentManager::reset_for_test(); - } + #[tokio::test] + #[serial] + async fn test_remove_nonexistent_session_error() { + // Test that removing a non-existent session returns an error + AgentManager::reset_for_test(); + let manager = AgentManager::instance().await.unwrap(); + let session = String::from("never-created"); + + let result = manager.remove_session(&session).await; + assert!(result.is_err()); + assert!(result.unwrap_err().to_string().contains("not found")); } }