Skip to content

Conversation

@Devasy
Copy link

@Devasy Devasy commented Dec 20, 2025

Changes Made

This fork adds comprehensive per-agent and per-task token tracking to CrewAI, providing detailed token usage metrics for each agent and task in a workflow.

Modified Files

  1. src/crewai/types/usage_metrics.py - Added new data models:

    • AgentTokenMetrics - Tracks tokens per agent (agent_name, total_tokens, prompt_tokens, completion_tokens, requests)
    • TaskTokenMetrics - Tracks tokens per task (task_name, agent_name, total_tokens, prompt_tokens, completion_tokens, requests)
    • WorkflowTokenMetrics - Aggregates all metrics with per_agent and per_task dictionaries
  2. src/crewai/crews/crew_output.py - Enhanced CrewOutput:

    • Added token_metrics: WorkflowTokenMetrics | None field for detailed per-agent and per-task breakdown
  3. src/crewai/tasks/task_output.py - Enhanced TaskOutput:

    • Added usage_metrics: TaskTokenMetrics | None field for per-task token usage
  4. src/crewai/crew.py - Core tracking implementation:

    • Modified calculate_usage_metrics() to build per-agent token breakdown
    • Added _get_agent_token_usage() helper to capture agent token state
    • Added _attach_task_token_metrics() to calculate and attach per-task tokens
    • Modified _execute_tasks() to capture tokens before/after each task execution
    • Added workflow_token_metrics field to Crew class
    • Modified _create_crew_output() to attach token_metrics to result

New Features

Per-Agent Token Tracking

Each agent's total token usage is tracked separately, showing:

  • Total tokens used across all tasks
  • Prompt tokens vs completion tokens breakdown
  • Number of successful LLM requests

Per-Task Token Tracking

Each task's token usage is tracked with:

  • Task name and ID
  • Agent that executed the task
  • Token breakdown (prompt, completion, total)
  • Attached directly to TaskOutput objects

Accurate Attribution

Uses delta calculation (tokens_after - tokens_before) to accurately attribute tokens to specific tasks, even when multiple tasks are performed by the same agent.

Usage Example

from crewai import Agent, Task, Crew

# Create crew and execute
crew = Crew(agents=[agent1, agent2], tasks=[task1, task2, task3])
result = crew.kickoff()

# Access per-agent metrics
for agent_name, metrics in result.token_metrics.per_agent.items():
    print(f"{agent_name}: {metrics.total_tokens} tokens")
    print(f"  Prompt: {metrics.prompt_tokens}, Completion: {metrics.completion_tokens}")

# Access per-task metrics
for task_output in result.tasks_output:
    if task_output.usage_metrics:
        print(f"Task: {task_output.usage_metrics.task_name}")
        print(f"  Tokens: {task_output.usage_metrics.total_tokens}")
        print(f"  Agent: {task_output.usage_metrics.agent_name}")

# Access via workflow metrics
for task_name, metrics in result.token_metrics.per_task.items():
    print(f"{task_name}: {metrics.total_tokens} tokens")

Test Results

All tests pass with 100% accuracy:

  • ✅ Per-agent token metrics correctly aggregate all tasks by agent
  • ✅ Per-task token metrics accurately attribute tokens to individual tasks
  • ✅ Token sums validate: sum(task_tokens) == agent_total == result.token_usage.total_tokens
  • ✅ Works with multiple tasks per agent
  • ✅ Works with multiple agents per workflow

Backward Compatibility

All changes are backward compatible:

  • Existing result.token_usage (crew-level totals) continues to work
  • New fields are optional (default None)
  • No breaking changes to existing APIs

Benefits

  1. Cost Tracking: Track token costs per agent and per task for accurate billing
  2. Performance Optimization: Identify which agents/tasks consume the most tokens
  3. Debugging: Trace token usage through complex workflows
  4. Monitoring: Build dashboards showing per-agent and per-task token consumption
  5. Transparency: Understand exactly where tokens are being used

Note

