diff --git a/dotnet/src/Client.cs b/dotnet/src/Client.cs index 74f1c66f..d29698dc 100644 --- a/dotnet/src/Client.cs +++ b/dotnet/src/Client.cs @@ -10,6 +10,7 @@ using System.Data; using System.Diagnostics; using System.Diagnostics.CodeAnalysis; +using System.Linq; using System.Net.Sockets; using System.Text.Json; using System.Text.Json.Serialization; @@ -210,21 +211,34 @@ async Task StartCoreAsync(CancellationToken ct) /// public async Task StopAsync() { - var errors = new List(); + var sessionsToDestroy = _sessions.Values.ToArray(); + _sessions.Clear(); - foreach (var session in _sessions.Values.ToArray()) + var sessionTasks = sessionsToDestroy.Select(async session => { - try - { - await session.DisposeAsync(); - } - catch (Exception ex) + Exception? lastEx = null; + for (int attempt = 1; attempt <= 3; attempt++) { - errors.Add(new Exception($"Failed to destroy session {session.SessionId}: {ex.Message}", ex)); + try + { + await session.DisposeAsync(); + return null; + } + catch (Exception ex) + { + lastEx = ex; + if (attempt < 3) + { + // Exponential backoff: 100ms, 200ms + await Task.Delay(100 * (1 << (attempt - 1))); + } + } } - } + return new Exception($"Failed to destroy session {session.SessionId} after 3 attempts: {lastEx?.Message}", lastEx); + }); - _sessions.Clear(); + var results = await Task.WhenAll(sessionTasks); + var errors = results.Where(e => e != null).Cast().ToList(); await CleanupConnectionAsync(errors); _connectionTask = null; diff --git a/go/client.go b/go/client.go index 319c6588..7fcfe772 100644 --- a/go/client.go +++ b/go/client.go @@ -294,10 +294,34 @@ func (c *Client) Stop() error { } c.sessionsMux.Unlock() + var wg sync.WaitGroup + errChan := make(chan error, len(sessions)) + for _, session := range sessions { - if err := session.Destroy(); err != nil { - errs = append(errs, fmt.Errorf("failed to destroy session %s: %w", session.SessionID, err)) - } + wg.Add(1) + go func(s *Session) { + defer wg.Done() + var lastErr error + for attempt := 1; attempt <= 3; attempt++ { + if err := s.Destroy(); err != nil { + lastErr = err + if attempt < 3 { + // Exponential backoff: 100ms, 200ms + time.Sleep(time.Duration(100*(1<<(attempt-1))) * time.Millisecond) + } + } else { + return + } + } + errChan <- fmt.Errorf("failed to destroy session %s after 3 attempts: %w", s.SessionID, lastErr) + }(session) + } + + wg.Wait() + close(errChan) + + for err := range errChan { + errs = append(errs, err) } c.sessionsMux.Lock() diff --git a/python/copilot/client.py b/python/copilot/client.py index 85b72897..7e90e9fb 100644 --- a/python/copilot/client.py +++ b/python/copilot/client.py @@ -313,13 +313,30 @@ async def stop(self) -> list["StopError"]: sessions_to_destroy = list(self._sessions.values()) self._sessions.clear() - for session in sessions_to_destroy: - try: - await session.destroy() - except Exception as e: - errors.append( - StopError(message=f"Failed to destroy session {session.session_id}: {e}") + async def destroy_with_retry(session: CopilotSession) -> Optional[StopError]: + """Destroy a session with up to 3 attempts and exponential backoff.""" + last_err: Optional[Exception] = None + for attempt in range(1, 4): + try: + await session.destroy() + return None + except Exception as e: + last_err = e + if attempt < 3: + # Exponential backoff: 100ms, 200ms + delay = 0.1 * (2 ** (attempt - 1)) + await asyncio.sleep(delay) + + return StopError( + message=( + f"Failed to destroy session {session.session_id} after 3 attempts: {last_err}" ) + ) + + # Destroy all active sessions in parallel with retry logic + if sessions_to_destroy: + results = await asyncio.gather(*(destroy_with_retry(s) for s in sessions_to_destroy)) + errors.extend([r for r in results if r is not None]) # Close client if self._client: diff --git a/python/e2e/test_session.py b/python/e2e/test_session.py index f2e545ed..96304739 100644 --- a/python/e2e/test_session.py +++ b/python/e2e/test_session.py @@ -198,7 +198,8 @@ async def test_should_list_sessions(self, ctx: E2ETestContext): await session2.send_and_wait({"prompt": "Say goodbye"}) # Small delay to ensure session files are written to disk - await asyncio.sleep(0.2) + # Increased to 0.5s to avoid flakiness on slower environments (e.g. Windows CI) + await asyncio.sleep(0.5) # List sessions and verify they're included sessions = await ctx.client.list_sessions() @@ -229,7 +230,8 @@ async def test_should_delete_session(self, ctx: E2ETestContext): session_id = session.session_id # Small delay to ensure session file is written to disk - await asyncio.sleep(0.2) + # Increased to 0.5s to avoid flakiness on slower environments (e.g. Windows CI) + await asyncio.sleep(0.5) # Verify session exists in the list sessions = await ctx.client.list_sessions()