fix: cleanup resources properly on BaseSession::_receive_loop cleanup
#1817
base: main
Conversation
Session must close `_response_streams` properly when exiting the `async with` scope. Session AS-IS fails the test because the cleanup logic is forcefully cancelled.

Cleanup logic should not be cancelled by the enclosing CancelScope; therefore the `shield=True` parameter is required.

Since the cleanup iterates over the `_response_streams` dict with a for-loop, the dict must not change during cleanup. A simple flag handles this; a sketch of the resulting cleanup follows.
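A minimal sketch (assumed, not the actual SDK code) of the cleanup described above, with the shielded `finally` block and a hypothetical `_closing` flag guarding `_response_streams`:

```python
import anyio

class SessionSketch:
    """Illustrative sketch only; names mirror the PR description, not the SDK source."""

    def __init__(self) -> None:
        self._response_streams = {}   # request id -> per-request send stream
        self._closing = False         # hypothetical flag: refuse new entries during cleanup

    async def _receive_loop(self, read_stream) -> None:
        try:
            async for message in read_stream:
                ...  # route each response to the matching stream in _response_streams
        finally:
            # Shield the cleanup so the TaskGroup cancellation triggered by
            # __aexit__ cannot interrupt it halfway through.
            with anyio.CancelScope(shield=True):
                self._closing = True  # the dict must not change while we iterate
                for stream in self._response_streams.values():
                    await stream.aclose()
                self._response_streams.clear()
```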
…ager Merge #4025

**Please ensure you have read the [contribution guide](https://github.com/google/adk-python/blob/main/CONTRIBUTING.md) before creating a pull request.**

### Link to Issue or Description of Change

**1. Link to an existing issue (if applicable):**
- Closes:
  - #3950
  - #3731
  - #3708

**2. Or, if no issue exists, describe the change:**

**Problem:**
- `ClientSession` of https://github.com/modelcontextprotocol/python-sdk uses AnyIO for async task management.
- An AnyIO TaskGroup requires that its start and close happen in the same task.
- Since `McpSessionManager` does not create a task per client, the client might be closed by a different task, causing the error: `Attempted to exit cancel scope in a different task than it was entered in`.

**Solution:** I suggest 2 changes:

Handling the `ClientSession` in a single task
- To start and close the `ClientSession` from the same task, we need to wrap the whole lifecycle of the `ClientSession` in a single task.
- `SessionContext` wraps the initialization and disposal of the `ClientSession` into a single task, ensuring that the `ClientSession` is handled only in a dedicated task (a rough sketch of this ownership pattern is shown after this commit message).

Add a timeout for `ClientSession`
- Since we now use a task per `ClientSession`, tasks should never be leaked.
- But `McpSessionManager` does not pass a timeout directly to `ClientSession` when the type is not STDIO.
- There is only a timeout for the `httpx` client when the MCP type is SSE or StreamableHTTP.
- That timeout applies only to the `httpx` client, so if there is an issue in the MCP client itself (e.g. modelcontextprotocol/python-sdk#262), a tool call waits for the result **FOREVER**!
- To overcome this issue, I propagated `sse_read_timeout` to `ClientSession`.
- `timeout` is too short to use as a tool-call timeout, since its default value is only 5s.
- `sse_read_timeout` was originally intended as the read timeout for SSE (default value of 5m, or 300s), but most server-side SSE implementations (e.g. FastAPI, etc.) send a ping periodically (about every 15s, I assume), so under normal circumstances this timeout is rarely hit.
- If the server does not send pings, this timeout is effectively the tool-call timeout. Therefore, it seems appropriate to use `sse_read_timeout` as the tool-call timeout.
- Most tool calls should finish within 5 minutes, and the SSE timeout is adjustable if not.
- If this change is not acceptable, we could add a dedicated parameter for the tool-call timeout (e.g. `tool_call_timeout`).

### Testing Plan
- Although this does not change the interface itself, it changes the session management logic, so some existing tests are no longer valid.
- I updated those tests, especially the ones that validate session state (e.g. checking whether `initialize()` was called).
- Since the session is now encapsulated in `SessionContext`, we cannot validate the initialized state of the session in `TestMcpSessionManager`; it is validated in `TestSessionContext` instead.
- Added a simple test reproducing the issue (`test_create_and_close_session_in_different_tasks`).
- Also added a test for the new component: `SessionContext`.

**Unit Tests:**
- [x] I have added or updated unit tests for my change.
- [x] All unit tests pass locally.
```plaintext
=================================================================================== 3689 passed, 1 skipped, 2205 warnings in 63.39s (0:01:03) ===================================================================================
```

**Manual End-to-End (E2E) Tests:**
_Please provide instructions on how to manually test your changes, including any necessary setup or configuration. Please provide logs or screenshots to help reviewers better understand the fix._

### Checklist
- [x] I have read the [CONTRIBUTING.md](https://github.com/google/adk-python/blob/main/CONTRIBUTING.md) document.
- [x] I have performed a self-review of my own code.
- [x] I have commented my code, particularly in hard-to-understand areas.
- [x] I have added tests that prove my fix is effective or that my feature works.
- [x] New and existing unit tests pass locally with my changes.
- [x] I have manually tested my changes end-to-end.
- [ ] ~~Any dependent changes have been merged and published in downstream modules.~~ `no deps has been changed`

### Additional context
This PR is related to modelcontextprotocol/python-sdk#1817, since it also fixes endless tool-call awaiting.

Co-authored-by: Kathy Wu <wukathy@google.com>
COPYBARA_INTEGRATE_REVIEW=#4025 from challenger71498:feat/task-based-mcp-session-manager f7f7cd0
PiperOrigin-RevId: 856438147
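For readers unfamiliar with the pattern the commit message describes, here is a rough sketch of single-task session ownership under anyio. `SessionOwner`, `make_session`, `start`, and `close` are hypothetical names; this is not the ADK `SessionContext` implementation.

```python
from typing import Callable

import anyio
from anyio.abc import TaskGroup


class SessionOwner:
    """Sketch: one dedicated task enters and exits the session's async context
    manager, so anyio never sees its cancel scopes crossing task boundaries."""

    def __init__(self, tg: TaskGroup, make_session: Callable) -> None:
        self._tg = tg
        self._make_session = make_session   # factory returning e.g. ClientSession(...)
        self._ready = anyio.Event()
        self._closed = anyio.Event()
        self.session = None

    async def _run(self) -> None:
        async with self._make_session() as session:   # entered in this task...
            self.session = session
            self._ready.set()
            await self._closed.wait()                  # ...and exited in this same task

    async def start(self) -> None:
        self._tg.start_soon(self._run)
        await self._ready.wait()

    def close(self) -> None:
        # Any task may request the close; only the owning task performs it.
        self._closed.set()
```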
Hi! We're experiencing the same error. It occurs in […], and your fix with […] looks like it would resolve it. This is a blocking issue for us; we had to disable HTTP-based MCP servers entirely. Any timeline on when this might be reviewed/merged? Happy to help test or contribute if needed.
Apply CancelScope(shield=True) to prevent RuntimeError when cleaning up HTTP MCP clients. This fixes the 'Attempted to exit cancel scope in a different task' error that occurs with asyncio.

Changes:
- session.py: Add _closing flag, shield cleanup in _receive_loop
- streamable_http.py: Shield terminate_session and stream cleanup

Related to: modelcontextprotocol#577
Based on: modelcontextprotocol#1817
Apply CancelScope(shield=True) to prevent RuntimeError when cleaning up HTTP MCP clients. This fixes the 'Attempted to exit cancel scope in a different task' error that occurs with asyncio.

Changes:
- session.py: Shield cleanup in _receive_loop finally block
- streamable_http.py: Shield terminate_session and stream cleanup

Based on v1.26.0 to maintain compatibility with openai-agents SDK.

Related to: modelcontextprotocol#577
Based on: modelcontextprotocol#1817
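A rough sketch of what the shielded transport teardown described in those commits could look like; the helper name and parameters are hypothetical, not the actual `streamable_http.py` code.

```python
import anyio


async def cleanup_transport(terminate_session, read_stream, write_stream) -> None:
    # Shield the teardown so a cancelled outer scope cannot abort the
    # session termination or the stream closes and leak the connection.
    with anyio.CancelScope(shield=True):
        await terminate_session()   # e.g. the request that ends the MCP session
        await read_stream.aclose()
        await write_stream.aclose()
```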
This PR fixes Session cleanup failing because the task cancellation triggered via `__aexit__` was not handled properly.

Motivation and Context
Session cleans up the receive streams in the `finally` clause of `BaseSession::_receive_loop`. When the Session exits its scope (`__aexit__`), it cancels the TaskGroup. Any await inside that `finally` clause, such as `MemoryObjectStream::send`, is cancelled as well, because its base coroutine, `_receive_loop`, IS the TaskGroup which is being closed.

I fixed this problem by adding a `CancelScope` with `shield=True` on the `finally` clause, which ensures the cleanup logic executes properly even when TaskGroup cancellation is requested (a minimal repro of the behaviour is sketched below).
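A minimal, self-contained repro of that behaviour (assumed anyio semantics, not SDK code): once the task group is cancelled, every await inside the worker, including those in its `finally` block, is cancelled unless it runs inside a shielded scope.

```python
import anyio


async def main() -> None:
    send, receive = anyio.create_memory_object_stream(max_buffer_size=1)

    async def worker() -> None:
        try:
            await anyio.sleep_forever()              # stands in for the receive loop body
        finally:
            # Without shield=True, this cleanup would be cancelled at its first
            # await, because the worker's own task group is already cancelling.
            with anyio.CancelScope(shield=True):
                await send.send("cleanup message")   # survives the cancellation
                await send.aclose()

    async with anyio.create_task_group() as tg:
        tg.start_soon(worker)
        await anyio.sleep(0.1)
        tg.cancel_scope.cancel()                     # mirrors Session.__aexit__ cancelling the group

    print(receive.receive_nowait())                  # -> "cleanup message"


anyio.run(main)
```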
Also, while cleaning up the receive streams we loop over them, so the stream dictionary must not change during cleanup. `BaseSession` AS-IS does not protect against the dictionary changing during cleanup.

How Has This Been Tested?
I found this issue while debugging a problem where using ADK AgentTool with MCP causes an abstruse error:

After a week of debugging, I realised that the ultimate fix for this problem is quite large and needs fixes in both the MCP SDK and Google ADK; this fix is one of them.
I added a simple test which triggers this issue.
Breaking Changes
Since the cleanup now waits in the `finally` block until every stream is closed, `__aexit__` will be blocked until the `finally` block finishes its execution. If the `finally` block executes for too long, the user might think the session is hanging.
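Not part of this PR, but if that blocking window ever became a concern, one way to bound it would be to cap the shielded cleanup with `anyio.move_on_after`; the inner deadline still fires because shielding only masks cancellation arriving from outside the scope. The 5-second bound and `close_streams` helper below are hypothetical.

```python
import anyio


async def close_streams(streams: dict) -> None:
    # Shield the cleanup from the outer TaskGroup cancellation, but cap how
    # long __aexit__ can block on it.
    with anyio.CancelScope(shield=True):
        with anyio.move_on_after(5):        # hypothetical upper bound on cleanup time
            for stream in list(streams.values()):
                await stream.aclose()
        streams.clear()
```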
Checklist
No docs needed to be updated.

Additional context
This PR MAY resolve the issues below:

- `read_stream_writer` left open after SSE disconnection
- hanging `receive()` #1811

I think this problem is related to a task leak (or indefinite waiting) in Google ADK, since in ADK the tool call awaits forever until the receive stream responds.