Introduces detailed token accounting across the workflow with deltas captured per task and aggregated per agent.

  • Adds AgentTokenMetrics, TaskTokenMetrics, and WorkflowTokenMetrics in types/usage_metrics.py
  • Extends TaskOutput with usage_metrics and CrewOutput with token_metrics
  • Captures token usage before/after task execution (both async and thread-based) via _get_agent_token_usage and _attach_task_token_metrics, storing per-task metrics in workflow_token_metrics
  • Updates async/sync task execution paths in crew.py to wrap tasks, collect tokens_after, and attach metrics; adjusts async queues (pending_tasks/futures) to carry agent and token snapshots
  • Enhances calculate_usage_metrics() to build per_agent breakdown (and set workflow totals) from per-task data and manager metrics

Written by Cursor Bugbot for commit f62a5a9. This will update automatically on new commits. Configure here.

Copy link

@cursor cursor bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR is being reviewed by Cursor Bugbot

Details

Your team is on the Bugbot Free tier. On this plan, Bugbot will review limited PRs each billing cycle for each member of your team.

To receive Bugbot reviews on all of your PRs, visit the Cursor dashboard to activate Pro and start your 14-day free trial.

Devasy and others added 3 commits January 3, 2026 22:24
Resolved 4 review comments from Cursor Bugbot:
1. Added token tracking for async tasks in _execute_tasks and _process_async_tasks
2. Fixed task key collision by including task_id in the key
3. Added token tracking for _aexecute_tasks paths (both sync and async)
4. Fixed agent metrics to be keyed by agent_id to handle multiple agents with same role

All async tasks now capture tokens_before/after and attach metrics properly.
Task metrics now use unique keys to prevent overwriting.
Agent metrics properly track separate agents with same role.
@Devasy
Copy link
Author

Devasy commented Jan 3, 2026

Review Comments Resolved ✅

I've addressed all 4 review comments from Cursor Bugbot in commit afea8a50:

1. ✅ Async tasks missing per-task token tracking

Fixed: Added token tracking for async tasks in _execute_tasks and _process_async_tasks. Now capturing tokens_before when async_execution is True and attaching metrics after task completion.

2. ✅ Task key collision causing metrics overwriting

Fixed: Updated task key to include task_id: f"{task_tokens.task_id}_{task_tokens.task_name}_{task_tokens.agent_name}" to prevent collision when multiple tasks have the same name.

3. ✅ Async kickoff path missing all per-task token tracking

Fixed: Added per-task token tracking to _aexecute_tasks method for both sync and async task paths, plus updated _aprocess_async_tasks to capture and attach metrics.

4. ✅ Multiple agents with same role get combined metrics

Fixed: Updated calculate_usage_metrics() to key agent metrics by agent_id instead of agent_name/role. This properly tracks separate agents that share the same role.

All changes maintain backward compatibility and follow the same pattern used for synchronous task execution.

Copy link

@cursor cursor bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR is being reviewed by Cursor Bugbot

Details

Your team is on the Bugbot Free tier. On this plan, Bugbot will review limited PRs each billing cycle for each member of your team.

To receive Bugbot reviews on all of your PRs, visit the Cursor dashboard to activate Pro and start your 14-day free trial.

Resolved race condition where concurrent async tasks from same agent
would get incorrect token attribution. Solution wraps async task execution
to capture tokens_after immediately upon task completion, before other
concurrent tasks can interfere.

Changes:
- Wrapped async task execution to return (result, tokens_after) tuple
- Updated _aprocess_async_tasks to unwrap and use captured tokens_after
- Updated type hints for pending_tasks to reflect new signature

Note: Threading-based async_execution still has similar race condition
as it's harder to wrap threaded execution. Will track separately.
@Devasy
Copy link
Author

Devasy commented Jan 3, 2026

@joaomdmoura can you please look into the PR?

Capture task, exec_data, and context via default arguments to avoid
Python's late-binding closure behavior. Without this fix, when multiple
async tasks are created back-to-back, they would all reference values
from the last loop iteration, causing wrong tasks to be executed with
wrong agents and incorrect token attribution.
…itation

