-
Notifications
You must be signed in to change notification settings - Fork 5.7k
Fix token tracking race condition in threading-based async execution #4169
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Fix token tracking race condition in threading-based async execution #4169
Conversation
Resolved 4 review comments from Cursor Bugbot: 1. Added token tracking for async tasks in _execute_tasks and _process_async_tasks 2. Fixed task key collision by including task_id in the key 3. Added token tracking for _aexecute_tasks paths (both sync and async) 4. Fixed agent metrics to be keyed by agent_id to handle multiple agents with same role All async tasks now capture tokens_before/after and attach metrics properly. Task metrics now use unique keys to prevent overwriting. Agent metrics properly track separate agents with same role.
…sy23/crewAI-telemetry into feat/per-user-token-tracing
Resolved race condition where concurrent async tasks from same agent would get incorrect token attribution. Solution wraps async task execution to capture tokens_after immediately upon task completion, before other concurrent tasks can interfere. Changes: - Wrapped async task execution to return (result, tokens_after) tuple - Updated _aprocess_async_tasks to unwrap and use captured tokens_after - Updated type hints for pending_tasks to reflect new signature Note: Threading-based async_execution still has similar race condition as it's harder to wrap threaded execution. Will track separately.
This commit fixes the race condition described in issue #4168 where token tracking was inaccurate when multiple async tasks from the same agent ran concurrently. The fix introduces: 1. Per-agent locks to serialize async task execution for accurate token tracking when multiple async tasks from the same agent run concurrently 2. Token capture callback that captures both tokens_before and tokens_after inside the thread (after acquiring the lock), not when the task is queued 3. Updated _process_async_tasks to handle the new return type from execute_async which now returns (TaskOutput, tokens_before, tokens_after) This ensures that token deltas are accurately attributed to each task even when multiple async tasks from the same agent overlap in execution. Tests added: - test_async_task_token_tracking_uses_per_agent_lock - test_async_task_token_callback_captures_tokens_inside_thread - test_async_task_per_agent_lock_serializes_execution Co-Authored-By: João <joao@crewai.com>
|
You have run out of free Bugbot PR reviews for this billing cycle. This will reset on January 28. To receive reviews on all of your PRs, visit the Cursor dashboard to activate Pro and start your 14-day free trial. |
🤖 Devin AI EngineerI'll be helping with this pull request! Here's what you should know: ✅ I will automatically:
Note: I can only respond to comments from users who have write access to this repository. ⚙️ Control Options:
|
Co-Authored-By: João <joao@crewai.com>
Co-Authored-By: João <joao@crewai.com>
Co-Authored-By: João <joao@crewai.com>
Summary
This PR fixes the race condition described in issue #4168 where token tracking was inaccurate when multiple async tasks with
async_execution=Truefrom the same agent ran concurrently.The root cause was that
tokens_beforewas captured when tasks were queued (in the main thread), buttokens_afterwas captured sequentially in_process_async_tasksafter callingfuture.result(). This caused later tasks to be credited with tokens from earlier tasks that ran in parallel.The fix introduces:
tokens_beforeandtokens_afterinside the worker thread (after acquiring the lock), not when the task is queuedAgentTokenMetrics,TaskTokenMetrics,WorkflowTokenMetrics) for detailed per-task token trackingTaskOutputto include per-taskusage_metricsUpdates since last revision
_aexecute_tasksusing default argumentscrew.py,task.py, andusage_metrics.pycalculate_usage_metricsby using.values()instead of.items()Review & Testing Checklist for Human
execute_asyncmethod signature changed to accept optionaltoken_capture_callbackandagent_execution_lockparameters. The return type also changed to potentially return a tuple. Verify this doesn't break existing code that callsexecute_asyncdirectly._execute_tasks(line ~1194):def create_token_callback(agent: Any = exec_data.agent)- ensure the default argument correctly captures the agent value in the loop_aexecute_tasks(line ~964):async def _wrapped_task_execution(_task=task, _agent=agent, ...)- verify default args correctly bind loop variablesfeat/per-user-token-tracingfrom PR Feature: CrewAI Token Tracking Enhancement #4132. Verify this is the intended target branch.Recommended test plan:
crew.kickoff()and inspectresult.tasks_output[i].usage_metricsfor each taskNotes
calculate_usage_metricswhere agents with the same role cannot be distinguished (see TODO comment)Link to Devin run: https://app.devin.ai/sessions/80968e6ecc774e45ad35f833cf8d2ea0
Requested by: João (joao@crewai.com)