1. Fixed manager agent using manager_role as key instead of manager_id.
   Now all agents (regular and manager) are keyed by agent_id in
   workflow_metrics.per_agent for consistency.

2. Added documentation for the threading-based async task race condition
   in _process_async_tasks. This is a known limitation tracked by issue
   crewAIInc#4168. Users should use akickoff() for accurate async task token tracking.
@Devasy
Copy link
Author

Devasy commented Jan 3, 2026

2 Additional Review Comments Resolved ✅

1. Sync async tasks have unfixed race condition for tokens

Status: Documented (known limitation tracked by issue #4168)

Added documentation to _process_async_tasks explaining the race condition:

  • ThreadPoolExecutor-based async tasks capture tokens_after outside the task execution context
  • This causes inaccurate token attribution when concurrent tasks share the same agent
  • Recommendation: Use akickoff() for more accurate async task token tracking
  • This is a hard limitation of the threading model and can't be easily fixed without changing the Task API

2. Inconsistent per-agent dictionary keys cause lookup issues

Fixed in commit 40f06925:

Changed manager agent keying from manager_role to manager_id to be consistent with regular agents:

# Before (inconsistent)
workflow_metrics.per_agent[manager_role] = manager_metrics  # keyed by role
workflow_metrics.per_agent[agent_id] = agent_metrics        # keyed by id

# After (consistent)
workflow_metrics.per_agent[manager_id] = manager_metrics    # keyed by id
workflow_metrics.per_agent[agent_id] = agent_metrics        # keyed by id

Now all agents (regular and manager) are consistently keyed by agent_id in workflow_metrics.per_agent.

Instead of calling task.execute_async() and capturing tokens_after
outside the thread, we now:
1. Create a wrapper function that executes task.execute_sync() in thread
2. Capture tokens_after immediately after completion WITHIN the thread
3. Return (result, tokens_after) tuple from the thread
4. Unwrap and use captured tokens_after in _process_async_tasks

This is the same approach used for asyncio tasks and properly avoids
race conditions when concurrent tasks from the same agent run in parallel.

Also uses default arguments to avoid late-binding closure issues.
@Devasy
Copy link
Author

Devasy commented Jan 3, 2026

Threading Race Condition Properly Fixed ✅

Issue: Sync async tasks have unfixed race condition for tokens

Fixed in commit f62a5a9f:

Instead of just documenting the limitation, I implemented a proper fix using the same approach as the asyncio version:

Solution

  1. Wrapped task execution: Instead of calling task.execute_async(), we now create a custom wrapper that:

    • Executes task.execute_sync() in a thread
    • Captures tokens_after immediately after completion within the thread
    • Returns (result, tokens_after) tuple
  2. Unwrap in processing: _process_async_tasks now unwraps the tuple and uses the captured tokens_after

  3. Late-binding protection: Uses default arguments to capture variables at definition time

# Before (race condition)
future = task.execute_async(...)
# ... later, outside thread ...
tokens_after = self._get_agent_token_usage(agent)  # WRONG - other tasks may have completed

# After (fixed)
def _wrapped_sync_task_execution(...):
    result = _task.execute_sync(...)
    tokens_after = _self._get_agent_token_usage(_exec_data.agent)  # Captured IN thread
    return result, tokens_after

This ensures tokens are captured at the exact moment each task completes, preventing interference from other concurrent tasks.

result = _wrapped_sync_task_execution()
future.set_result(result)
except Exception as e:
future.set_exception(e)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Late-binding closure in _run_in_thread causes wrong task execution

The _run_in_thread function captures _wrapped_sync_task_execution and future via closure reference, not via default arguments. When multiple async tasks are created in a loop, each thread may execute with values from a later iteration due to Python's late-binding closure behavior. This causes threads to call the wrong task execution function and set results on the wrong Future object, leading to incorrect task execution, wrong results, or deadlocks when waiting for futures that never receive their results.

Fix in Cursor Fix in Web

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant