Feat/per user token tracing #2
base: main
Conversation
Walkthrough

Adds per-task and per-agent token usage metrics to Crews; implements full human-in-the-loop (HITL) for Flows (decorator, async providers, pending/resume, persistence); refactors console event rendering to stateless panels; introduces an experimental flow-based agent executor; and adds extensive tests and documentation for HITL and metrics.
Pre-merge checks
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
@coderabbitai review

✅ Actions performed: Review triggered.
Actionable comments posted: 8
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
lib/crewai/src/crewai/crew.py (1)
970-976: Missing per-task token metrics in async execution path.

Token usage tracking is implemented in `_execute_tasks` (sync) but not in `_aexecute_tasks`. Async task execution will not capture per-task token metrics, leading to an incomplete `workflow_token_metrics` for async workflows.

🔎 Proposed fix

Apply a similar token tracking pattern in `_aexecute_tasks`:

```diff
             else:
                 if pending_tasks:
                     task_outputs = await self._aprocess_async_tasks(
                         pending_tasks, was_replayed
                     )
                     pending_tasks.clear()

+                # Capture token usage before task execution
+                tokens_before = self._get_agent_token_usage(exec_data.agent)
+
                 context = self._get_context(task, task_outputs)
                 task_output = await task.aexecute_sync(
                     agent=exec_data.agent,
                     context=context,
                     tools=exec_data.tools,
                 )
+
+                # Capture token usage after task execution and attach to task output
+                tokens_after = self._get_agent_token_usage(exec_data.agent)
+                task_output = self._attach_task_token_metrics(
+                    task_output, task, exec_data.agent, tokens_before, tokens_after
+                )
+
                 task_outputs.append(task_output)
```

Also consider async tasks in `_aprocess_async_tasks`; they may need token tracking too, though capturing deltas for concurrent tasks is more complex.
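For reference, the delta-based idea the proposed fix relies on can be illustrated in isolation. This is only a hedged sketch; the snapshot fields and helper name below are stand-ins, not the actual crewAI metric types.

```python
from dataclasses import dataclass


@dataclass
class TokenSnapshot:
    """Cumulative token counters read from an agent's LLM (illustrative fields)."""
    prompt_tokens: int = 0
    completion_tokens: int = 0


def token_delta(before: TokenSnapshot, after: TokenSnapshot) -> TokenSnapshot:
    # Per-task usage is simply the difference between cumulative counters
    # captured immediately before and immediately after the task executes.
    return TokenSnapshot(
        prompt_tokens=after.prompt_tokens - before.prompt_tokens,
        completion_tokens=after.completion_tokens - before.completion_tokens,
    )


print(token_delta(TokenSnapshot(10, 4), TokenSnapshot(25, 9)))
# TokenSnapshot(prompt_tokens=15, completion_tokens=5)
```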
📜 Review details
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (4)
- lib/crewai/src/crewai/crew.py
- lib/crewai/src/crewai/crews/crew_output.py
- lib/crewai/src/crewai/tasks/task_output.py
- lib/crewai/src/crewai/types/usage_metrics.py
🧰 Additional context used
🧬 Code graph analysis (3)
- lib/crewai/src/crewai/tasks/task_output.py (1)
  - lib/crewai/src/crewai/types/usage_metrics.py (1): TaskTokenMetrics (70-90)
- lib/crewai/src/crewai/crews/crew_output.py (1)
  - lib/crewai/src/crewai/types/usage_metrics.py (2): UsageMetrics (11-46), WorkflowTokenMetrics (93-117)
- lib/crewai/src/crewai/crew.py (2)
  - lib/crewai/src/crewai/types/usage_metrics.py (3): AgentTokenMetrics (49-67), WorkflowTokenMetrics (93-117), TaskTokenMetrics (70-90)
  - lib/crewai/src/crewai/tasks/task_output.py (1): TaskOutput (13-106)
🔇 Additional comments (4)
lib/crewai/src/crewai/crews/crew_output.py (1)
10-10: LGTM! The import and new `token_metrics` field are well-integrated. The optional typing with `None` default ensures backward compatibility, and the field description clearly conveys its purpose.

Also applies to: 29-32
lib/crewai/src/crewai/tasks/task_output.py (1)
9-9: LGTM! The `usage_metrics` field addition is clean with proper typing, documentation in the docstring, and backward-compatible defaults.

Also applies to: 26-26, 47-50
lib/crewai/src/crewai/crew.py (2)
1162-1180: LGTM! The token tracking implementation for synchronous task execution correctly captures before/after metrics and calculates deltas. Clean integration with the existing task execution flow.

2031-2041: LGTM! Clean helper method with proper null-safety and fallback handling for different LLM implementations.
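As a rough illustration of what such a null-safe lookup with fallbacks can look like (the attribute names here are assumptions for the sketch, not the actual implementation):

```python
from typing import Any


def get_agent_token_usage(agent: Any) -> dict[str, int]:
    """Return cumulative token counters for an agent, tolerating missing LLMs."""
    empty = {"total_tokens": 0, "prompt_tokens": 0, "completion_tokens": 0}
    llm = getattr(agent, "llm", None)
    if llm is None:
        return empty
    # Different LLM implementations may expose usage under different attributes.
    usage = getattr(llm, "token_usage", None) or getattr(llm, "usage_metrics", None)
    if usage is None:
        return empty
    if isinstance(usage, dict):
        return {key: int(usage.get(key, 0)) for key in empty}
    return {key: int(getattr(usage, key, 0)) for key in empty}
```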
```python
    workflow_token_metrics: Any | None = Field(
        default=None,
        description="Detailed per-agent and per-task token metrics.",
    )
```
🛠️ Refactor suggestion | 🟠 Major
Use explicit type annotation instead of Any.
The field workflow_token_metrics should be typed as WorkflowTokenMetrics | None for better type safety and IDE support, consistent with other similar fields in the codebase.
🔎 Proposed fix
Add the import at the top of the file:
```python
from crewai.types.usage_metrics import UsageMetrics, WorkflowTokenMetrics
```

Then update the field:

```diff
-    workflow_token_metrics: Any | None = Field(
+    workflow_token_metrics: WorkflowTokenMetrics | None = Field(
         default=None,
         description="Detailed per-agent and per-task token metrics.",
     )
```

🤖 Prompt for AI Agents
In lib/crewai/src/crewai/crew.py around lines 206 to 209, the field
workflow_token_metrics is typed as Any which reduces type safety; replace the
Any annotation with WorkflowTokenMetrics | None and update the top-of-file
imports to include WorkflowTokenMetrics (and UsageMetrics if not already
imported) from crewai.types.usage_metrics so the field reads with the explicit
union type and IDE/type-checkers can validate it.
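A minimal, self-contained illustration of the explicitly typed optional field (the model classes below are stand-ins for the real crewAI types, not the actual definitions):

```python
from pydantic import BaseModel, Field


class WorkflowTokenMetrics(BaseModel):  # stand-in for crewai.types.usage_metrics
    total_tokens: int = 0


class CrewLike(BaseModel):  # stand-in for the Crew model
    workflow_token_metrics: WorkflowTokenMetrics | None = Field(
        default=None,
        description="Detailed per-agent and per-task token metrics.",
    )


print(CrewLike().workflow_token_metrics)  # None until metrics are attached
```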
```python
    json_dict=final_task_output.json_dict,
    tasks_output=task_outputs,
    token_usage=self.token_usage,
    token_metrics=getattr(self, 'workflow_token_metrics', None),
```
🧹 Nitpick | 🔵 Trivial
Consider direct attribute access instead of getattr.
Since workflow_token_metrics is a defined class field with a default value, you can access it directly as self.workflow_token_metrics instead of using getattr.
🔎 Proposed fix
```diff
-    token_metrics=getattr(self, 'workflow_token_metrics', None),
+    token_metrics=self.workflow_token_metrics,
```
🤖 Prompt for AI Agents
In lib/crewai/src/crewai/crew.py around line 1418, the call uses getattr(self,
'workflow_token_metrics', None) although workflow_token_metrics is a defined
class attribute with a default; replace the getattr usage with direct attribute
access self.workflow_token_metrics to simplify and clarify the code, ensuring
any existing default remains defined on the class or instance.
* feat: introduce human feedback events and decorator for flow methods
  - Added HumanFeedbackRequestedEvent and HumanFeedbackReceivedEvent classes to handle human feedback interactions within flows.
  - Implemented the @human_feedback decorator to facilitate human-in-the-loop workflows, allowing for feedback collection and routing based on responses.
  - Enhanced Flow class to store human feedback history and manage feedback outcomes.
  - Updated flow wrappers to preserve attributes from methods decorated with @human_feedback.
  - Added integration and unit tests for the new human feedback functionality, ensuring proper validation and routing behavior.
* adding deployment docs
* New docs
* fix printer
* wrong change
* Adding Async Support / feat: enhance human feedback support in flows
  - Updated the @human_feedback decorator to use 'message' parameter instead of 'request' for clarity.
  - Introduced new FlowPausedEvent and MethodExecutionPausedEvent to handle flow and method pauses during human feedback.
  - Added ConsoleProvider for synchronous feedback collection and integrated async feedback capabilities.
  - Implemented SQLite persistence for managing pending feedback context.
  - Expanded documentation to include examples of async human feedback usage and best practices.
* linter
* fix
* migrating off printer
* updating docs
* new tests
* doc update
Actionable comments posted: 9
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
lib/crewai/src/crewai/translations/en.json (1)
37-37: Standardize capitalization in the error message.

The message uses "best final answer" at the start but "BEST Final answer" at the end. For consistency with the related `force_final_answer_error` message on line 36 (which uses "best final answer" in lowercase), standardize to lowercase throughout:

```diff
- "force_final_answer": "Now it's time you MUST give your absolute best final answer. You'll ignore all previous instructions, stop using any tools, and just return your absolute BEST Final answer.",
+ "force_final_answer": "Now it's time you MUST give your absolute best final answer. You'll ignore all previous instructions, stop using any tools, and just return your absolute best final answer.",
```
📜 Review details
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (24)
- docs/docs.json
- docs/en/concepts/flows.mdx
- docs/en/learn/human-feedback-in-flows.mdx
- docs/en/learn/human-in-the-loop.mdx
- docs/ko/concepts/flows.mdx
- docs/ko/learn/human-feedback-in-flows.mdx
- docs/pt-BR/concepts/flows.mdx
- docs/pt-BR/learn/human-feedback-in-flows.mdx
- lib/crewai/src/crewai/events/event_listener.py
- lib/crewai/src/crewai/events/types/flow_events.py
- lib/crewai/src/crewai/events/utils/console_formatter.py
- lib/crewai/src/crewai/flow/__init__.py
- lib/crewai/src/crewai/flow/async_feedback/__init__.py
- lib/crewai/src/crewai/flow/async_feedback/providers.py
- lib/crewai/src/crewai/flow/async_feedback/types.py
- lib/crewai/src/crewai/flow/flow.py
- lib/crewai/src/crewai/flow/flow_wrappers.py
- lib/crewai/src/crewai/flow/human_feedback.py
- lib/crewai/src/crewai/flow/persistence/base.py
- lib/crewai/src/crewai/flow/persistence/sqlite.py
- lib/crewai/src/crewai/translations/en.json
- lib/crewai/tests/test_async_human_feedback.py
- lib/crewai/tests/test_human_feedback_decorator.py
- lib/crewai/tests/test_human_feedback_integration.py
🧰 Additional context used
🧬 Code graph analysis (10)
- lib/crewai/src/crewai/events/utils/console_formatter.py (1)
  - lib/crewai/src/crewai/flow/flow.py (1): flow_id (973-1003)
- lib/crewai/tests/test_human_feedback_integration.py (3)
  - lib/crewai/src/crewai/flow/flow.py (7): Flow (479-2178), listen (186-249), start (111-183), FlowState (96-102), kickoff (1168-1213), kickoff_async (1215-1422), state (964-965)
  - lib/crewai/src/crewai/flow/human_feedback.py (2): HumanFeedbackResult (75-111), human_feedback (155-400)
  - lib/crewai/src/crewai/project/annotations.py (1): llm (89-98)
- lib/crewai/src/crewai/flow/async_feedback/__init__.py (2)
  - lib/crewai/src/crewai/flow/async_feedback/types.py (3): HumanFeedbackPending (114-184), HumanFeedbackProvider (188-264), PendingFeedbackContext (18-111)
  - lib/crewai/src/crewai/flow/async_feedback/providers.py (1): ConsoleProvider (17-124)
- lib/crewai/src/crewai/flow/persistence/base.py (2)
  - lib/crewai/src/crewai/flow/async_feedback/types.py (1): PendingFeedbackContext (18-111)
  - lib/crewai/src/crewai/flow/persistence/sqlite.py (4): save_pending_feedback (182-232), save_state (115-154), load_pending_feedback (234-266), clear_pending_feedback (268-281)
- lib/crewai/src/crewai/events/event_listener.py (4)
  - lib/crewai/src/crewai/events/types/flow_events.py (2): FlowPausedEvent (93-113), MethodExecutionPausedEvent (61-81)
  - lib/crewai/src/crewai/events/event_bus.py (1): on (147-191)
  - lib/crewai/src/crewai/events/utils/console_formatter.py (2): update_method_status (499-560), update_flow_status (445-497)
  - lib/crewai/src/crewai/flow/flow.py (1): flow_id (973-1003)
- lib/crewai/src/crewai/flow/__init__.py (3)
  - lib/crewai/src/crewai/flow/async_feedback/providers.py (1): ConsoleProvider (17-124)
  - lib/crewai/src/crewai/flow/async_feedback/types.py (3): HumanFeedbackPending (114-184), HumanFeedbackProvider (188-264), PendingFeedbackContext (18-111)
  - lib/crewai/src/crewai/flow/human_feedback.py (2): human_feedback (155-400), HumanFeedbackResult (75-111)
- lib/crewai/src/crewai/flow/async_feedback/providers.py (3)
  - lib/crewai/src/crewai/flow/async_feedback/types.py (1): PendingFeedbackContext (18-111)
  - lib/crewai/src/crewai/events/types/flow_events.py (2): HumanFeedbackReceivedEvent (144-160), HumanFeedbackRequestedEvent (123-141)
  - lib/crewai/src/crewai/events/utils/console_formatter.py (2): pause_live_updates (178-184), resume_live_updates (186-189)
- lib/crewai/tests/test_human_feedback_decorator.py (1)
  - lib/crewai/src/crewai/flow/human_feedback.py (3): human_feedback (155-400), HumanFeedbackConfig (115-135), HumanFeedbackResult (75-111)
- lib/crewai/src/crewai/flow/flow.py (4)
  - lib/crewai/src/crewai/events/types/flow_events.py (6): FlowPausedEvent (93-113), MethodExecutionFailedEvent (50-58), MethodExecutionFinishedEvent (40-47), MethodExecutionPausedEvent (61-81), FlowFinishedEvent (84-90), HumanFeedbackReceivedEvent (144-160)
  - lib/crewai/src/crewai/flow/async_feedback/types.py (2): PendingFeedbackContext (18-111), HumanFeedbackPending (114-184)
  - lib/crewai/src/crewai/flow/human_feedback.py (2): human_feedback (155-400), HumanFeedbackResult (75-111)
  - lib/crewai/src/crewai/flow/persistence/base.py (4): FlowPersistence (14-106), load_pending_feedback (79-95), clear_pending_feedback (97-106), save_pending_feedback (59-77)
- lib/crewai/src/crewai/flow/human_feedback.py (2)
  - lib/crewai/src/crewai/flow/flow_wrappers.py (1): FlowMethod (41-140)
  - lib/crewai/src/crewai/flow/async_feedback/types.py (2): HumanFeedbackProvider (188-264), PendingFeedbackContext (18-111)
🪛 LanguageTool
docs/ko/learn/human-feedback-in-flows.mdx
[grammar] ~152-~152: Ensure spelling is correct
Context: ...수신 시간: {result.timestamp}") ``` ## 피드백 히스토리 접근하기 Flow 클래스는 인간 피드백에 접근하기 위한 두 가지 ...
(QB_NEW_EN_ORTHOGRAPHY_ERROR_IDS_1)
docs/pt-BR/learn/human-feedback-in-flows.mdx
[uncategorized] ~66-~66: Esta locução deve ser separada por vírgulas.
Context: ...------|-----------| | message | str | Sim | A mensagem mostrada ao humano junto c...
(VERB_COMMA_CONJUNCTION)
[style] ~66-~66: Para conferir mais clareza ao seu texto, evite pleonasmos.
Context: ...| Sim | A mensagem mostrada ao humano junto com a saída do método | |emit|Sequenc...
(REDUNDANCY_JUNTO_COM)
[typographical] ~577-~577: Símbolo sem par: “[” aparentemente está ausente
Context: ...ção Relacionada - Visão Geral de Flows - Aprenda sobre ...
(UNPAIRED_BRACKETS)
[typographical] ~578-~578: Símbolo sem par: “[” aparentemente está ausente
Context: ...lows - [Gerenciamento de Estado em Flows](/pt-BR/guides/flows/mastering-flow-stat...
(UNPAIRED_BRACKETS)
[typographical] ~579-~579: Símbolo sem par: “[” aparentemente está ausente
Context: ...estado em flows - Persistência de Flows - Pe...
(UNPAIRED_BRACKETS)
[typographical] ~580-~580: Símbolo sem par: “[” aparentemente está ausente
Context: ...stado de flows - Roteamento com @router - Mais so...
(UNPAIRED_BRACKETS)
[typographical] ~581-~581: Símbolo sem par: “[” aparentemente está ausente
Context: ... condicional - Input Humano na Execução ...
(UNPAIRED_BRACKETS)
🔇 Additional comments (40)
lib/crewai/src/crewai/translations/en.json (1)

33-33: The translation key is properly referenced and used correctly in the codebase. At `lib/crewai/src/crewai/flow/flow.py:2038`, the key is loaded via `i18n.slice()` and the template placeholders `{feedback}` and `{outcomes}` are both correctly filled when the prompt is formatted. No issues identified.

lib/crewai/src/crewai/flow/flow_wrappers.py (1)

73-80: Preserving router and human-feedback metadata in `FlowMethod` looks correct. Copying `__is_router__`, `__router_paths__`, and `__human_feedback_config__` from the wrapped method onto the `FlowMethod` instance ensures that stacking `@human_feedback` with `@start`/`@listen` doesn't lose routing/feedback metadata, including after rebinding via `__get__`. This is exactly what you need for the new HITL flows.

docs/docs.json (1)

296-323: New `human-feedback-in-flows` Learn entries are wired consistently. The added Learn entries for `en`, `pt-BR`, and `ko` follow the existing navigation structure and point to the expected `*/learn/human-feedback-in-flows` pages. JSON structure and commas all look correct.

Also applies to: 724-751, 1161-1187

lib/crewai/src/crewai/events/event_listener.py (1)

38-47: Paused flow/method events are integrated consistently with existing status handlers. Registering handlers for `MethodExecutionPausedEvent` and `FlowPausedEvent` and mapping them to `update_method_status(..., "paused")` / `update_flow_status(..., "paused")` matches the existing started/completed/failed patterns and ensures the console view reflects human-feedback pauses without touching telemetry logic.

Also applies to: 368-389

lib/crewai/tests/test_human_feedback_integration.py (1)

21-298: Strong end-to-end coverage for `@human_feedback` integration. This suite does a good job exercising the decorator across routing, multi-step flows, async execution, structured state, metadata passthrough, and edge cases (empty/whitespace feedback, default outcomes). Assertions around `human_feedback_history`, `last_human_feedback`, and structured state (via `FlowState`) are especially valuable for catching regressions in the HITL flow behavior.

docs/en/learn/human-in-the-loop.mdx (1)

8-153: Webhook-based HITL documentation is clear and actionable. The new breakdown between flow-based (`@human_feedback`) and webhook-based HITL, plus the step-by-step kickoff/resume examples (including auth and the reminder to resend webhook URLs on resume), gives a practical, implementation-focused guide that aligns well with how such enterprise APIs are typically used.

lib/crewai/src/crewai/flow/async_feedback/__init__.py (1)
1-41: LGTM - Clean package initialization with comprehensive documentation.The module provides a well-structured public API for async human feedback. The docstring example clearly demonstrates the provider pattern and integration with Flow decorators.
lib/crewai/src/crewai/events/utils/console_formatter.py (2)
456-497: LGTM - Paused state handling is consistent and well-integrated.The implementation properly computes status-specific labels, styles, and content text, treating "paused" as a first-class status alongside "completed" and "failed". The cyan styling and "⏳" emoji are appropriate visual indicators for waiting states.
518-523: LGTM - Method pause status aligns with flow-level implementation.The paused branch mirrors the structure of completed/failed branches and correctly updates the flow label to reflect the waiting-for-feedback state.
lib/crewai/src/crewai/flow/__init__.py (1)
1-34: LGTM - Public API extension is well-organized.The new exports are logically grouped by submodule and all imported symbols are properly exposed in
__all__. This provides a clean interface for users to access async feedback functionality.docs/pt-BR/learn/human-feedback-in-flows.mdx (1)
1-581: LGTM - Comprehensive and well-structured documentation.This documentation thoroughly covers the human feedback feature with:
- Clear conceptual overview with mermaid diagram
- Quick start example for immediate usage
- Detailed parameter reference
- Progressive examples from simple to complex
- Complete async provider implementation guide
- Best practices and design guidelines
The structure makes it easy for users to find what they need, whether they're starting simple or building complex async integrations.
Note: Static analysis warnings about unpaired brackets are false positives - they're standard markdown link syntax.
lib/crewai/tests/test_human_feedback_decorator.py (1)
1-401: LGTM - Excellent test coverage across all decorator functionality.This test suite comprehensively covers:
- ✅ Parameter validation (llm requirements, emit/default_outcome interdependencies)
- ✅ Configuration and result dataclass behavior
- ✅ Decorator attribute preservation with Flow decorators
- ✅ Async/sync method detection
- ✅ End-to-end execution flows with proper mocking
- ✅ History tracking across multiple feedback points
- ✅ LLM outcome collapsing logic (exact match, partial match, fallback)
The test organization into focused classes and use of mocking for external dependencies (input, LLM calls) follows best practices.
lib/crewai/src/crewai/events/types/flow_events.py (1)
61-160: LGTM - Event classes are well-designed and thoroughly documented.The four new event types provide comprehensive lifecycle coverage for human-in-the-loop flows:
MethodExecutionPausedEventandFlowPausedEventcapture pause state with all necessary context (flow_id, state, message, emit)HumanFeedbackRequestedEventandHumanFeedbackReceivedEventtrack the request/response cycleEach class includes clear docstrings explaining when it's emitted and what information it carries. The fields are appropriate for their respective contexts.
lib/crewai/src/crewai/flow/async_feedback/providers.py (1)
17-124: LGTM - Solid synchronous provider implementation with proper lifecycle management.The
ConsoleProvidercorrectly implements the feedback flow:
- Emits request event for observability
- Pauses live updates to prevent interference
- Displays formatted output (when verbose)
- Prompts and collects user input
- Emits received event
- Resumes live updates in finally block
The use of built-in
input()at line 107 (vs Rich's prompt) is appropriate here since the console formatting is already handled separately. The finally block ensures live updates are always resumed even if an error occurs.lib/crewai/src/crewai/flow/persistence/base.py (1)
59-106: LGTM - Well-designed extension with backward-compatible defaults.The three new methods for async feedback support follow good design principles:
save_pending_feedback(lines 59-77): Default delegates tosave_state, allowing persistence backends that don't support pending feedback to still function.
load_pending_feedback(lines 79-95): ReturnsNoneby default, clearly signaling "no pending feedback support" to callers.
clear_pending_feedback(lines 97-106): No-op by default, safe for non-implementing backends.The use of
TYPE_CHECKINGfor thePendingFeedbackContextimport avoids circular dependencies while maintaining proper type hints. Concrete implementations (like SQLiteFlowPersistence) can override these to provide full functionality.lib/crewai/src/crewai/flow/async_feedback/types.py (3)
17-112: Well-structured dataclass with comprehensive serialization support.The
PendingFeedbackContextdataclass is well-designed with clear documentation, proper serialization viato_dict/from_dict, and sensible defaults. The use ofTYPE_CHECKINGto avoid circular imports is appropriate.
114-184: Good exception design for control flow signaling.The
HumanFeedbackPendingexception is well-documented and properly signals flow pause. Thenoqa: N818comment appropriately explains why this exception doesn't follow the standard naming convention. The decision to return this fromkickoff()rather than raising is a good UX choice.
187-264: Clean protocol definition for extensibility.The
HumanFeedbackProviderprotocol with@runtime_checkableallows for flexible provider implementations. The documentation clearly explains both sync and async usage patterns.lib/crewai/tests/test_async_human_feedback.py (4)
1-31: Comprehensive test suite for async human feedback functionality.The test organization is excellent with clear sections for each component (PendingFeedbackContext, HumanFeedbackPending, providers, persistence, resume, integration). The use of
tempfile.TemporaryDirectoryensures test isolation.
893-972: Good edge case coverage for LLM response parsing.The
TestCollapseToOutcomeJsonParsingclass thoroughly tests JSON parsing edge cases including valid JSON responses, plain string responses, invalid JSON fallback, and LLM exceptions with retry logic. This ensures robust outcome collapsing.
1000-1069: Empty/missing feedback edge cases well covered.Tests for
test_empty_feedback_uses_default_outcomeandtest_resume_without_feedback_uses_defaultproperly verify the default outcome fallback behavior when feedback is empty or omitted.
530-564: The project already haspytest-asyncio==1.3.0installed and configured withasyncio_mode = "strict"inpyproject.toml. Both test methods are properly structured:test_resume_from_async_context_raises_errorusesasyncio.run()whiletest_resume_async_directcorrectly uses@pytest.mark.asyncio. No action needed.docs/en/learn/human-feedback-in-flows.mdx (1)
1-581: Excellent, comprehensive documentation for human feedback in flows.The documentation covers all aspects of the
@human_feedbackdecorator:
- Clear overview with mermaid diagram
- Complete parameter reference table
- Progressive examples from simple to complex
- Async/non-blocking workflows with provider pattern
- Best practices section with actionable guidance
- Proper warnings about async context limitations
The structure is logical and the examples are practical. This will help users adopt the feature effectively.
lib/crewai/src/crewai/flow/persistence/sqlite.py (3)
95-113: Good schema design for pending feedback persistence.The
pending_feedbacktable correctly usesUNIQUEconstraint onflow_uuidto prevent duplicate entries, and theINSERT OR REPLACEpattern insave_pending_feedbackhandles re-triggering scenarios properly. The index onflow_uuidensures efficient lookups.
182-233: Well-implemented pending feedback persistence.The
save_pending_feedbackmethod correctly:
- Converts state data to dict format
- Saves to both the regular state table (for consistency) and the pending_feedback table
- Uses UTC timestamps
- Handles upsert via
INSERT OR REPLACE
234-281: Load and clear methods are straightforward and correct.Both
load_pending_feedbackandclear_pending_feedbackproperly handle the SQLite operations. The load method correctly reconstructs thePendingFeedbackContextfrom the stored JSON.lib/crewai/src/crewai/flow/flow.py (8)
560-624: Well-designedfrom_pendingclassmethod for flow restoration.The
from_pendingimplementation correctly:
- Uses default
SQLiteFlowPersistenceif none provided- Loads and validates pending feedback context
- Restores state and marks the method as completed
- Sets the resuming flag appropriately
The documentation is clear about usage patterns.
643-693: Correct async context detection inresume().The check for a running event loop using
asyncio.get_running_loop()withtry/except RuntimeErroris the correct pattern. RaisingRuntimeErrorwith a clear message guides users to useresume_asyncwhen in async context.
695-848: Comprehensiveresume_asyncimplementation.The async resume method properly:
- Validates pending context exists
- Handles empty feedback with default outcomes
- Collapses feedback to outcome when emit is specified
- Creates and stores
HumanFeedbackResult- Clears pending feedback from persistence
- Emits appropriate events
- Handles nested
HumanFeedbackPendingfor loop casesThe logic for router vs non-router behavior is correctly differentiated.
1327-1388: Good handling ofHumanFeedbackPendinginkickoff_async.The try/except block correctly catches
HumanFeedbackPending, auto-creates persistence if needed, saves the pending state, emitsFlowPausedEvent, and returns the exception instead of raising. This enables graceful handling without try/except in user code.
1580-1614: Proper event emission for paused methods.The
_execute_methodnow correctly distinguishes betweenHumanFeedbackPending(paused, not failed) and regular exceptions, emittingMethodExecutionPausedEventfor the former. This ensures accurate event tracking.
1987-2100: Robust_collapse_to_outcomewith fallback handling.The method correctly:
- Creates a dynamic Pydantic model with
Literaltype for structured outputs- Handles JSON string responses from LLM
- Falls back to simple prompting if structured output fails
- Uses case-insensitive matching with partial match fallback
- Logs warnings for unexpected scenarios
This provides reliable outcome collapsing even with varied LLM behaviors.
2029-2034: Dynamic Literal type creation.The dynamic
Literaltype creation usingLiteral[outcomes_tuple]is a clever approach, though the# type: ignore[valid-type]is needed because mypy can't verify runtime-constructed types. This is an acceptable pattern for this use case.
1467-1490: Router handling in start methods supports human feedback.The enhanced
_execute_start_methodcorrectly handles cases where a start method is also a router (e.g.,@human_feedbackwithemit). It executes listeners for both the method name and the router result, passingHumanFeedbackResultappropriately.docs/ko/learn/human-feedback-in-flows.mdx (1)
1-581: Consistent Korean translation of human feedback documentation.The Korean documentation mirrors the English version's structure and completeness, covering all aspects of the
@human_feedbackdecorator. The translation maintains technical accuracy while adapting to Korean conventions.The static analysis hint about spelling at line 152 ("피드백 히스토리") appears to be a false positive as it's correctly spelled Korean text.
lib/crewai/src/crewai/flow/human_feedback.py (5)
74-112: Well-structuredHumanFeedbackResultdataclass.The dataclass captures all relevant information about a human feedback interaction. The documentation is thorough with clear attribute descriptions and usage example.
250-264: Good decoration-time validation.The upfront validation ensures:
llmis required whenemitis specifieddefault_outcomemust be inemitlistdefault_outcomerequiresemitThis prevents runtime errors by catching configuration issues early.
268-296: Clean separation of feedback request logic.The
_request_feedbackhelper correctly buildsPendingFeedbackContextand delegates to either the custom provider or the default console input. The flow_id handling with fallback to "unknown" is defensive.
342-369: Proper async/sync wrapper differentiation.The decorator correctly uses
asyncio.iscoroutinefunctionto determine whether to create an async or sync wrapper, ensuring the wrapped method maintains its async nature.
371-397: Complete attribute preservation for Flow integration.The wrapper correctly preserves existing Flow decorator attributes (
__is_start_method__,__trigger_methods__, etc.) and adds human feedback specific attributes. Setting__is_router__and__router_paths__whenemitis specified enables proper routing behavior.
### Human in the Loop (human feedback)

The `@human_feedback` decorator enables human-in-the-loop workflows by pausing flow execution to collect feedback from a human. This is useful for approval gates, quality review, and decision points that require human judgment.

```python Code
from crewai.flow.flow import Flow, start, listen
from crewai.flow.human_feedback import human_feedback, HumanFeedbackResult

class ReviewFlow(Flow):
    @start()
    @human_feedback(
        message="Do you approve this content?",
        emit=["approved", "rejected", "needs_revision"],
        llm="gpt-4o-mini",
        default_outcome="needs_revision",
    )
    def generate_content(self):
        return "Content to be reviewed..."

    @listen("approved")
    def on_approval(self, result: HumanFeedbackResult):
        print(f"Approved! Feedback: {result.feedback}")

    @listen("rejected")
    def on_rejection(self, result: HumanFeedbackResult):
        print(f"Rejected. Reason: {result.feedback}")
```

When `emit` is specified, the human's free-form feedback is interpreted by an LLM and collapsed into one of the specified outcomes, which then triggers the corresponding `@listen` decorator.

You can also use `@human_feedback` without routing to simply collect feedback:

```python Code
@start()
@human_feedback(message="Any comments on this output?")
def my_method(self):
    return "Output for review"

@listen(my_method)
def next_step(self, result: HumanFeedbackResult):
    # Access feedback via result.feedback
    # Access original output via result.output
    pass
```

Access all feedback collected during a flow via `self.last_human_feedback` (most recent) or `self.human_feedback_history` (all feedback as a list).

For a complete guide on human feedback in flows, including **async/non-blocking feedback** with custom providers (Slack, webhooks, etc.), see [Human Feedback in Flows](/en/learn/human-feedback-in-flows).
🧩 Analysis chain
🏁 Scripts executed (repository: Devasy/crewAI-telemetry):

- Find human_feedback implementation: `find . -type f -name "*.py" | xargs grep -l "human_feedback" | head -20` (output length: 852)
- Search for the human_feedback decorator definition: `rg "def human_feedback" -A 10` (output length: 941)
- Look for flow listen decorator and routing logic: `rg "@listen|def listen" -A 5 | head -50` (output length: 3747)
- `cat -n lib/crewai/src/crewai/flow/human_feedback.py | head -100` (output length: 4159)
- Look for the implementation of listener dispatch with emit: `rg "emit" lib/crewai/src/crewai/flow/human_feedback.py -A 5 -B 5` (output length: 9370)
- Examine the test file for emit routing behavior: `cat -n lib/crewai/tests/test_async_human_feedback.py | sed -n '1,150p'` (output length: 6650)
Fix listener signature in routed human-feedback example
The @listen("approved") method should not expect a result parameter. When emit is specified, the decorator returns the outcome string (not HumanFeedbackResult), and the feedback object is stored on self.last_human_feedback. Update the example:
@listen("approved")
def on_approval(self):
result = self.last_human_feedback
print(f"Approved! Feedback: {result.feedback}")
@listen("rejected")
def on_rejection(self):
result = self.last_human_feedback
print(f"Rejected. Reason: {result.feedback}")🤖 Prompt for AI Agents
In docs/en/concepts/flows.mdx around lines 575 to 623, the routed human-feedback
example uses @listen("approved") and @listen("rejected") handlers that accept a
result parameter, but when emit is specified the decorator yields an outcome
string and the feedback is stored on self.last_human_feedback; update the
example handlers to remove the result parameter and instead read feedback from
self.last_human_feedback (e.g., assign result = self.last_human_feedback then
use result.feedback) for both approved and rejected listeners and adjust any
example prints accordingly.
### Human in the Loop (인간 피드백)

`@human_feedback` 데코레이터는 인간의 피드백을 수집하기 위해 플로우 실행을 일시 중지하는 human-in-the-loop 워크플로우를 가능하게 합니다. 이는 승인 게이트, 품질 검토, 인간의 판단이 필요한 결정 지점에 유용합니다.

```python Code
from crewai.flow.flow import Flow, start, listen
from crewai.flow.human_feedback import human_feedback, HumanFeedbackResult

class ReviewFlow(Flow):
    @start()
    @human_feedback(
        message="이 콘텐츠를 승인하시겠습니까?",
        emit=["approved", "rejected", "needs_revision"],
        llm="gpt-4o-mini",
        default_outcome="needs_revision",
    )
    def generate_content(self):
        return "검토할 콘텐츠..."

    @listen("approved")
    def on_approval(self, result: HumanFeedbackResult):
        print(f"승인됨! 피드백: {result.feedback}")

    @listen("rejected")
    def on_rejection(self, result: HumanFeedbackResult):
        print(f"거부됨. 이유: {result.feedback}")
```

`emit`이 지정되면, 인간의 자유 형식 피드백이 LLM에 의해 해석되어 지정된 outcome 중 하나로 매핑되고, 해당 `@listen` 데코레이터를 트리거합니다.

라우팅 없이 단순히 피드백만 수집할 수도 있습니다:

```python Code
@start()
@human_feedback(message="이 출력에 대한 코멘트가 있으신가요?")
def my_method(self):
    return "검토할 출력"

@listen(my_method)
def next_step(self, result: HumanFeedbackResult):
    # result.feedback로 피드백에 접근
    # result.output으로 원래 출력에 접근
    pass
```

플로우 실행 중 수집된 모든 피드백은 `self.last_human_feedback` (가장 최근) 또는 `self.human_feedback_history` (리스트 형태의 모든 피드백)를 통해 접근할 수 있습니다.

플로우에서의 인간 피드백에 대한 완전한 가이드는 비동기/논블로킹 피드백과 커스텀 프로바이더(Slack, 웹훅 등)를 포함하여 [Flow에서 인간 피드백](/ko/learn/human-feedback-in-flows)을 참조하세요.
🧩 Analysis chain
🏁 Scripts executed (repository: Devasy/crewAI-telemetry):

- `find . -type f -name "*.py" | xargs grep -l "human_feedback\|HumanFeedback" | head -20` (output length: 956)
- `find . -type f -path "*/flow/*" -name "*.py" | head -20` (output length: 1147)
- `cat -n ./lib/crewai/src/crewai/flow/human_feedback.py` (output length: 17841)
- `cat -n ./lib/crewai/tests/test_human_feedback_decorator.py` (output length: 16048)
- `grep -n "def _execute_listener\|def _route\|@listen" ./lib/crewai/src/crewai/flow/flow.py | head -40` (output length: 772)
- `sed -n '1,100p' ./lib/crewai/src/crewai/flow/flow.py | cat -n` (output length: 3480)
- `sed -n '1626,1700p' ./lib/crewai/src/crewai/flow/flow.py | cat -n` (output length: 4204)
- `sed -n '780,800p' ./lib/crewai/src/crewai/flow/flow.py | cat -n` (output length: 1253)
- `sed -n '1676,1730p' ./lib/crewai/src/crewai/flow/flow.py | cat -n` (output length: 3452)
- `grep -n "_execute_single_listener" ./lib/crewai/src/crewai/flow/flow.py | head -5` (output length: 455)
- `sed -n '1540,1620p' ./lib/crewai/src/crewai/flow/flow.py | cat -n` (output length: 3699)
- `sed -n '1842,1920p' ./lib/crewai/src/crewai/flow/flow.py | cat -n` (output length: 4304)
- `sed -n '1470,1530p' ./lib/crewai/src/crewai/flow/flow.py | cat -n` (output length: 3159)
- `grep -n "@listen" ./lib/crewai/tests/test_human_feedback_integration.py | head -20` (output length: 660)
- `cat -n ./lib/crewai/tests/test_human_feedback_integration.py | head -150` (output length: 6161)
- `sed -n '63,95p' ./lib/crewai/tests/test_human_feedback_integration.py | cat -n` (output length: 1365)
Routed listener parameter type is optional; adjust example for consistency
The routed listener example in the Korean docs:
@listen("approved")
def on_approval(self, result: HumanFeedbackResult):
print(f"승인됨! 피드백: {result.feedback}")shows an optional HumanFeedbackResult parameter. While the flow framework supports parameterized listeners (it inspects the signature and passes the result if a parameter exists), all test examples in the codebase use parameterless routed listeners. To keep the Korean docs aligned with established patterns, remove the parameter and access feedback via self.last_human_feedback:
@listen("approved")
def on_approval(self):
result = self.last_human_feedback
print(f"승인됨! 피드백: {result.feedback}")This matches the pattern shown in the library's own docstring examples and test cases.
🤖 Prompt for AI Agents
In docs/ko/concepts/flows.mdx around lines 568 to 616, the routed listener
examples include a parameter of type HumanFeedbackResult but the project
examples and tests use parameterless routed listeners; update the two routed
listener examples to remove the result parameter and instead read feedback from
self.last_human_feedback (e.g., change on_approval(self, result:
HumanFeedbackResult) to on_approval(self) and retrieve result =
self.last_human_feedback inside, and do the same for on_rejection/other routed
listeners) so the examples match the established pattern.
### Human in the Loop (feedback humano)

O decorador `@human_feedback` permite fluxos de trabalho human-in-the-loop, pausando a execução do flow para coletar feedback de um humano. Isso é útil para portões de aprovação, revisão de qualidade e pontos de decisão que requerem julgamento humano.

```python Code
from crewai.flow.flow import Flow, start, listen
from crewai.flow.human_feedback import human_feedback, HumanFeedbackResult

class ReviewFlow(Flow):
    @start()
    @human_feedback(
        message="Você aprova este conteúdo?",
        emit=["approved", "rejected", "needs_revision"],
        llm="gpt-4o-mini",
        default_outcome="needs_revision",
    )
    def generate_content(self):
        return "Conteúdo para revisão..."

    @listen("approved")
    def on_approval(self, result: HumanFeedbackResult):
        print(f"Aprovado! Feedback: {result.feedback}")

    @listen("rejected")
    def on_rejection(self, result: HumanFeedbackResult):
        print(f"Rejeitado. Motivo: {result.feedback}")
```

Quando `emit` é especificado, o feedback livre do humano é interpretado por um LLM e mapeado para um dos outcomes especificados, que então dispara o decorador `@listen` correspondente.

Você também pode usar `@human_feedback` sem roteamento para simplesmente coletar feedback:

```python Code
@start()
@human_feedback(message="Algum comentário sobre esta saída?")
def my_method(self):
    return "Saída para revisão"

@listen(my_method)
def next_step(self, result: HumanFeedbackResult):
    # Acesse o feedback via result.feedback
    # Acesse a saída original via result.output
    pass
```

Acesse todo o feedback coletado durante um flow via `self.last_human_feedback` (mais recente) ou `self.human_feedback_history` (todo o feedback em uma lista).

Para um guia completo sobre feedback humano em flows, incluindo feedback assíncrono/não-bloqueante com providers customizados (Slack, webhooks, etc.), veja [Feedback Humano em Flows](/pt-BR/learn/human-feedback-in-flows).
🧩 Analysis chain
🏁 Scripts executed (repository: Devasy/crewAI-telemetry):

- `find . -type f -name "*.py" | grep -E "(human|feedback)" | head -20` (output length: 537)
- `find . -type f -path "*/flow/*" -name "*.py" | head -20` (output length: 1147)
- `cat -n ./lib/crewai/src/crewai/flow/human_feedback.py` (output length: 17841)
- `find . -type f -name "flow.py" | grep -v __pycache__` (output length: 103)
- `cat -n ./lib/crewai/src/crewai/flow/flow.py | head -100` (output length: 3480)
- `grep -n "@listen" ./lib/crewai/tests/test_human_feedback_decorator.py -A 5 | head -40` (output length: 690)
- `grep -n "def listen" ./lib/crewai/src/crewai/flow/flow.py -A 20 | head -60` (output length: 866)
- `grep -n "class ListenMethod\|def listen" ./lib/crewai/src/crewai/flow/flow_wrappers.py -A 15 | head -80` (output length: 689)
- `grep -n "_execute\|_invoke\|apply_step\|_call_listener" ./lib/crewai/src/crewai/flow/flow.py | head -30` (output length: 1529)
- `grep -n "def _execute\|def apply_step" ./lib/crewai/src/crewai/flow/flow.py -A 30 | head -100` (output length: 4726)
- `sed -n '1842,1920p' ./lib/crewai/src/crewai/flow/flow.py` (output length: 3751)
- `sed -n '1626,1750p' ./lib/crewai/src/crewai/flow/flow.py` (output length: 6261)
- `sed -n '1678,1710p' ./lib/crewai/src/crewai/flow/flow.py` (output length: 1830)
- `grep -rn "on_approval\|on_rejection" ./lib/crewai/tests/test_human_feedback_decorator.py -A 8 | head -50` (output length: 49)
- `grep -n "class.*Flow\|def on_approval\|def on_rejection\|@listen.*approved\|@listen.*rejected" ./lib/crewai/tests/test_human_feedback_decorator.py -A 5 | head -80` (output length: 1477)
- `cat -n ./lib/crewai/tests/test_human_feedback_decorator.py | sed -n '200,350p'` (output length: 6143)
- `grep -n "emit=\|listener.*approved\|on_approval\|on_rejection" ./lib/crewai/tests/test_human_feedback_decorator.py -B 5 -A 10 | head -100` (output length: 3549)
- `grep -rn "emit=" ./lib/crewai/tests/ --include="*.py" -A 20 | grep -A 15 "@listen"` (output length: 8108)
- `sed -n '318,345p' ./lib/crewai/tests/test_human_feedback_integration.py` (output length: 1023)
Update listener examples to remove incorrect parameter type annotation
In the routed human-feedback example (lines 327-330), the listener methods show:
@listen("approved")
def on_approval(self, result: HumanFeedbackResult):
print(f"Aprovado! Feedback: {result.feedback}")However, routed listeners do not receive a parameter. The @human_feedback decorator with emit returns an outcome string (not HumanFeedbackResult), and the HumanFeedbackResult is accessed via self.last_human_feedback. The listener signature should be:
@listen("approved")
def on_approval(self):
print(f"Aprovado! Feedback: {self.last_human_feedback.feedback}")Apply the same fix to the on_rejection and any other routed listeners.
🤖 Prompt for AI Agents
In docs/pt-BR/concepts/flows.mdx around lines 310 to 357, the routed
human-feedback listener examples incorrectly declare a parameter of type
HumanFeedbackResult (e.g. def on_approval(self, result: HumanFeedbackResult)):
remove the parameter from routed listeners (def on_approval(self):) and update
the body to read feedback/output from self.last_human_feedback (e.g.
self.last_human_feedback.feedback and self.last_human_feedback.output); make the
same change for on_rejection and any other routed listener examples in this
block.
```python
    default_outcome: str | None = None
    metadata: dict[str, Any] = field(default_factory=dict)
    llm: str | None = None
    requested_at: datetime = field(default_factory=datetime.now)
```
🧹 Nitpick | 🔵 Trivial
Consider using timezone-aware datetime for requested_at.
Using datetime.now without timezone info can cause issues when serializing/deserializing across different timezones or in distributed systems. Consider using datetime.now(timezone.utc) for consistency with the persistence layer.
🔎 Proposed fix
```diff
-    requested_at: datetime = field(default_factory=datetime.now)
+    requested_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
```

And add the import at the top:

```python
from datetime import datetime, timezone
```
🤖 Prompt for AI Agents
In lib/crewai/src/crewai/flow/async_feedback/types.py around line 63, the
requested_at field uses datetime.now which yields a naive datetime; change it to
a timezone-aware timestamp by using datetime.now(timezone.utc) and add the
corresponding import for timezone from datetime at the top of the file, ensuring
all uses and serializations expect UTC-aware datetimes.
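A runnable sketch of the timezone-aware default this suggestion describes (the class name is illustrative, not the real PendingFeedbackContext):

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone


@dataclass
class PendingRequest:  # illustrative stand-in
    requested_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))


print(PendingRequest().requested_at.tzinfo)  # timezone.utc
```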
| requested_at = data.get("requested_at") | ||
| if isinstance(requested_at, str): | ||
| requested_at = datetime.fromisoformat(requested_at) | ||
| elif requested_at is None: | ||
| requested_at = datetime.now() |
🧹 Nitpick | 🔵 Trivial
Timezone handling inconsistency in from_dict.
The fallback to datetime.now() should also use UTC for consistency. Additionally, datetime.fromisoformat will preserve the timezone if the string contains it, but if it doesn't, the result is naive.
🔎 Proposed fix
requested_at = data.get("requested_at")
if isinstance(requested_at, str):
requested_at = datetime.fromisoformat(requested_at)
elif requested_at is None:
- requested_at = datetime.now()
+ requested_at = datetime.now(timezone.utc)Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In lib/crewai/src/crewai/flow/async_feedback/types.py around lines 94 to 98, the
from_dict handling of requested_at is inconsistent with timezone handling: the
fallback uses datetime.now() (naive local time) and datetime.fromisoformat may
return a naive datetime if the string lacks a timezone. Update the logic to
produce a timezone-aware UTC datetime: when requested_at is a string, parse with
datetime.fromisoformat and if the resulting datetime is naive, set its tzinfo to
timezone.utc; when requested_at is None, use datetime.now(timezone.utc) as the
fallback so all returned datetimes are UTC-aware.
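A small sketch of the parsing logic the prompt describes, normalizing both branches to UTC-aware datetimes (the helper name and dict layout are illustrative):

```python
from datetime import datetime, timezone
from typing import Any


def parse_requested_at(data: dict[str, Any]) -> datetime:
    raw = data.get("requested_at")
    if isinstance(raw, str):
        parsed = datetime.fromisoformat(raw)
        # fromisoformat keeps an offset if the string has one; otherwise attach UTC.
        return parsed if parsed.tzinfo is not None else parsed.replace(tzinfo=timezone.utc)
    return datetime.now(timezone.utc)


print(parse_requested_at({"requested_at": "2024-01-01T12:00:00"}).tzinfo)  # UTC
```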
```python
    output: Any
    feedback: str
    outcome: str | None = None
    timestamp: datetime = field(default_factory=datetime.now)
    method_name: str = ""
    metadata: dict[str, Any] = field(default_factory=dict)
```
🧹 Nitpick | 🔵 Trivial
Consider timezone consistency for timestamp.
Similar to PendingFeedbackContext, the HumanFeedbackResult.timestamp uses datetime.now without timezone. For consistency across the codebase, consider using UTC.
🔎 Proposed fix
```diff
+from datetime import datetime, timezone
+
 @dataclass
 class HumanFeedbackResult:
     # ... fields ...
-    timestamp: datetime = field(default_factory=datetime.now)
+    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
```

Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In lib/crewai/src/crewai/flow/human_feedback.py around lines 106 to 111, the
HumanFeedbackResult.timestamp default_factory uses naive datetime.now; change it
to produce an aware UTC timestamp (e.g., default_factory=lambda:
datetime.now(timezone.utc)) to match PendingFeedbackContext and maintain
timezone consistency across the codebase, and add/ensure the necessary import of
timezone from datetime at top of the file.
```python
        # Import here to avoid circular imports
        from crewai.flow.async_feedback.types import PendingFeedbackContext
```
🧹 Nitpick | 🔵 Trivial
Remove redundant import inside method.
The PendingFeedbackContext is already imported under TYPE_CHECKING at line 19. This runtime import is needed for from_dict, but the comment is misleading since the import at line 19 handles type hints.
The import is necessary at runtime for from_dict() call, but the comment could be clearer:
```diff
-    # Import here to avoid circular imports
+    # Runtime import for deserialization (TYPE_CHECKING import is for type hints only)
     from crewai.flow.async_feedback.types import PendingFeedbackContext
```

🤖 Prompt for AI Agents
In lib/crewai/src/crewai/flow/persistence/sqlite.py around lines 198-199, the
inline import of PendingFeedbackContext is required at runtime for the
.from_dict() call so the existing comment claiming it's only to avoid circular
imports is misleading; update the comment to state that this import is
intentionally performed at runtime because PendingFeedbackContext is needed by
from_dict (or alternatively move a real runtime import to module top-level and
keep the TYPE_CHECKING import only for type hints) and leave the import in place
so from_dict continues to work.
| "lite_agent_response_format": "Ensure your final answer strictly adheres to the following OpenAPI schema: {response_format}\n\nDo not include the OpenAPI schema in the final output. Ensure the final output does not include any code block markers like ```json or ```python.", | ||
| "knowledge_search_query": "The original query is: {task_prompt}.", | ||
| "knowledge_search_query_system_prompt": "Your goal is to rewrite the user query so that it is optimized for retrieval from a vector database. Consider how the query will be used to find relevant documents, and aim to make it more specific and context-aware. \n\n Do not include any other text than the rewritten query, especially any preamble or postamble and only add expected output format if its relevant to the rewritten query. \n\n Focus on the key words of the intended task and to retrieve the most relevant information. \n\n There will be some extra context provided that might need to be removed such as expected_output formats structured_outputs and other instructions." | ||
| "knowledge_search_query_system_prompt": "Your goal is to rewrite the user query so that it is optimized for retrieval from a vector database. Consider how the query will be used to find relevant documents, and aim to make it more specific and context-aware. \n\n Do not include any other text than the rewritten query, especially any preamble or postamble and only add expected output format if its relevant to the rewritten query. \n\n Focus on the key words of the intended task and to retrieve the most relevant information. \n\n There will be some extra context provided that might need to be removed such as expected_output formats structured_outputs and other instructions.", |
Fix grammatical error in prompt text.
The phrase "if its relevant" should be "if it's relevant" (using the contraction "it's" for "it is").
🔎 Proposed fix
- "knowledge_search_query_system_prompt": "Your goal is to rewrite the user query so that it is optimized for retrieval from a vector database. Consider how the query will be used to find relevant documents, and aim to make it more specific and context-aware. \n\n Do not include any other text than the rewritten query, especially any preamble or postamble and only add expected output format if its relevant to the rewritten query. \n\n Focus on the key words of the intended task and to retrieve the most relevant information. \n\n There will be some extra context provided that might need to be removed such as expected_output formats structured_outputs and other instructions.",
+ "knowledge_search_query_system_prompt": "Your goal is to rewrite the user query so that it is optimized for retrieval from a vector database. Consider how the query will be used to find relevant documents, and aim to make it more specific and context-aware. \n\n Do not include any other text than the rewritten query, especially any preamble or postamble and only add expected output format if it's relevant to the rewritten query. \n\n Focus on the key words of the intended task and to retrieve the most relevant information. \n\n There will be some extra context provided that might need to be removed such as expected_output formats structured_outputs and other instructions.",📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| "knowledge_search_query_system_prompt": "Your goal is to rewrite the user query so that it is optimized for retrieval from a vector database. Consider how the query will be used to find relevant documents, and aim to make it more specific and context-aware. \n\n Do not include any other text than the rewritten query, especially any preamble or postamble and only add expected output format if its relevant to the rewritten query. \n\n Focus on the key words of the intended task and to retrieve the most relevant information. \n\n There will be some extra context provided that might need to be removed such as expected_output formats structured_outputs and other instructions.", | |
| "knowledge_search_query_system_prompt": "Your goal is to rewrite the user query so that it is optimized for retrieval from a vector database. Consider how the query will be used to find relevant documents, and aim to make it more specific and context-aware. \n\n Do not include any other text than the rewritten query, especially any preamble or postamble and only add expected output format if it's relevant to the rewritten query. \n\n Focus on the key words of the intended task and to retrieve the most relevant information. \n\n There will be some extra context provided that might need to be removed such as expected_output formats structured_outputs and other instructions.", |
🤖 Prompt for AI Agents
In lib/crewai/src/crewai/translations/en.json around line 32, the prompt text
contains a grammatical error: change the phrase "if its relevant" to "if it's
relevant" so the contraction for "it is" is used; update the JSON string
accordingly (preserve surrounding punctuation and newlines) so the value remains
valid JSON.
| @patch("builtins.input", return_value="I approve") | ||
| @patch("builtins.print") | ||
| def test_routes_to_matching_listener(self, mock_print, mock_input): |
🧹 Nitpick | 🔵 Trivial
Consider dropping redundant builtins.input patches where _request_human_feedback is mocked
In many tests you patch both builtins.input and the instance method _request_human_feedback, but the decorator calls _request_human_feedback, so the input patch is never actually used. Removing the @patch("builtins.input", ...) in those cases would simplify the test signatures and make it clearer which path is under test.
Also applies to: 100-102, 130-132, 164-166, 201-203, 330-332, 362-364, 385-387, 408-410
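For illustration, a minimal sketch of what one of these tests could look like once the redundant patch is dropped; `MyFeedbackFlow`, the method bodies, and the test name are placeholders rather than code from this PR:

```python
from unittest.mock import patch


class MyFeedbackFlow:  # placeholder for the real flow under test
    def _request_human_feedback(self, prompt: str) -> str:
        raise NotImplementedError

    def kickoff(self) -> str:
        return self._request_human_feedback("Approve?")


class TestHumanFeedbackRouting:
    @patch.object(MyFeedbackFlow, "_request_human_feedback", return_value="I approve")
    def test_routes_to_matching_listener(self, mock_feedback):
        flow = MyFeedbackFlow()
        result = flow.kickoff()

        # Only the instance-method mock sits on the code path under test;
        # no builtins.input patch is needed.
        mock_feedback.assert_called_once()
        assert result == "I approve"
```

Keeping only the mock that sits on the exercised code path also makes failures easier to diagnose.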
🤖 Prompt for AI Agents
In lib/crewai/tests/test_human_feedback_integration.py around lines 24-26,
several tests patch builtins.input even though they also patch/mock the instance
method _request_human_feedback (so input is never invoked); remove the redundant
@patch("builtins.input", ...) decorators from those tests (and similarly from
the other listed locations: 100-102, 130-132, 164-166, 201-203, 330-332,
362-364, 385-387, 408-410), and update the test function signatures to drop the
corresponding mock_input parameter so only the relevant mocks remain, keeping
the behavior and assertions unchanged.
* WIP gh pr refactor: update agent executor handling and introduce flow-based executor * wip * refactor: clean up comments and improve code clarity in agent executor flow - Removed outdated comments and unnecessary explanations in and classes to enhance code readability. - Simplified parameter updates in the agent executor to avoid confusion regarding executor recreation. - Improved clarity in the method to ensure proper handling of non-final answers without raising errors. * bumping pytest-randomly numpy * also bump versions of anthropic sdk * ensure flow logs are not passed if its on executor * revert anthropic bump * fix * refactor: update dependency markers in uv.lock for platform compatibility - Enhanced dependency markers for , , , and others to ensure compatibility across different platforms (Linux, Darwin, and architecture-specific conditions). - Removed unnecessary event emission in the class during kickoff. - Cleaned up commented-out code in the class for better readability and maintainability. * drop dupllicate * test: enhance agent executor creation and stop word assertions - Added calls to create_agent_executor in multiple test cases to ensure proper agent execution setup. - Updated assertions for stop words in the agent tests to remove unnecessary checks and improve clarity. - Ensured consistency in task handling by invoking create_agent_executor with the appropriate task parameter. * refactor: reorganize agent executor imports and introduce CrewAgentExecutorFlow - Removed the old import of CrewAgentExecutorFlow and replaced it with the new import from the experimental module. - Updated relevant references in the codebase to ensure compatibility with the new structure. - Enhanced the organization of imports in core.py and base_agent.py for better clarity and maintainability. * updating name * dropped usage of printer here for rich console and dropped non-added value logging * address i18n * Enhance concurrency control in CrewAgentExecutorFlow by introducing a threading lock to prevent concurrent executions. This change ensures that the executor instance cannot be invoked while already running, improving stability and reliability during flow execution. * string literal returns * string literal returns * Enhance CrewAgentExecutor initialization by allowing optional i18n parameter for improved internationalization support. This change ensures that the executor can utilize a provided i18n instance or fallback to the default, enhancing flexibility in multilingual contexts. --------- Co-authored-by: Greyson LaLonde <greyson.r.lalonde@gmail.com>
…crewAIInc#4160) * Refactor EventListener and TraceCollectionListener for improved event handling - Removed unused threading and method branches from EventListener to simplify the code. - Updated event handling methods in EventListener to use new formatter methods for better clarity and consistency. - Refactored TraceCollectionListener to eliminate unnecessary parameters in formatter calls, enhancing readability. - Simplified ConsoleFormatter by removing outdated tree management methods and focusing on panel-based output for status updates. - Enhanced ToolUsage to track run attempts for better tool usage metrics. * clearer for knowledge retrieval and dropped some reduancies * Refactor EventListener and ConsoleFormatter for improved clarity and consistency - Removed the MCPToolExecutionCompletedEvent handler from EventListener to streamline event processing. - Updated ConsoleFormatter to enhance output formatting by adding line breaks for better readability in status content. - Renamed status messages for MCP Tool execution to provide clearer context during tool operations. * fix run attempt incrementation * task name consistency * memory events consistency * ensure hitl works * linting
crewAIInc#4003) * feat: Introduce production-ready Flows and Crews architecture with new runner and updated documentation across multiple languages. * ko and pt-br for tracing missing links --------- Co-authored-by: Greyson LaLonde <greyson.r.lalonde@gmail.com>
Resolved 4 review comments from Cursor Bugbot: 1. Added token tracking for async tasks in _execute_tasks and _process_async_tasks 2. Fixed task key collision by including task_id in the key 3. Added token tracking for _aexecute_tasks paths (both sync and async) 4. Fixed agent metrics to be keyed by agent_id to handle multiple agents with same role All async tasks now capture tokens_before/after and attach metrics properly. Task metrics now use unique keys to prevent overwriting. Agent metrics properly track separate agents with same role.
…sy23/crewAI-telemetry into feat/per-user-token-tracing
Actionable comments posted: 25
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (6)
lib/crewai/src/crewai/experimental/evaluation/agent_evaluator.py (1)
36-44: Suggest consistent attribute initialization. The ExecutionState class mixes class-level and instance-level attribute declarations inconsistently:

- current_agent_id and current_task_id are declared as class attributes (lines 37-38) but not initialized in __init__
- Other attributes (traces, iteration, iterations_results, agent_evaluators) are properly initialized in __init__

When these attributes are set on instances (e.g., lines 95-96), they become instance attributes, but the class-level declarations suggest shared state, which is misleading.
🔎 Proposed fix: Initialize all attributes in __init__
```diff
 class ExecutionState:
-    current_agent_id: str | None = None
-    current_task_id: str | None = None
-
     def __init__(self):
+        self.current_agent_id: str | None = None
+        self.current_task_id: str | None = None
         self.traces = {}
         self.iteration = 1
         self.iterations_results = {}
         self.agent_evaluators = {}
```
This makes it clear that all attributes are per-instance and avoids confusion about shared state.
lib/crewai/src/crewai/agents/crew_agent_executor.py (1)
547-561: Address the 5-second blocking timeout for event emission. The code calls future.result(timeout=5.0) to block agent execution while waiting for event handlers to complete. This design decision has documented tradeoffs:

- Compatibility confirmed: emit() returns either ThreadPoolExecutor futures (for sync handlers) or asyncio futures (for async/dependency handlers via run_coroutine_threadsafe). Both support result(timeout=X).
- Timeout inconsistency: The event_bus docstring documents 5.0 seconds, but test code uses 2.0 seconds; the rationale for choosing 5.0 is unclear.
- Performance impact: In high-throughput scenarios, 5-second blocks per agent execution could accumulate.
Add a comment explaining why blocking is necessary here (to ensure logs are captured before proceeding), and consider whether the timeout should be:
- Configurable via agent or crew settings
- Reduced to match the 2.0-second timeout used in tests
- Documented in the method docstring
The error handling (logging without propagation) is appropriate for non-critical logging.
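To make the first option concrete, here is a sketch of what a configurable wait could look like; the helper name, the default constant, and where the value would come from are assumptions, not the existing executor API:

```python
import logging

logger = logging.getLogger(__name__)

DEFAULT_EVENT_EMISSION_TIMEOUT = 5.0  # hypothetical: would come from agent/crew settings


def wait_for_event_handlers(future, timeout: float | None = None) -> None:
    """Block until event handlers finish so logs are captured before execution continues."""
    effective_timeout = timeout if timeout is not None else DEFAULT_EVENT_EMISSION_TIMEOUT
    try:
        # Both concurrent.futures futures and asyncio futures wrapped by
        # run_coroutine_threadsafe expose result(timeout=...), so either works here.
        future.result(timeout=effective_timeout)
    except Exception as exc:  # noqa: BLE001 - event logging is non-critical
        logger.warning(
            "Event handlers did not finish within %.1fs: %s", effective_timeout, exc
        )
```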
lib/crewai/tests/agents/test_agent.py (1)
1499-1511: Stop-word expectations now match explicit stop configuration. Asserting set(agent.llm.stop) == set(stop_words) plus the containment check correctly verifies that the LLM stop tokens are exactly the ones supplied (and no implicit extras like "Observation:" are added). The second all(...) assert is a bit redundant given the set equality but harmless.

You could simplify slightly by dropping the all(...) check, since the set comparison already guarantees membership.

lib/crewai/src/crewai/flow/flow.py (2)
1592-1616: Exception handler creates persistence without checking suppress_flow_events. When a HumanFeedbackPending exception occurs, persistence is auto-created regardless of suppress_flow_events. This could have unintended side effects for internal flows that suppress events but don't want persistence.

Consider checking suppress_flow_events before auto-creating persistence, or making this behavior configurable.
2157-2184: Duplicate tracing disabled message logic. The _show_tracing_disabled_message method has identical message content in both branches of the if has_user_declined_tracing() conditional. This appears to be copy-paste that should have different messages.
🔎 Proposed fix to differentiate messages
```diff
 if has_user_declined_tracing():
     message = """Info: Tracing is disabled.
 To enable tracing, do any one of these:
   • Set tracing=True in your Flow code
   • Set CREWAI_TRACING_ENABLED=true in your project's .env file
   • Run: crewai traces enable"""
 else:
-    message = """Info: Tracing is disabled.
+    message = """Info: Tracing has not been configured.
 To enable tracing, do any one of these:
   • Set tracing=True in your Flow code
   • Set CREWAI_TRACING_ENABLED=true in your project's .env file
   • Run: crewai traces enable"""
```
38-67: Duplicate tracing disabled messages in_show_tracing_disabled_message_if_needed.Both branches of the
if has_user_declined_tracing()conditional have identical message content. This is the same issue as inflow.py. The messages should differ to provide meaningful context.
♻️ Duplicate comments (3)
lib/crewai/src/crewai/crew.py (3)
206-209: Use explicit type annotation for better type safety. The field workflow_token_metrics is typed as Any | None, which reduces type safety. Consider using WorkflowTokenMetrics | None for better IDE support and type checking.
🔎 Proposed fix
Add the import at the top of the file:
```python
from crewai.types.usage_metrics import UsageMetrics, WorkflowTokenMetrics
```
Then update the field:
```diff
-    workflow_token_metrics: Any | None = Field(
+    workflow_token_metrics: WorkflowTokenMetrics | None = Field(
         default=None,
         description="Detailed per-agent and per-task token metrics.",
     )
```
Based on learnings, this was flagged in previous reviews.
1441-1441: Consider direct attribute access. Since workflow_token_metrics is a defined class field with a default value of None, you can access it directly as self.workflow_token_metrics instead of using getattr.
🔎 Proposed fix
```diff
-    token_metrics=getattr(self, 'workflow_token_metrics', None),
+    token_metrics=self.workflow_token_metrics,
```
Based on learnings, this was flagged in previous reviews.
1748-1796: Potential double-counting of manager agent tokens. The manager agent token collection has two separate code paths (lines 1752-1766 and 1768-1796) that both add to total_usage_metrics and workflow_metrics.per_agent. If both _token_process and llm.get_token_usage_summary() return data, tokens may be counted twice.
🔎 Proposed fix
Use the existing _get_agent_token_usage helper method to avoid duplication:
```diff
 if self.manager_agent:
     manager_role = getattr(self.manager_agent, 'role', 'Manager Agent')
     manager_id = str(getattr(self.manager_agent, 'id', ''))
-
-    if hasattr(self.manager_agent, "_token_process"):
-        token_sum = self.manager_agent._token_process.get_summary()
-        total_usage_metrics.add_usage_metrics(token_sum)
-
-        # Create per-agent metrics for manager
-        manager_metrics = AgentTokenMetrics(...)
-        workflow_metrics.per_agent[manager_role] = manager_metrics
-
-    if (
-        hasattr(self.manager_agent, "llm")
-        and hasattr(self.manager_agent.llm, "get_token_usage_summary")
-    ):
-        if isinstance(self.manager_agent.llm, BaseLLM):
-            llm_usage = self.manager_agent.llm.get_token_usage_summary()
-        else:
-            llm_usage = self.manager_agent.llm._token_process.get_summary()
-        total_usage_metrics.add_usage_metrics(llm_usage)
-        ...
+    # Get manager token usage (uses existing helper to avoid duplication)
+    manager_usage = self._get_agent_token_usage(self.manager_agent)
+    total_usage_metrics.add_usage_metrics(manager_usage)
+
+    manager_metrics = AgentTokenMetrics(
+        agent_name=manager_role,
+        agent_id=manager_id,
+        total_tokens=manager_usage.total_tokens,
+        prompt_tokens=manager_usage.prompt_tokens,
+        cached_prompt_tokens=manager_usage.cached_prompt_tokens,
+        completion_tokens=manager_usage.completion_tokens,
+        successful_requests=manager_usage.successful_requests
+    )
+    workflow_metrics.per_agent[manager_role] = manager_metrics
```
Based on learnings, this was flagged in previous reviews.
📜 Review details
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
⛔ Files ignored due to path filters (1)
uv.lock is excluded by !**/*.lock
📒 Files selected for processing (33)
README.md
docs/docs.json
docs/en/concepts/production-architecture.mdx
docs/en/introduction.mdx
docs/ko/concepts/production-architecture.mdx
docs/ko/introduction.mdx
docs/ko/observability/tracing.mdx
docs/pt-BR/concepts/production-architecture.mdx
docs/pt-BR/introduction.mdx
docs/pt-BR/observability/tracing.mdx
lib/crewai/src/crewai/agent/core.py
lib/crewai/src/crewai/agents/agent_builder/base_agent.py
lib/crewai/src/crewai/agents/agent_builder/base_agent_executor_mixin.py
lib/crewai/src/crewai/agents/crew_agent_executor.py
lib/crewai/src/crewai/crew.py
lib/crewai/src/crewai/events/event_listener.py
lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py
lib/crewai/src/crewai/events/utils/console_formatter.py
lib/crewai/src/crewai/experimental/__init__.py
lib/crewai/src/crewai/experimental/crew_agent_executor_flow.py
lib/crewai/src/crewai/experimental/evaluation/agent_evaluator.py
lib/crewai/src/crewai/experimental/evaluation/base_evaluator.py
lib/crewai/src/crewai/experimental/evaluation/experiment/runner.py
lib/crewai/src/crewai/experimental/evaluation/metrics/goal_metrics.py
lib/crewai/src/crewai/experimental/evaluation/metrics/reasoning_metrics.py
lib/crewai/src/crewai/experimental/evaluation/metrics/semantic_quality_metrics.py
lib/crewai/src/crewai/experimental/evaluation/metrics/tools_metrics.py
lib/crewai/src/crewai/flow/flow.py
lib/crewai/src/crewai/tools/tool_usage.py
lib/crewai/tests/agents/test_agent.py
lib/crewai/tests/agents/test_crew_agent_executor_flow.py
lib/crewai/tests/test_flow_human_input_integration.py
lib/crewai/tests/utilities/test_console_formatter_pause_resume.py
💤 Files with no reviewable changes (1)
- lib/crewai/src/crewai/agents/agent_builder/base_agent.py
🧰 Additional context used
🧬 Code graph analysis (11)
lib/crewai/src/crewai/experimental/__init__.py (1)
lib/crewai/src/crewai/experimental/crew_agent_executor_flow.py (1)
CrewAgentExecutorFlow(76-808)
lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py (1)
lib/crewai/src/crewai/events/utils/console_formatter.py (2)
handle_memory_save_started(947-957)handle_memory_retrieval_started(893-903)
lib/crewai/tests/test_flow_human_input_integration.py (4)
lib/crewai/src/crewai/events/utils/console_formatter.py (2)
pause_live_updates(101-109)resume_live_updates(111-117)lib/crewai/tests/agents/test_async_agent_executor.py (1)
executor(58-78)lib/crewai/src/crewai/agents/agent_builder/base_agent_executor_mixin.py (1)
_ask_human_input(139-186)lib/crewai/src/crewai/experimental/evaluation/base_evaluator.py (1)
title(28-29)
lib/crewai/src/crewai/agents/agent_builder/base_agent_executor_mixin.py (2)
lib/crewai/src/crewai/agents/parser.py (1)
AgentFinish(39-44)lib/crewai/src/crewai/events/utils/console_formatter.py (2)
pause_live_updates(101-109)resume_live_updates(111-117)
lib/crewai/src/crewai/agents/crew_agent_executor.py (2)
lib/crewai/src/crewai/utilities/i18n.py (2)
I18N(12-111)get_i18n(115-127)lib/crewai/src/crewai/events/event_bus.py (1)
emit(303-364)
lib/crewai/tests/utilities/test_console_formatter_pause_resume.py (1)
lib/crewai/src/crewai/events/utils/console_formatter.py (4)
ConsoleFormatter(10-1419)pause_live_updates(101-109)resume_live_updates(111-117)handle_llm_stream_chunk(408-460)
lib/crewai/src/crewai/experimental/crew_agent_executor_flow.py (3)
lib/crewai/src/crewai/agents/parser.py (3)
AgentAction(28-35)AgentFinish(39-44)OutputParserError(47-61)lib/crewai/src/crewai/flow/flow.py (7)
Flow(482-2184)listen(186-249)or_(326-359)router(252-323)start(111-183)state(972-973)kickoff(1176-1221)lib/crewai/src/crewai/utilities/agent_utils.py (3)
enforce_rpm_limit(220-229)format_message_for_llm(184-198)get_llm_response(232-284)
lib/crewai/src/crewai/events/event_listener.py (1)
lib/crewai/src/crewai/events/utils/console_formatter.py (13)
handle_crew_started(175-190)handle_crew_status(132-173)handle_task_started(192-206)handle_task_status(208-230)handle_flow_created(232-237)handle_flow_started(239-248)handle_flow_status(250-276)handle_method_status(278-305)handle_llm_call_failed(396-406)handle_knowledge_retrieval_started(637-649)handle_knowledge_query_failed(692-703)handle_knowledge_search_query_failed(715-726)handle_reasoning_started(730-748)
lib/crewai/src/crewai/experimental/evaluation/agent_evaluator.py (1)
lib/crewai/src/crewai/agent/core.py (1)
Agent(102-1655)
lib/crewai/tests/agents/test_crew_agent_executor_flow.py (3)
lib/crewai/src/crewai/experimental/crew_agent_executor_flow.py (11)
AgentReActState(62-73)state(210-216)iterations(224-226)messages(219-221)use_stop_words(201-207)_format_prompt(671-683)_is_training_mode(701-707)_append_message_to_state(570-579)_invoke_step_callback(559-568)recover_from_parser_error(419-434)invoke(452-526)lib/crewai/src/crewai/agents/parser.py (3)
AgentAction(28-35)AgentFinish(39-44)OutputParserError(47-61)lib/crewai/src/crewai/agents/crew_agent_executor.py (5)
use_stop_words(161-167)_format_prompt(615-627)_is_training_mode(645-651)_invoke_step_callback(501-510)invoke(169-213)
lib/crewai/src/crewai/experimental/evaluation/base_evaluator.py (1)
lib/crewai/src/crewai/agent/core.py (1)
Agent(102-1655)
🪛 LanguageTool
docs/pt-BR/observability/tracing.mdx
[typographical] ~20-~20: Símbolo sem par: “]” aparentemente está ausente
Context: ... Cadastre-se para uma conta gratuita em app.crewai.com ...
(UNPAIRED_BRACKETS)
[style] ~193-~193: Para conferir mais clareza ao seu texto, busque usar uma linguagem mais concisa.
Context: ...to - Cronograma de Execução: Clique através de diferentes estágios de execução - **Log...
(ATRAVES_DE_POR_VIA)
[typographical] ~204-~204: Símbolo sem par: “]” aparentemente está ausente
Context: ...om a internet 3. Verifique sua conta em app.crewai.com ...
(UNPAIRED_BRACKETS)
docs/ko/introduction.mdx
[style] ~57-~57: Three successive sentences begin with the same word. Consider rewording the sentence or use a thesaurus to find a synonym.
Context: ...low**가 상태를 관리하고 다음에 무엇을 할지 결정합니다. 3. Flow가 복잡한 작업을 Crew에게 위임합니다. 4. **Crew...
(ENGLISH_WORD_REPEAT_BEGINNING_RULE)
docs/en/introduction.mdx
[style] ~85-~85: ‘overall structure’ might be wordy. Consider a shorter alternative.
Context: ...Flow**. - Use a Flow to define the overall structure, state, and logic of your application. ...
(EN_WORDINESS_PREMIUM_OVERALL_STRUCTURE)
docs/pt-BR/introduction.mdx
[typographical] ~14-~14: Símbolo sem par: “[” aparentemente está ausente
Context: ...iso dos Flows. - Flows do CrewAI: A es...
(UNPAIRED_BRACKETS)
[typographical] ~15-~15: Símbolo sem par: “[” aparentemente está ausente
Context: ... de IA trabalharem. - Crews do CrewAI: As u...
(UNPAIRED_BRACKETS)
[style] ~34-~34: Para conferir mais clareza ao seu texto, busque usar uma linguagem mais concisa.
Context: ...enciamento de Estado**: Persistem dados através de etapas e execuções. - **Execução Orient...
(ATRAVES_DE_POR_VIA)
[style] ~57-~57: Três frases seguidas começam com a mesma palavra. Considere reformular a frase ou use um dicionário para encontrar um sinônimo.
Context: ...do e decide o que fazer a seguir. 3. O Flow delega uma tarefa complexa para ...
(PORTUGUESE_WORD_REPEAT_BEGINNING_RULE)
[style] ~85-~85: “estrutura geral” é um pleonasmo. É preferível dizer “estrutura”
Context: ...ow**. - Use um Flow para definir a estrutura geral, estado e lógica da sua aplicação. - **...
(PT_REDUNDANCY_REPLACE_ESTRUTURA_GERAL)
[style] ~86-~86: “dentro de uma” é uma expressão prolixa. É preferível dizer “numa” ou “em uma”.
Context: ...ica da sua aplicação. - Use um Crew dentro de uma etapa do Flow quando precisar de uma eq...
(PT_WORDINESS_REPLACE_DENTRO_DE_UMA)
🔇 Additional comments (61)
lib/crewai/src/crewai/experimental/evaluation/metrics/goal_metrics.py (1)
1-18: LGTM! Clean typing refactoring to avoid circular imports.The changes correctly implement the TYPE_CHECKING guard pattern:
from __future__ import annotationsenables postponed evaluation, converting type hints to strings at runtime- Moving the
Agentimport behindTYPE_CHECKINGeliminates the runtime dependency while preserving type-checker validation- Runtime behavior is unchanged since
agent.roleandagent.goalaccess actual instance attributes, not theAgentclass itselfThis is a standard Python best practice for breaking circular import dependencies without sacrificing type safety.
lib/crewai/src/crewai/experimental/evaluation/metrics/reasoning_metrics.py (2)
9-9: LGTM! Excellent use of TYPE_CHECKING for forward references.The addition of
from __future__ import annotationsand the TYPE_CHECKING guard for the Agent import follows Python typing best practices. This approach avoids circular imports and reduces runtime overhead while preserving type hints for theevaluatemethod signature at line 50.Also applies to: 15-15, 31-33
38-40: Critical fix: Adds missing enum members that were already referenced.These enum additions fix a critical bug where the code at lines 311, 314, and 319 was already referencing
VERBOSE,INDECISIVE, andSCATTEREDbefore they were defined, which would have causedAttributeErrorat runtime when_analyze_reasoning_patternsexecuted those code paths.lib/crewai/src/crewai/experimental/evaluation/metrics/semantic_quality_metrics.py (3)
1-1: LGTM! Proper use of postponed annotation evaluation.The addition of
from __future__ import annotationsenables type hints to be evaluated as strings at runtime, which is essential for the TYPE_CHECKING pattern below and avoids circular import issues.
3-3: LGTM! Correct typing imports.The addition of
TYPE_CHECKINGenables the conditional import pattern used below. Both imports are standard and correctly applied.
16-18: LGTM! Idiomatic circular import resolution.The TYPE_CHECKING guard correctly defers the Agent import to type-checking time only. Combined with
from __future__ import annotations, this prevents circular imports while preserving full type safety. The runtime code only accesses attributes (e.g.,agent.roleat line 58) that should be available on bothAgentandBaseAgenttypes.lib/crewai/src/crewai/experimental/evaluation/base_evaluator.py (3)
1-18: LGTM! Proper typing safeguards implemented.The addition of postponed annotation evaluation and TYPE_CHECKING guards correctly prevents runtime circular imports while maintaining type safety. This pattern is consistent with modern Python typing best practices.
20-29: LGTM! Metric categories expanded appropriately.The new metric categories (REASONING_EFFICIENCY, TOOL_SELECTION, PARAMETER_EXTRACTION, TOOL_INVOCATION) align well with the PR's token tracing objectives. The
title()helper method correctly converts enum values to human-readable format.
62-69: LGTM! Type annotations correctly leverage TYPE_CHECKING.The
Agent | BaseAgenttype hint properly uses the TYPE_CHECKING import, ensuring type safety without runtime overhead.lib/crewai/src/crewai/experimental/evaluation/agent_evaluator.py (1)
1-34: LGTM! Consistent typing pattern applied.The future annotations and TYPE_CHECKING guard follow the same approach as
base_evaluator.py, ensuring consistent import management across the evaluation module.lib/crewai/src/crewai/experimental/evaluation/experiment/runner.py (1)
1-2: LGTM: Enables postponed annotation evaluation.The addition of
from __future__ import annotationsis a best practice that prevents circular import issues and enables the TYPE_CHECKING pattern used in this file.lib/crewai/src/crewai/agents/agent_builder/base_agent_executor_mixin.py (4)
6-6: LGTM!The import is necessary for the type annotations added to the memory methods and improves type safety.
33-33: LGTM!Adding explicit type annotations for the
outputparameter improves type safety and code maintainability. TheAgentFinishtype correctly matches how these methods use theoutput.textattribute.Also applies to: 57-57, 79-79
151-186: LGTM! Clean resource management with proper try-finally.The refactoring successfully replaces text-based prompts with Rich panel rendering, providing a more polished user experience. The try-finally block ensures that
resume_live_updates()is called even if an exception occurs during user input, which is excellent resource management.The conditional logic for training vs. regular feedback modes is clear and well-structured, with appropriate prompt text and styling for each mode.
148-149: Remove this check -event_listener.formatteris always available.The
event_listeneris a module-level singleton that's instantiated once at import time. Its__init__method unconditionally initializesself.formatter = ConsoleFormatter(verbose=True)during the first instantiation, guaranteeing the attribute always exists when accessed. NoAttributeErrorcan occur here.Likely an incorrect or invalid review comment.
docs/pt-BR/observability/tracing.mdx (1)
20-20: Static analysis false positive - no action needed.The static analysis tool flagged an "unpaired bracket" here, but the markdown link syntax
[app.crewai.com](https://app.crewai.com)is correct. This is a false positive from LanguageTool not recognizing markdown syntax.lib/crewai/src/crewai/agents/crew_agent_executor.py (2)
10-10: LGTM! Logging support added appropriately.The addition of Python's standard logging module and a module-level logger follows best practices for structured logging in production code.
Also applies to: 55-55
97-97: LGTM! Good use of dependency injection for i18n.The addition of an optional
i18nparameter with a fallback toget_i18n()is an excellent pattern. This:
- Maintains backward compatibility (existing code doesn't need changes)
- Enables dependency injection for testing
- Follows the principle of explicit over implicit dependencies
Also applies to: 121-121
docs/ko/concepts/production-architecture.mdx (1)
1-154: LGTM! Well-structured production architecture documentation.The Korean production architecture documentation is comprehensive and well-organized. It clearly explains the Flow-first mindset with practical code examples, architectural diagrams, and deployment guidance. All internal documentation links (lines 18, 87, 116, 131) point to existing localized Korean pages and are valid.
docs/pt-BR/concepts/production-architecture.mdx (1)
1-154: LGTM! Well-structured production architecture documentation.The Portuguese-BR production architecture documentation is comprehensive and mirrors the structure of the Korean version effectively. The content clearly explains the Flow-first approach with practical examples and deployment patterns. All referenced internal documentation links (observability/tracing, concepts/tasks, learn/llm-hooks, and enterprise/guides/deploy-crew) are in place.
docs/en/introduction.mdx (4)
10-18: Flow‑first framing and links look consistentThe updated intro, Flows/Crews descriptions, and new links (
/en/guides/flows/first-flow,/en/guides/crews/first-crew) are coherent and match the architecture story you’re pushing; nothing blocking here.Also applies to: 21-37, 38-52
53-61: “How it all works together” sequence is logically soundThe Flow→Crew→Flow handoff steps read cleanly and map well to the mental model; no changes needed.
62-76: Key Features section aligns with product positioningThe four cards (Flows, Crews, Tools, Security) are clear and technically accurate for the framework capabilities.
113-119: Updated “Build Your First Crew” card is consistentCard copy and link to
/en/guides/crews/first-crewalign with the rest of the page and the new Flow‑first narrative.lib/crewai/src/crewai/experimental/__init__.py (1)
1-1: Re‑exportingCrewAgentExecutorFlowfromcrewai.experimentalis appropriateImporting
CrewAgentExecutorFlowand adding it to__all__matches the pattern used for other experimental exports and gives a clean public surface undercrewai.experimental.Also applies to: 23-28
lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py (2)
4-5: Singleton typing viacast(Self, ...)is correctUsing
Selfpluscast(Self, cls._instance)in__new__tightens typing without changing runtime behavior of the singleton; this is safe.Also applies to: 100-109
333-342: Formatter calls updated to new memory panel APIBoth
self.formatter.handle_memory_save_started()andself.formatter.handle_memory_retrieval_started()calls correctly match the updatedConsoleFormattersignatures (no parameters), and the in‑progress flags logic is intact.lib/crewai/tests/agents/test_agent.py (3)
1167-1182: Explicit executor creation improves prompt‑template test stabilityCalling
agent.create_agent_executor()before exercising_format_promptensures the executor is initialized with the custom system/prompt/response templates, making the test more robust against lazy initialization changes.
1625-1635: Agent executor initialization for context‑length test aligns with new flowCreating the executor via
agent.create_agent_executor(task=task)before patchingCrewAgentExecutor.invokeensures the test exercises the context‑length logic with a fully initialized executor. Just keep in mind this uses the firsttaskinstance; ifexecute_taskinternally swaps to a different Task object, the patched wrapper will still be bound to the initial configuration.Same verification script as above for
execute_task/create_agent_executorapplies here.
1432-1447: Test setup is correct and deterministic as written
Agent.execute_taskdoes not callcreate_agent_executoror reassignagent_executor. It simply checks that the executor exists and invokes it directly. Initializing the executor before patchingCrewAgentExecutor.invokeensures the patch wraps the correct instance that will be used during execution.docs/docs.json (1)
113-132: All referenced documentation files exist; no 404 risksThe additions for
production-architecture,human-feedback-in-flows, andobservability/tracingacrossen,pt-BR, andkoall have corresponding MDX files and are properly indexed in docs.json. No missing documentation paths detected.lib/crewai/src/crewai/tools/tool_usage.py (1)
244-253: Addingrun_attemptstoToolUsageStartedEventis correct and consistentThe field is properly defined in the parent
ToolUsageEventclass asrun_attempts: int | None = None, so both the sync (lines 244–253) and async (lines 431–440) code paths correctly pass it when emittingToolUsageStartedEvent(**event_data). This aligns with how the field appears elsewhere in finished and error events.lib/crewai/src/crewai/experimental/evaluation/metrics/tools_metrics.py (1)
1-5: TYPE_CHECKING import forAgentis correctly implementedUsing
from __future__ import annotationscombined with a TYPE_CHECKING‑guarded import ofAgentproperly defers the import to type-checking time only, avoiding runtime evaluation while preserving type hints. The pattern is sound and the union typeAgent | BaseAgentresolves correctly with deferred annotations.README.md (1)
169-169: LGTM: Consistent migration to UV package manager.All
pipcommands have been consistently updated touv pipacross installation instructions, troubleshooting, and FAQ sections. This aligns with the adoption of UV as the dependency management tool mentioned at line 164.Also applies to: 175-175, 188-196, 614-614, 690-690, 696-696
lib/crewai/tests/utilities/test_console_formatter_pause_resume.py (1)
7-107: LGTM: Comprehensive test coverage for pause/resume functionality.The test suite thoroughly covers HITL pause/resume behavior including:
- Active session pause/stop
- Safety when no session exists
- Multiple consecutive pauses
- On-demand Live session creation
- Full pause/resume cycles
Test structure follows best practices with clear naming, proper mocking, and appropriate assertions.
lib/crewai/tests/test_flow_human_input_integration.py (1)
10-164: LGTM: Well-refactored HITL integration tests.The tests have been properly updated to:
- Use the public
pause_live_updates/resume_live_updatesAPI instead of internal state manipulation- Verify pause/resume behavior through proper mocking
- Cover edge cases including no session, exceptions, and training mode
- Validate panel-based output for training feedback
Test structure follows best practices with clear test names, proper isolation through mocking, and comprehensive assertions.
docs/en/concepts/production-architecture.mdx (2)
131-131: The deployment guide link is valid. The filedocs/en/enterprise/guides/deploy-crew.mdxexists in the repository.
141-147: The@persistdecorator is fully implemented and properly exported from the codebase. The concern is unfounded.The decorator exists in
lib/crewai/src/crewai/flow/persistence/decorators.pyand is exported fromlib/crewai/src/crewai/flow/__init__.py. It supports both class-level usage (as shown in the documentation snippet) and method-level usage, with optional parameters for specifying a persistence backend and enabling verbose logging. Extensive test coverage intest_flow_persistence.pydemonstrates that the decorator works as documented.Likely an incorrect or invalid review comment.
lib/crewai/src/crewai/agent/core.py (1)
225-228: Backward compatibility is maintained with the new executor_class field.The new
executor_classfield with a default value ofCrewAgentExecutorpreserves backward compatibility. Existing Agent instantiations throughout the test suite do not specify this field and continue to use the default executor, confirming that no breaking changes are introduced. The field is properly implemented and used at line 743 to instantiate the executor.lib/crewai/tests/agents/test_crew_agent_executor_flow.py (1)
1-479: LGTM! Comprehensive test coverage for CrewAgentExecutorFlow.The test suite provides thorough coverage of:
- State initialization and defaults
- Flow lifecycle methods (initialization, iteration checks, routing, finalization)
- Error recovery paths (parser and context errors)
- Both synchronous and asynchronous execution scenarios
- Backward compatibility with the invoke method
The heavy use of mocks is appropriate for unit testing flow control logic without requiring actual LLM calls.
lib/crewai/src/crewai/events/event_listener.py (2)
189-196: LGTM! Clean helper function for task name extraction.The
get_task_namehelper properly handles the fallback chain:source.name→source.description→None. This consolidates duplicate logic used across multiple event handlers.
315-330: LGTM! New pause event handlers properly integrated.The handlers for
FlowPausedEventandMethodExecutionPausedEventcorrectly delegate to the formatter'shandle_flow_statusandhandle_method_statusmethods with "paused" status, maintaining consistency with other event handlers.lib/crewai/src/crewai/crew.py (2)
2093-2131: LGTM! Token metrics helpers properly track per-task usage. The _get_agent_token_usage and _attach_task_token_metrics helpers correctly:

- Handle None agents gracefully
- Calculate token deltas (tokens_after - tokens_before) for each task
- Use task_id in the composite key (line 2128) to prevent collisions for tasks with identical names

The implementation addresses the previous concern about task key collisions by including the unique task_id in the key.
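Stripped of the crew-specific plumbing, the delta-plus-composite-key idea amounts to roughly the following sketch; the field names and exact key format are simplifications of what crew.py actually does:

```python
from dataclasses import dataclass


@dataclass
class TokenSnapshot:
    prompt_tokens: int = 0
    completion_tokens: int = 0
    total_tokens: int = 0


def token_delta(before: TokenSnapshot, after: TokenSnapshot) -> TokenSnapshot:
    """Per-task usage is the difference between the agent's counters taken before and after the task."""
    return TokenSnapshot(
        prompt_tokens=after.prompt_tokens - before.prompt_tokens,
        completion_tokens=after.completion_tokens - before.completion_tokens,
        total_tokens=after.total_tokens - before.total_tokens,
    )


def task_metrics_key(task_name: str, task_id: str) -> str:
    """Including the unique task_id keeps two tasks with the same name from overwriting each other."""
    return f"{task_name}:{task_id}"
```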
950-964: The agent reference validity concern is not an issue; however, update the type hint at line 932.The agent object and token state are both valid throughout async execution. The
exec_data.agentreference remains valid as a Python object reference, and_get_agent_token_usage()queries the current LLM token state dynamically at each invocation (line 2085), not from a cached value. The token delta calculation correctly captures snapshots before and after async task execution.However, the type annotation at line 932 is inconsistent: it declares a 3-element tuple but the code appends 5-element tuples at line 964. Update the annotation to match the corrected signatures at lines 1001 and 1016:
pending_tasks: list[tuple[Task, asyncio.Task[TaskOutput], int, Any, Any]] = []Likely an incorrect or invalid review comment.
docs/pt-BR/introduction.mdx (1)
1-144: LGTM! Documentation aligns with Flow-first architecture.The Portuguese documentation updates properly:
- Emphasize Flow as the backbone and Crews as intelligent worker units
- Provide clear architectural guidance with visual aids
- Include practical use case examples
- Maintain consistency with the broader PR's architectural narrative
The static analysis warnings about unpaired brackets are false positives for Markdown link syntax and can be ignored.
lib/crewai/src/crewai/flow/flow.py (6)
690-701: Potential deadlock when calling resume() from sync context. The resume() method uses asyncio.run(), which creates a new event loop. If called from a context where nested event loops are problematic (e.g., Jupyter notebooks), this will fail. Consider using nest_asyncio or documenting this limitation more prominently.

Additionally, the check for a running loop followed by asyncio.run() is correct, but the error message could be more helpful by suggesting alternatives.
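For callers, one way to sidestep the nested-loop failure is to keep the sync wrapper for scripts and push it onto a worker thread when a loop is already running. This is a sketch of the calling pattern only, assuming resume() accepts the feedback string; it is not part of the Flow API:

```python
import asyncio


def resume_from_script(flow, feedback: str):
    # Safe in a plain script or CLI: no loop is running yet, so the internal
    # asyncio.run() inside resume() can create one.
    return flow.resume(feedback)


async def resume_from_async_context(flow, feedback: str):
    # In Jupyter or an async server a loop is already running, so calling
    # flow.resume() directly hits the nested asyncio.run() problem. Running the
    # sync wrapper on a worker thread is one workaround, assuming the flow
    # instance is not being mutated concurrently elsewhere.
    return await asyncio.to_thread(flow.resume, feedback)
```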
806-842: Returning exception object instead of raising may cause confusion. Returning HumanFeedbackPending as a value rather than raising it is unconventional. Callers must check isinstance(result, HumanFeedbackPending) instead of using try/except. This pattern is documented but could be surprising.

Consider documenting this clearly in the return type annotation or providing a helper method to check if the flow is paused.
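From the caller's side, the documented pattern looks roughly like this; the import path for HumanFeedbackPending is assumed from where the other human-feedback types live in this PR and may differ:

```python
from crewai.flow.human_feedback import HumanFeedbackPending  # assumed location


def run_until_feedback(flow):
    result = flow.kickoff()

    # kickoff() hands back the HumanFeedbackPending object instead of raising it,
    # so the pause is detected with an isinstance check rather than try/except.
    if isinstance(result, HumanFeedbackPending):
        # Persist the flow state / notify a human here, then resume later.
        return None
    return result
```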
872-886: State ID generation may overwrite valid empty string IDs.The logic
if not getattr(instance, "id", None)will treat an empty string""as falsy and generate a new UUID. If an empty string ID is intentionally set, it will be overwritten. This is likely the desired behavior but worth noting.
1336-1396: Exception handling catchesHumanFeedbackPendingand returns it as a value.This exception-as-return-value pattern is used consistently throughout the file, which is good for internal consistency. However, ensure all callers of
kickoff_asyncare aware they may receive an exception object as a return value.
1920-1991: _request_human_feedback uses input() which blocks the event loop. Using Python's built-in input() in an async context can cause issues. While the method itself is synchronous, if called from within an async flow execution, it will block the entire event loop.

This is acceptable for the console-based human-in-the-loop use case, but consider documenting that this method should not be used in async web server contexts.
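If the prompt ever does need to run under an active event loop, one non-blocking variation (a sketch, not the current implementation) is to hand input() to a worker thread:

```python
import asyncio


async def request_feedback_async(prompt: str) -> str:
    # input() would freeze the running loop; delegating it to a thread keeps
    # other coroutines (event emission, streaming panels) responsive while waiting.
    return await asyncio.to_thread(input, prompt)
```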
2033-2041: DynamicLiteraltype creation may cause issues with some type checkers.Creating a Pydantic model with
Literal[outcomes_tuple]dynamically works at runtime but may cause issues with static type checkers. The# type: ignore[valid-type]comment acknowledges this.lib/crewai/src/crewai/experimental/crew_agent_executor_flow.py (5)
184-198: Deferred Flow initialization is a clever pattern but may cause issues.The
_ensure_flow_initializedpattern deferssuper().__init__()untilinvoke()is called. This means:
- Flow metaclass attributes are set on the class, not instance
- Instance attributes from
Flow.__init__are not available until deferred initThis works because the metaclass (
FlowMeta) sets class-level attributes, but it's fragile. IfFlow.__init__is modified to set critical instance attributes, this could break.
234-249:force_final_answermodifies state but state may not be Flow-managed yet.This method is decorated with
@listen("force_final_answer"), meaning it's part of the flow. However, it directly modifiesself.state.current_answerandself.state.is_finished.Since
AgentReActStateis a Pydantic model with default mutable fields, these modifications work, but the pattern is inconsistent with Flow's typical state management.
452-526:invokemethod has comprehensive error handling but lacks cleanup on some paths.The
try/finallyensures_is_executingis reset, which is good. However, ifkickoff()raises an exception other thanAssertionError, the exception is re-raised afterhandle_unknown_error, but memory creation methods (_create_short_term_memory, etc.) are skipped.This is likely intentional (don't save memory for failed executions), but consider whether partial results should be saved.
728-744: Training feedback re-runs entire flow viakickoff().After appending feedback to messages, the code resets state and calls
kickoff()again. This creates a recursive-like pattern where the flow is executed multiple times within a singleinvoke()call.Ensure this doesn't cause issues with:
- Event emission (events may fire multiple times)
- Memory management (short/long term memory created multiple times)
- Execution guards (though
_is_executingremainsTruethroughout)
799-808:__get_pydantic_core_schema__returnsany_schema()which bypasses validation.Using
core_schema.any_schema()means Pydantic won't validate this type at all. This is a workaround for Protocol compatibility, but it means the executor can be assigned any value in Pydantic models without validation.This is acceptable for the stated use case but document the implications.
lib/crewai/src/crewai/events/utils/console_formatter.py (5)
101-117:pause_live_updatesandresume_live_updateshave asymmetric behavior.
pause_live_updatesstops and clears_streaming_live, butresume_live_updatesis a no-op. The docstring says "New streaming sessions will be created on-demand" but there's no mechanism to recreate them after pausing.If human input occurs during streaming, the streaming panel won't automatically resume. This may be acceptable behavior but should be documented.
250-276:handle_flow_statussupports "paused" status.Good addition for the human-in-the-loop feature. The panel styling is consistent with other status handlers.
278-305:handle_method_statusdoesn't checkis_flowflag consistently.This method uses
self.print_panel(..., is_flow=True)but other flow-related handlers likehandle_flow_createdalso setis_flow=True. The difference is thathandle_method_statusalso checksself.verboseat the start.This inconsistency means flow events always print (when
is_flow=True), but method status requires verbose mode. This might be intentional to reduce noise, but verify this is the desired behavior.
1171-1216:handle_a2a_response_receivedhas comprehensive status styling.Good coverage of different A2A status types with appropriate visual indicators. The canceled status has empty style which will use default styling.
175-190: handle_crew_started resets crew_completion_printed shared state across all crew instances. The crew_completion_printed ClassVar is shared across all ConsoleFormatter instances and reset on every crew start. With the event bus using a ThreadPoolExecutor with max_workers=10, multiple crews can be kicked off concurrently. If crews A and B start near-simultaneously on different threads, crew A's call to clear() will reset the state that crew B may be relying on, causing race conditions in tracking crew completion output.
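One way to shrink that race window, sketched under the assumption that completion tracking stays class-level and keyed by crew id (the class name and method names below are placeholders, not the real ConsoleFormatter API), is to guard the shared set with a class-level lock and only clear the entry for the crew that is starting:

```python
import threading
from typing import ClassVar


class ConsoleFormatterSketch:
    # Hypothetical stand-in for the real ConsoleFormatter class state.
    crew_completion_printed: ClassVar[set[str]] = set()
    _completion_lock: ClassVar[threading.Lock] = threading.Lock()

    def handle_crew_started(self, crew_id: str) -> None:
        with self._completion_lock:
            # Drop only this crew's entry instead of clearing every crew's state.
            self.crew_completion_printed.discard(crew_id)

    def mark_crew_completed(self, crew_id: str) -> bool:
        with self._completion_lock:
            if crew_id in self.crew_completion_printed:
                return False
            self.crew_completion_printed.add(crew_id)
            return True
```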
| **The short answer: Use both.** | ||
|
|
||
| | Use Case | Recommended Approach | Why? | | ||
| |:---------|:---------------------|:-----| | ||
| | **Open-ended research** | [Crews](/en/guides/crews/first-crew) | When tasks require creative thinking, exploration, and adaptation | | ||
| | **Content generation** | [Crews](/en/guides/crews/first-crew) | For collaborative creation of articles, reports, or marketing materials | | ||
| | **Decision workflows** | [Flows](/en/guides/flows/first-flow) | When you need predictable, auditable decision paths with precise control | | ||
| | **API orchestration** | [Flows](/en/guides/flows/first-flow) | For reliable integration with multiple external services in a specific sequence | | ||
| | **Hybrid applications** | Combined approach | Use [Flows](/en/guides/flows/first-flow) to orchestrate overall process with [Crews](/en/guides/crews/first-crew) handling complex subtasks | | ||
| For any production-ready application, **start with a Flow**. | ||
|
|
||
| ### Decision Framework | ||
| - **Use a Flow** to define the overall structure, state, and logic of your application. | ||
| - **Use a Crew** within a Flow step when you need a team of agents to perform a specific, complex task that requires autonomy. | ||
|
|
🧹 Nitpick | 🔵 Trivial
Minor wording nit: “overall structure”
The guidance to “Use a Flow to define the overall structure, state, and logic” is fine, but “overall structure” is slightly wordy; consider just “structure, state, and logic” or “structure and logic” to tighten it.
Also applies to: 88-93
🧰 Tools
🪛 LanguageTool
[style] ~85-~85: ‘overall structure’ might be wordy. Consider a shorter alternative.
Context: ...Flow**. - Use a Flow to define the overall structure, state, and logic of your application. ...
(EN_WORDINESS_PREMIUM_OVERALL_STRUCTURE)
🤖 Prompt for AI Agents
In docs/en/introduction.mdx around lines 81-87 (and also apply the same change
to lines 88-93), the phrase “define the overall structure, state, and logic” is
wordy; change it to a tighter wording such as “define the structure, state, and
logic” or “define the structure and logic” in both locations so the sentence
reads more concise and consistent.
| 1. **Flow**가 이벤트를 트리거하거나 프로세스를 시작합니다. | ||
| 2. **Flow**가 상태를 관리하고 다음에 무엇을 할지 결정합니다. | ||
| 3. **Flow**가 복잡한 작업을 **Crew**에게 위임합니다. | ||
| 4. **Crew**의 agent들이 협력하여 작업을 완료합니다. | ||
| 5. **Crew**가 결과를 **Flow**에 반환합니다. | ||
| 6. **Flow**가 결과를 바탕으로 실행을 계속합니다. | ||
|
|
🧹 Nitpick | 🔵 Trivial
Consider rephrasing for better readability.
The static analysis tool flagged that lines 55-57 have three consecutive sentences starting with "Flow가", which affects readability. This is a minor style issue in Korean documentation.
Suggested improvement
Consider varying the sentence structure, for example:
- Combine sentences where possible
- Use pronouns or different subjects
- Restructure to improve flow
🧰 Tools
🪛 LanguageTool
[style] ~57-~57: Three successive sentences begin with the same word. Consider rewording the sentence or use a thesaurus to find a synonym.
Context: ...low**가 상태를 관리하고 다음에 무엇을 할지 결정합니다. 3. Flow가 복잡한 작업을 Crew에게 위임합니다. 4. **Crew...
(ENGLISH_WORD_REPEAT_BEGINNING_RULE)
🤖 Prompt for AI Agents
In docs/ko/introduction.mdx around lines 55 to 61, three consecutive sentences
start with "**Flow**가", hurting readability; rewrite those lines to vary
sentence openings by combining related sentences, replacing repeated subjects
with pronouns or alternate phrasing, and/or changing one sentence to passive or
different subject (e.g., combine 1 and 2 into a single sentence, use "이 Flow는"
or "그것이" for subsequent mentions, and rephrase one sentence to start with "복잡한
작업은" or "Crew가"). Ensure the meaning stays the same and the list remains clear
and natural in Korean.
| self.state.message = "Flow completed" | ||
|
|
||
| # Create and run the flow with tracing enabled | ||
| flow = ExampleFlow(tracing=True) |
Potential redundancy in Flow instantiation.
Line 146 instantiates ExampleFlow(tracing=True), but the __init__ method on line 123 already sets tracing=True in the super().__init__() call. This creates redundancy where tracing is enabled twice.
Either:
- Remove
tracing=Truefrom line 146 if the class-level setting in__init__is sufficient, OR - Remove
tracing=Truefrom line 123 if the intention is to pass it at instantiation time.
The same pattern appears in the pt-BR documentation (docs/pt-BR/observability/tracing.mdx line 146), suggesting this may be an intentional demonstration of both approaches, but it could confuse readers about which method to use.
🔎 Suggested fix: Choose one approach
Option 1: Remove from instantiation (line 146)
-flow = ExampleFlow(tracing=True)
+flow = ExampleFlow()
Option 2: Remove from init (line 123)
- def __init__(self):
- super().__init__(tracing=True) # Enable tracing for the flow
+ def __init__(self, tracing: bool = True):
+ super().__init__(tracing=tracing)
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| flow = ExampleFlow(tracing=True) | |
| flow = ExampleFlow() |
🤖 Prompt for AI Agents
In docs/ko/observability/tracing.mdx around line 146, the ExampleFlow is
instantiated with tracing=True which is redundant because the class __init__ on
line 123 already calls super().__init__(tracing=True); remove tracing=True from
the instantiation on line 146 to avoid confusion, and apply the same change to
the mirrored file docs/pt-BR/observability/tracing.mdx at its corresponding line
146 so both locales consistently demonstrate the class-level setting.
| 1. **O Flow** aciona um evento ou inicia um processo. | ||
| 2. **O Flow** gerencia o estado e decide o que fazer a seguir. | ||
| 3. **O Flow** delega uma tarefa complexa para um **Crew**. | ||
| 4. Os agentes do **Crew** colaboram para completar a tarefa. | ||
| 5. **O Crew** retorna o resultado para o **Flow**. | ||
| 6. **O Flow** continua a execução com base no resultado. | ||
|
|
🧹 Nitpick | 🔵 Trivial
Consider rephrasing for better readability (optional).
Similar to the Korean documentation, lines 55-57 have three consecutive sentences starting with "O Flow", which affects readability in Portuguese. This is a minor style consideration.
Suggested improvement
Consider varying the sentence structure:
- Combine related actions
- Use pronouns or alternative subjects
- Restructure to improve flow
🧰 Tools
🪛 LanguageTool
[style] ~57-~57: Três frases seguidas começam com a mesma palavra. Considere reformular a frase ou use um dicionário para encontrar um sinônimo.
Context: ...do e decide o que fazer a seguir. 3. O Flow delega uma tarefa complexa para ...
(PORTUGUESE_WORD_REPEAT_BEGINNING_RULE)
🤖 Prompt for AI Agents
In docs/pt-BR/introduction.mdx around lines 55 to 61, the three consecutive
sentences that each start with "O Flow" reduce readability; rephrase these lines
by combining related actions into one sentence, using pronouns (por exemplo
"ele") or alternative subjects, or restructuring the sequence so the repetition
is removed while preserving meaning and order (ex.: merge steps 1–2 into a
single sentence about triggering and state management, use "ele" for the
continuation, and keep delegation to Crew as a separate step).
| self.state.message = "Flow completed" | ||
|
|
||
| # Create and run the flow with tracing enabled | ||
| flow = ExampleFlow(tracing=True) |
Potential redundancy in Flow instantiation.
Line 146 instantiates ExampleFlow(tracing=True), but the __init__ method on line 123 already sets tracing=True in the super().__init__() call. This creates redundancy where tracing is enabled twice.
Consider choosing one approach consistently. This same issue appears in the Korean documentation (docs/ko/observability/tracing.mdx line 146).
🔎 Suggested fix: Choose one approach
Option 1: Remove from instantiation (line 146)
-flow = ExampleFlow(tracing=True)
+flow = ExampleFlow()
Option 2: Remove from init (line 123)
- def __init__(self):
- super().__init__(tracing=True) # Enable tracing for the flow
+ def __init__(self, tracing: bool = True):
+ super().__init__(tracing=tracing)
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| flow = ExampleFlow(tracing=True) | |
| flow = ExampleFlow() |
🤖 Prompt for AI Agents
docs/pt-BR/observability/tracing.mdx around line 146: ExampleFlow is
instantiated with tracing=True but the class __init__ already sets tracing=True
(line 123), creating redundancy; pick one approach and remove the duplicate —
either delete tracing=True from the instantiation at line 146 or remove the
tracing=True default from the class __init__ at line 123 — and apply the same
change to docs/ko/observability/tracing.mdx line 146 to keep both locales
consistent.
| from crewai.flow.human_feedback import HumanFeedbackResult | ||
| from datetime import datetime |
🧹 Nitpick | 🔵 Trivial
Import inside method - consider module-level import.
The datetime import is placed inside the method. While this works, it's more idiomatic in Python to import at the module level. However, given this is in a conditional execution path, this is acceptable for lazy loading.
🤖 Prompt for AI Agents
In lib/crewai/src/crewai/flow/flow.py around lines 719-720, the "from datetime
import datetime" import is placed inside the method; move that import to the
module-level imports at the top of the file (alongside other imports) and remove
the in-method import so datetime is available without lazy-loading; if the
import was intentionally lazy for startup cost, add a brief comment explaining
that and leave it, otherwise perform the move and run tests/flake8 to ensure no
unused-import warnings.
        if isinstance(self._state, dict):
            # For dict states, preserve existing fields unless overridden
            # For dict states, update with inputs
            # If inputs contains an id, use it (for restoring from persistence)
            # Otherwise preserve the current id or generate a new one
            current_id = self._state.get("id")
            # Only update specified fields
            inputs_has_id = "id" in inputs

            # Update specified fields
            for k, v in inputs.items():
                self._state[k] = v
            # Ensure ID is preserved or generated
            if current_id:
                self._state["id"] = current_id
            elif "id" not in self._state:
                self._state["id"] = str(uuid4())

            # Ensure ID is set: prefer inputs id, then current id, then generate
            if not inputs_has_id:
                if current_id:
                    self._state["id"] = current_id
                elif "id" not in self._state:
                    self._state["id"] = str(uuid4())
🧹 Nitpick | 🔵 Trivial
Complex ID preservation logic - potential for unexpected behavior.
The logic for preserving IDs during _initialize_state is intricate. When inputs_has_id is False but current_id exists, the current ID is preserved. However, if inputs contains id=None explicitly, it will be set to None before being potentially overwritten.
Consider adding a comment explaining the ID precedence: inputs ID > current ID > generated UUID.
🔎 Suggested clarifying comment
def _initialize_state(self, inputs: dict[str, Any]) -> None:
"""Initialize or update flow state with new inputs.
Args:
inputs: Dictionary of state values to set/update
Raises:
ValueError: If validation fails for structured state
TypeError: If state is neither BaseModel nor dictionary
+
+ Note:
+ ID precedence: inputs["id"] > current state ID > newly generated UUID
"""Committable suggestion skipped: line range outside the PR's diff.
            # If start method is a router, use its result as an additional trigger
            if start_method_name in self._routers and result is not None:
                # Execute listeners for the start method name first
                await self._execute_listeners(start_method_name, result)
                # Then execute listeners for the router result (e.g., "approved")
                router_result_trigger = FlowMethodName(str(result))
                listeners_for_result = self._find_triggered_methods(
                    router_result_trigger, router_only=False
                )
                if listeners_for_result:
                    # Pass the HumanFeedbackResult if available
                    listener_result = (
                        self.last_human_feedback
                        if self.last_human_feedback is not None
                        else result
                    )
                    tasks = [
                        self._execute_single_listener(listener_name, listener_result)
                        for listener_name in listeners_for_result
                    ]
                    await asyncio.gather(*tasks)
            else:
                await self._execute_listeners(start_method_name, result)
🧹 Nitpick | 🔵 Trivial
Router-triggered start method handling has duplicated listener execution logic.
The logic for handling routers in _execute_start_method duplicates some patterns from _execute_listeners. This could lead to maintenance burden if the listener execution logic changes.
Consider extracting the common pattern into a helper method.
🤖 Prompt for AI Agents
In lib/crewai/src/crewai/flow/flow.py around lines 1477-1500, the router branch
in _execute_start_method duplicates listener-dispatch logic that exists in
_execute_listeners; extract a small helper (e.g., _dispatch_listeners(trigger,
result, router_only=False)) that (1) uses _find_triggered_methods(trigger,
router_only), (2) resolves listener_result (prefer last_human_feedback when
appropriate), (3) builds tasks with _execute_single_listener and awaits
asyncio.gather; then replace the duplicated code in the router branch and the
else branch to call this helper with the appropriate trigger and router_only
flag so listener execution is centralized.
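One possible shape for that helper, assuming the _find_triggered_methods, _execute_single_listener, and last_human_feedback behaviors described above (the name _dispatch_listeners and its exact signature are illustrative only):

    async def _dispatch_listeners(self, trigger, result, router_only: bool = False) -> None:
        # Centralizes listener dispatch: find listeners, choose the payload, run them concurrently.
        listeners = self._find_triggered_methods(trigger, router_only=router_only)
        if not listeners:
            return
        payload = self.last_human_feedback if self.last_human_feedback is not None else result
        await asyncio.gather(
            *(self._execute_single_listener(name, payload) for name in listeners)
        )

Both the router branch and the plain start-method branch could then call this helper with the appropriate trigger.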
        except Exception as e:
            # Fallback to simple prompting if structured output fails
            logger.warning(
                f"Structured output failed, falling back to simple prompting: {e}"
            )
            response = llm_instance.call(messages=prompt)
            response_clean = str(response).strip()

            # Exact match (case-insensitive)
            for outcome in outcomes:
                if outcome.lower() == response_clean.lower():
                    return outcome

            # Partial match
            for outcome in outcomes:
                if outcome.lower() in response_clean.lower():
                    return outcome

            # Fallback to first outcome
            logger.warning(
                f"Could not match LLM response '{response_clean}' to outcomes {list(outcomes)}. "
                f"Falling back to first outcome: {outcomes[0]}"
            )
            return outcomes[0]
Fallback logic for outcome matching is robust but verbose.
The fallback logic handles multiple scenarios (JSON parsing, exact match, partial match) which is good for resilience. However, the partial match could potentially match unintended outcomes if one outcome is a substring of another (e.g., "approved" matching "not_approved").
Consider using word boundary matching or requiring the outcome to be a distinct word.
🔎 Suggested improvement for partial matching
 # Partial match
+import re
 for outcome in outcomes:
-    if outcome.lower() in response_clean.lower():
+    # Match as whole word to avoid substring false positives
+    if re.search(rf'\b{re.escape(outcome)}\b', response_clean, re.IGNORECASE):
         return outcome

(The import is shown above the loop here; ideally it would live at module level.)
🤖 Prompt for AI Agents
In lib/crewai/src/crewai/flow/flow.py around lines 2082 to 2105, the fallback
partial-match logic can accidentally match outcomes that are substrings of
others (e.g., "approved" matching "not_approved"); change the partial-match step
to require whole-word matches using case-insensitive word-boundary checks (or
tokenize the LLM response and compare tokens) rather than simple substring
containment, escaping any special chars in outcome strings for regex use and
keeping the exact-match check first, then the whole-word/regex check, and
finally the fallback to the first outcome.
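A quick standalone check of the substring pitfall the comment describes (the response string is made up for illustration):

import re

response = "Decision: not_approved after review"
print("approved" in response.lower())                        # True  - plain substring match picks the wrong outcome
print(bool(re.search(r"\bapproved\b", response, re.I)))      # False - '_' is a word character, so there is no boundary before "approved"
print(bool(re.search(r"\bnot_approved\b", response, re.I)))  # True  - the intended outcome still matches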
    assert set(agent.llm.stop) == set(["STOP", "END"])
    assert all(word in agent.llm.stop for word in ["STOP", "END"])
    assert agent.llm.max_tokens == 100
🧹 Nitpick | 🔵 Trivial
All‑attributes test consistent with new stop‑words behavior
The additional assertions on agent.llm.stop being exactly ["STOP", "END"] and containing both entries are consistent with the behavior tested earlier and ensure the full‑attributes path doesn’t regress stop‑word handling.
You can mirror the earlier test by reusing a shared stop_words constant here to avoid duplicating the literal list.
🤖 Prompt for AI Agents
In lib/crewai/tests/agents/test_agent.py around lines 1686 to 1688, the test
duplicates the literal stop-word list when asserting agent.llm.stop; replace the
hard-coded ["STOP", "END"] usage with the shared stop_words constant used
earlier in the test suite so the assertions reuse that constant for both
equality and containment checks, keeping the same assertions otherwise to ensure
consistency and avoid duplication.
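For example, a shared constant (the name STOP_WORDS is assumed here, since the earlier test's constant is not shown in this excerpt) keeps both tests in sync:

STOP_WORDS = ["STOP", "END"]

assert set(agent.llm.stop) == set(STOP_WORDS)
assert all(word in agent.llm.stop for word in STOP_WORDS)
assert agent.llm.max_tokens == 100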
Resolved a race condition where concurrent async tasks from the same agent would get incorrect token attribution. The solution wraps async task execution to capture tokens_after immediately upon task completion, before other concurrent tasks can interfere.

Changes:
- Wrapped async task execution to return a (result, tokens_after) tuple
- Updated _aprocess_async_tasks to unwrap and use the captured tokens_after
- Updated type hints for pending_tasks to reflect the new signature

Note: Threading-based async_execution still has a similar race condition, as it is harder to wrap threaded execution. Will track separately.
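A minimal sketch of that wrapping pattern in isolation (the names here are generic placeholders, not the actual crew.py symbols):

async def run_with_token_snapshot(execute, agent, get_usage):
    # Await the task, then snapshot the agent's token counter immediately,
    # before any other concurrent task can advance it further.
    result = await execute()
    tokens_after = get_usage(agent)
    return result, tokens_after

# The caller records tokens_before up front, then unwraps the pair on completion:
#   output, tokens_after = await pending
#   used_by_task = tokens_after.total_tokens - tokens_before.total_tokens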
Actionable comments posted: 5
♻️ Duplicate comments (2)
lib/crewai/src/crewai/crew.py (2)
206-209: The type annotation still uses Any despite being marked as addressed. A previous review flagged that this field should use WorkflowTokenMetrics | None instead of Any | None for type safety. The type is imported locally at line 1673, so it should be moved to the module-level imports and used here. Based on past review comments, this was previously flagged but appears unresolved.
1448-1448: Use direct attribute access instead of getattr. A previous review noted that since workflow_token_metrics is a defined class field with a default value, direct access via self.workflow_token_metrics is preferred over getattr(self, 'workflow_token_metrics', None). Based on past review comments.
📜 Review details
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (1)
lib/crewai/src/crewai/crew.py
🧰 Additional context used
🧬 Code graph analysis (1)
lib/crewai/src/crewai/crew.py (2)
lib/crewai/src/crewai/tasks/task_output.py (1)
TaskOutput (13-106)
lib/crewai/src/crewai/types/usage_metrics.py (3)
AgentTokenMetrics (49-67)
WorkflowTokenMetrics (93-117)
TaskTokenMetrics (70-90)
🔇 Additional comments (8)
lib/crewai/src/crewai/crew.py (8)
951-971: Acknowledged race condition in token tracking. The inline comment correctly notes that capturing tokens immediately after task completion "reduces (but doesn't eliminate) race conditions." In concurrent scenarios where agents share LLM instances or token counters, tokens from other tasks may still be attributed to this task. This is a known limitation of the current approach.
Consider documenting this limitation in the method docstring or module documentation, especially if multiple tasks might execute concurrently with shared resources.
979-993: Token tracking implementation is correct. The synchronous token tracking follows the same pattern as the async version: capture before, execute, capture after, attach metrics. The delegation to _attach_task_token_metrics provides good separation of concerns.
1028-1035: Async task processing correctly unwraps token metrics. The code properly unwraps the (task_output, tokens_after) tuple returned from the wrapped async execution and combines it with the earlier-captured tokens_before to compute the task-specific token delta.
1175-1186: LGTM! Token tracking for async task execution in the synchronous process flow correctly captures tokens_before and stores it for later delta calculation.
1192-1206: LGTM! Synchronous task execution in the synchronous process correctly implements token tracking with the same pattern used throughout the codebase.
1457-1464: Token tracking for completed futures is correct. The code properly captures tokens_after once the future completes and attaches the metrics by calculating the delta with the earlier-captured tokens_before.
1696-1723: Limitation documented: cannot distinguish multiple agents with the same role. The TODO comment at line 1704 and the logic that breaks after the first matching agent (line 1723) acknowledge the known limitation that per-task metrics cannot distinguish between multiple agents with the same role. This was previously flagged in past reviews.
As per past review comments, this is a known limitation with a tracking issue.
2088-2098: LGTM! The _get_agent_token_usage helper correctly handles the different ways agents might track tokens (BaseLLM vs _token_process) with appropriate fallbacks.
        task: ConditionalTask,
        task_outputs: list[TaskOutput],
-       pending_tasks: list[tuple[Task, asyncio.Task[TaskOutput], int]],
+       pending_tasks: list[tuple[Task, asyncio.Task[tuple[TaskOutput, Any]], int, Any, Any]],
🧹 Nitpick | 🔵 Trivial
Use more specific types in tuple type hints.
The tuple type hints at lines 1008 and 1023 use Any, Any for the last two elements (agent and tokens_before). These should be typed as BaseAgent | None and UsageMetrics respectively for better type safety:
🔎 Proposed fix
- pending_tasks: list[tuple[Task, asyncio.Task[tuple[TaskOutput, Any]], int, Any, Any]],
+ pending_tasks: list[tuple[Task, asyncio.Task[tuple[TaskOutput, UsageMetrics]], int, BaseAgent | None, UsageMetrics]],

Also applies to: 1023-1023
🤖 Prompt for AI Agents
In lib/crewai/src/crewai/crew.py around lines 1008 and 1023, the tuple type
hints use Any, Any for the last two elements; change those to use the concrete
types BaseAgent | None (for the agent) and UsageMetrics (for tokens_before)
respectively, updating the tuple annotations accordingly and adding/importing
BaseAgent and UsageMetrics at the top of the file if they are not already
imported.
lib/crewai/src/crewai/crew.py
Outdated
        task: ConditionalTask,
        task_outputs: list[TaskOutput],
-       futures: list[tuple[Task, Future[TaskOutput], int]],
+       futures: list[tuple[Task, Future[TaskOutput], int, Any, Any]],
🧹 Nitpick | 🔵 Trivial
Use more specific types in tuple type hint.
Similar to the async version, this tuple type hint should use BaseAgent | None and UsageMetrics instead of Any, Any for the last two elements:
🔎 Proposed fix
- futures: list[tuple[Task, Future[TaskOutput], int, Any, Any]],
+ futures: list[tuple[Task, Future[TaskOutput], int, BaseAgent | None, UsageMetrics]],

🤖 Prompt for AI Agents
In lib/crewai/src/crewai/crew.py around line 1221, the tuple type hint for
futures is too generic (uses Any, Any); change it to list[tuple[Task,
Future[TaskOutput], int, BaseAgent | None, UsageMetrics]] so the last two
elements are typed as BaseAgent | None and UsageMetrics, and add/import
BaseAgent and UsageMetrics at the top of the module if they are not already
imported.
    def _attach_task_token_metrics(
        self,
        task_output: TaskOutput,
        task: Task,
        agent: BaseAgent | None,
        tokens_before: UsageMetrics,
        tokens_after: UsageMetrics
    ) -> TaskOutput:
        """Attach per-task token metrics to the task output."""
        from crewai.types.usage_metrics import TaskTokenMetrics

        if not agent:
            return task_output

        # Calculate the delta (tokens used by this specific task)
        task_tokens = TaskTokenMetrics(
            task_name=getattr(task, 'name', None) or task.description[:50],
            task_id=str(getattr(task, 'id', '')),
            agent_name=getattr(agent, 'role', 'Unknown Agent'),
            total_tokens=tokens_after.total_tokens - tokens_before.total_tokens,
            prompt_tokens=tokens_after.prompt_tokens - tokens_before.prompt_tokens,
            cached_prompt_tokens=tokens_after.cached_prompt_tokens - tokens_before.cached_prompt_tokens,
            completion_tokens=tokens_after.completion_tokens - tokens_before.completion_tokens,
            successful_requests=tokens_after.successful_requests - tokens_before.successful_requests
        )

        # Attach to task output
        task_output.usage_metrics = task_tokens

        # Store in workflow metrics
        if not hasattr(self, 'workflow_token_metrics') or self.workflow_token_metrics is None:
            from crewai.types.usage_metrics import WorkflowTokenMetrics
            self.workflow_token_metrics = WorkflowTokenMetrics()

        # Use task_id in the key to prevent collision when multiple tasks have the same name
        task_key = f"{task_tokens.task_id}_{task_tokens.task_name}_{task_tokens.agent_name}"
        self.workflow_token_metrics.per_task[task_key] = task_tokens

        return task_output
Task key construction could fail if task_id is empty.
Line 2135 builds the task key as f"{task_tokens.task_id}_{task_tokens.task_name}_{task_tokens.agent_name}", but if task_id is an empty string (when getattr(task, 'id', '') returns empty), the key will start with an underscore and may still collide if task_name and agent_name are identical.
🔎 Proposed fix
- task_key = f"{task_tokens.task_id}_{task_tokens.task_name}_{task_tokens.agent_name}"
+ task_key = f"{task_tokens.task_id or id(task)}_{task_tokens.task_name}_{task_tokens.agent_name}"

This ensures a unique identifier is always present by using the task's memory address as a fallback.
🤖 Prompt for AI Agents
In lib/crewai/src/crewai/crew.py around lines 2100 to 2138, the constructed
task_key can be empty or collide when task_id is an empty string; change the key
construction to use a guaranteed fallback (e.g., hex(id(task)) or str(id(task)))
when task_tokens.task_id is falsy so the key becomes
f"{task_id_or_fallback}_{task_tokens.task_name}_{task_tokens.agent_name}",
ensuring a unique identifier even if task.id is missing.
        tokens_after: UsageMetrics
    ) -> TaskOutput:
        """Attach per-task token metrics to the task output."""
        from crewai.types.usage_metrics import TaskTokenMetrics
🧹 Nitpick | 🔵 Trivial
Consider moving local import to module level.
The import of TaskTokenMetrics is done locally within the method. For consistency and to avoid repeated imports, consider moving this to the module-level imports along with other usage_metrics types.
🤖 Prompt for AI Agents
In lib/crewai/src/crewai/crew.py around line 2109, there is a local import "from
crewai.types.usage_metrics import TaskTokenMetrics" inside a method; move this
import to the module-level imports at the top of the file alongside the other
usage_metrics types, remove the local import from inside the method, and run
tests/lint to ensure no circular import issues—if a circular import appears,
resolve it by importing the type using a typing.TYPE_CHECKING guard or using a
string-forward reference instead.
Capture task, exec_data, and context via default arguments to avoid Python's late-binding closure behavior. Without this fix, when multiple async tasks are created back-to-back, they would all reference values from the last loop iteration, causing wrong tasks to be executed with wrong agents and incorrect token attribution.
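The pitfall this commit fixes can be reproduced in a few lines outside CrewAI (a generic illustration, not the crew.py code):

import asyncio

async def main():
    async def work(name):
        return name

    # Late binding: every lambda reads `name` when called, so all see the final loop value.
    late = [lambda: work(name) for name in ("a", "b", "c")]
    # Default argument: each closure captures the value at definition time.
    bound = [lambda _name=name: work(_name) for name in ("a", "b", "c")]

    print(await asyncio.gather(*(f() for f in late)))   # ['c', 'c', 'c']
    print(await asyncio.gather(*(f() for f in bound)))  # ['a', 'b', 'c']

asyncio.run(main())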
…itation

1. Fixed manager agent using manager_role as key instead of manager_id. Now all agents (regular and manager) are keyed by agent_id in workflow_metrics.per_agent for consistency.
2. Added documentation for the threading-based async task race condition in _process_async_tasks. This is a known limitation tracked by issue crewAIInc#4168. Users should use akickoff() for accurate async task token tracking.
Instead of calling task.execute_async() and capturing tokens_after outside the thread, we now:

1. Create a wrapper function that executes task.execute_sync() in the thread
2. Capture tokens_after immediately after completion WITHIN the thread
3. Return a (result, tokens_after) tuple from the thread
4. Unwrap and use the captured tokens_after in _process_async_tasks

This is the same approach used for asyncio tasks and properly avoids race conditions when concurrent tasks from the same agent run in parallel. It also uses default arguments to avoid late-binding closure issues.
Actionable comments posted: 2
♻️ Duplicate comments (7)
lib/crewai/src/crewai/crew.py (7)
206-209: Type annotation still uses Any instead of an explicit type. A previous review requested using WorkflowTokenMetrics | None instead of Any | None for better type safety and IDE support. This has not yet been addressed.
🔎 Proposed fix
Add the import at module level (move from line 1709):
+from crewai.types.usage_metrics import (
+    AgentTokenMetrics,
+    TaskTokenMetrics,
+    UsageMetrics,
+    WorkflowTokenMetrics,
+)
Then update the field annotation:
-    workflow_token_metrics: Any | None = Field(
+    workflow_token_metrics: WorkflowTokenMetrics | None = Field(
         default=None,
         description="Detailed per-agent and per-task token metrics.",
     )
Based on past reviews and coding guidelines.

1013-1013: Type hints still use Any for agent and tokens. Previous reviews requested more specific types for the last two tuple elements: BaseAgent | None for the agent and UsageMetrics for tokens_before/tokens_after.
🔎 Proposed fix
-    pending_tasks: list[tuple[Task, asyncio.Task[tuple[TaskOutput, Any]], int, Any, Any]],
+    pending_tasks: list[tuple[Task, asyncio.Task[tuple[TaskOutput, UsageMetrics]], int, BaseAgent | None, UsageMetrics]],
Apply the same pattern at line 1028.
Based on past reviews.
Also applies to: 1028-1028

1251-1251: Type hints still use Any for agent and tokens. Similar to the async version, the tuple type hints should use BaseAgent | None and UsageMetrics instead of Any, Any for better type safety.
🔎 Proposed fix
-    futures: list[tuple[Task, Future[tuple[TaskOutput, Any]], int, Any, Any]],
+    futures: list[tuple[Task, Future[tuple[TaskOutput, UsageMetrics]], int, BaseAgent | None, UsageMetrics]],
Apply the same pattern at line 1483.
Based on past reviews.
Also applies to: 1483-1483

1478-1478: Consider direct attribute access instead of getattr. A previous review noted that since workflow_token_metrics is a defined class field with a default value, you can access it directly as self.workflow_token_metrics instead of using getattr.
🔎 Proposed fix
-    token_metrics=getattr(self, 'workflow_token_metrics', None),
+    token_metrics=self.workflow_token_metrics,
Based on past reviews.

2146-2146: Local import could be moved to module level. The import of TaskTokenMetrics is done locally. For consistency with other usage_metrics types, consider moving this to the module-level imports.
🔎 Proposed change
Move the import to the top of the file alongside UsageMetrics and other usage_metrics types, then remove the local import from this method.
Based on past reviews.

2152-2173: Task key may be empty or collide when task_id is missing. At line 2154, task_id defaults to an empty string when getattr(task, 'id', '') returns empty. This causes the task key at line 2172 to start with an underscore and could still collide if multiple tasks have identical task_name and agent_name.
🔎 Proposed fix
-    task_tokens = TaskTokenMetrics(
-        task_name=getattr(task, 'name', None) or task.description[:50],
-        task_id=str(getattr(task, 'id', '')),
+    task_id = str(getattr(task, 'id', '')) or str(id(task))
+    task_tokens = TaskTokenMetrics(
+        task_name=getattr(task, 'name', None) or task.description[:50],
+        task_id=task_id,
         agent_name=getattr(agent, 'role', 'Unknown Agent'),
This ensures a unique identifier is always present by using the task's memory address as a fallback.
Based on past reviews.

1707-1710: Local imports could be moved to module level. While a previous review marked this as "Addressed," the imports for AgentTokenMetrics and WorkflowTokenMetrics remain local within the method. Consider moving these to the module-level imports alongside UsageMetrics (line 85) for consistency, unless there's a specific reason (e.g., circular import) to keep them local.
🔎 Proposed change
At the top of the file, update the import from line 85:
-from crewai.types.usage_metrics import UsageMetrics
+from crewai.types.usage_metrics import (
+    AgentTokenMetrics,
+    TaskTokenMetrics,
+    UsageMetrics,
+    WorkflowTokenMetrics,
+)
Then remove the local imports at lines 1707-1710.
📜 Review details
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (1)
lib/crewai/src/crewai/crew.py
🔇 Additional comments (2)
lib/crewai/src/crewai/crew.py (2)
950-977: Well-implemented closure fix and race condition mitigation. The use of default arguments (_task=task, _exec_data=exec_data, _context=context) correctly prevents late-binding closure issues that would cause all async tasks to reference the last loop iteration's values. Capturing tokens_after immediately after task completion within the async wrapper reduces (though doesn't eliminate) race conditions for concurrent token attribution.
2137-2165: Token delta calculation is correct. The helper method properly calculates per-task token usage by subtracting tokens_before from tokens_after for each metric (total_tokens, prompt_tokens, completion_tokens, etc.). This provides accurate per-task attribution.
            if task.async_execution:
                # Capture token usage before async task execution
                tokens_before = self._get_agent_token_usage(exec_data.agent)

                context = self._get_context(
                    task, [last_sync_output] if last_sync_output else []
                )
                future = task.execute_async(
                    agent=exec_data.agent,
                    context=context,
                    tools=exec_data.tools,
                )
                futures.append((task, future, task_index))

                # Create a wrapper that captures tokens immediately after task completion
                # to avoid race conditions with concurrent tasks from the same agent
                # Use default arguments to capture values at definition time (avoid late-binding)
                def _wrapped_sync_task_execution(
                    _task=task,
                    _exec_data=exec_data,
                    _context=context,
                    _self=self
                ):
                    result = _task.execute_sync(
                        agent=_exec_data.agent,
                        context=_context,
                        tools=_exec_data.tools,
                    )
                    # Capture tokens immediately after task completes within the thread
                    tokens_after = _self._get_agent_token_usage(_exec_data.agent)
                    return result, tokens_after

                # Submit to thread pool and get future
                future: Future[tuple[TaskOutput, Any]] = Future()
                def _run_in_thread():
                    try:
                        result = _wrapped_sync_task_execution()
                        future.set_result(result)
                    except Exception as e:
                        future.set_exception(e)

                import threading
                threading.Thread(daemon=True, target=_run_in_thread).start()
                futures.append((task, future, task_index, exec_data.agent, tokens_before))
🧹 Nitpick | 🔵 Trivial
Consider using ThreadPoolExecutor instead of manually creating threads.
The closure fix using default arguments is correct, and capturing tokens within the thread reduces race conditions. However, manually creating threads (line 1214-1215) rather than using a thread pool could lead to resource exhaustion if many async tasks execute concurrently.
🔎 Proposed improvement using ThreadPoolExecutor
At the class level, maintain a thread pool:
from concurrent.futures import ThreadPoolExecutor
# In Crew class
_thread_pool: ThreadPoolExecutor = PrivateAttr(default_factory=lambda: ThreadPoolExecutor(max_workers=10))

Then replace the manual thread creation:
- future: Future[tuple[TaskOutput, Any]] = Future()
- def _run_in_thread():
- try:
- result = _wrapped_sync_task_execution()
- future.set_result(result)
- except Exception as e:
- future.set_exception(e)
-
- import threading
- threading.Thread(daemon=True, target=_run_in_thread).start()
+ future = self._thread_pool.submit(_wrapped_sync_task_execution)
 futures.append((task, future, task_index, exec_data.agent, tokens_before))

This provides better resource management and prevents unbounded thread creation.
Committable suggestion skipped: line range outside the PR's diff.
        if workflow_metrics.per_task:
            # Sum up tokens for each agent from their tasks
            # We need to find which agent_id corresponds to each task's agent_name
            for task_name, task_metrics in workflow_metrics.per_task.items():
                agent_name = task_metrics.agent_name
                # Find the agent_id for this agent_name from agent_info_map
                # For now, we'll use the agent_name as a temporary key but this needs improvement
                # TODO: Store agent_id in TaskTokenMetrics to avoid this lookup
                matching_agent_ids = [aid for aid, (name, _) in agent_info_map.items() if name == agent_name]

                # Use the first matching agent_id (limitation: can't distinguish between same-role agents)
                # This is better than nothing but ideally we'd store agent_id in TaskTokenMetrics
                for agent_id in matching_agent_ids:
                    if agent_id not in agent_token_sums:
                        agent_token_sums[agent_id] = {
                            'total_tokens': 0,
                            'prompt_tokens': 0,
                            'cached_prompt_tokens': 0,
                            'completion_tokens': 0,
                            'successful_requests': 0
                        }
                    # Only add to the first matching agent (this is the limitation)
                    agent_token_sums[agent_id]['total_tokens'] += task_metrics.total_tokens
                    agent_token_sums[agent_id]['prompt_tokens'] += task_metrics.prompt_tokens
                    agent_token_sums[agent_id]['cached_prompt_tokens'] += task_metrics.cached_prompt_tokens
                    agent_token_sums[agent_id]['completion_tokens'] += task_metrics.completion_tokens
                    agent_token_sums[agent_id]['successful_requests'] += task_metrics.successful_requests
                    break  # Only add to first matching agent
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# First, find the TaskTokenMetrics class definition
rg "class TaskTokenMetrics" -A 15 --type=pyRepository: Devasy/crewAI-telemetry
Length of output: 1520
🏁 Script executed:
#!/bin/bash
# Check the _attach_task_token_metrics method around line 2155
rg "_attach_task_token_metrics" -B 5 -A 30 --type=pyRepository: Devasy/crewAI-telemetry
Length of output: 12865
🏁 Script executed:
#!/bin/bash
# Get the file structure to understand the layout
wc -l lib/crewai/src/crewai/crew.py
Repository: Devasy/crewAI-telemetry
Length of output: 101
🏁 Script executed:
#!/bin/bash
# Get complete TaskTokenMetrics class definition
rg "class TaskTokenMetrics" -A 40 lib/crewai/src/crewai/types/usage_metrics.pyRepository: Devasy/crewAI-telemetry
Length of output: 2282
🏁 Script executed:
#!/bin/bash
# Check agent classes to see what attributes they have
rg "class.*Agent" -A 20 --type=py | head -100Repository: Devasy/crewAI-telemetry
Length of output: 7785
🏁 Script executed:
#!/bin/bash
# Check the exact lines 1732-1760 to see the aggregation code mentioned
sed -n '1732,1760p' lib/crewai/src/crewai/crew.py
Repository: Devasy/crewAI-telemetry
Length of output: 2066
🏁 Script executed:
#!/bin/bash
# Check the exact _attach_task_token_metrics implementation to see line numbers
rg "def _attach_task_token_metrics" -A 50 lib/crewai/src/crewai/crew.pyRepository: Devasy/crewAI-telemetry
Length of output: 1934
🏁 Script executed:
#!/bin/bash
# Find Agent class definition to see if it has an id attribute
fd "agent.*\.py" lib/crewai/src/crewai | head -10Repository: Devasy/crewAI-telemetry
Length of output: 658
🏁 Script executed:
#!/bin/bash
# Check the BaseAgent or Agent class for id attribute
rg "class BaseAgent|class Agent" -A 30 lib/crewai/src/crewai --type=py | grep -E "(class|id|uuid)" | head -20Repository: Devasy/crewAI-telemetry
Length of output: 1867
🏁 Script executed:
#!/bin/bash
# More direct search for Agent id
rg "self\.id\s*=" lib/crewai/src/crewai --type=py -B 3 | head -40Repository: Devasy/crewAI-telemetry
Length of output: 49
🏁 Script executed:
#!/bin/bash
# Check BaseAgent class for id attribute
rg "class BaseAgent" -A 50 lib/crewai/src/crewai/agents/agent_builder/base_agent.py | head -80Repository: Devasy/crewAI-telemetry
Length of output: 3058
🏁 Script executed:
#!/bin/bash
# Search for agent.id usage in the codebase
rg "agent\.id" lib/crewai/src/crewai --type=py | head -20Repository: Devasy/crewAI-telemetry
Length of output: 2191
🏁 Script executed:
#!/bin/bash
# Check if UUID or similar is used for agent ids
rg "uuid|UUID" lib/crewai/src/crewai/agents --type=py -B 2 -A 2 | head -50Repository: Devasy/crewAI-telemetry
Length of output: 2156
Add agent_id field to TaskTokenMetrics to eliminate agent name-based lookup.
The review correctly identifies the limitation: when multiple agents share the same role, token aggregation fails because it relies on agent_name matching and only processes the first match.
The suggested fix is feasible. BaseAgent has an id attribute (UUID4, auto-generated) that's accessible throughout the codebase. However, TaskTokenMetrics currently does not have an agent_id field—it only stores agent_name.
To implement the fix:
- Add an agent_id: str | None field to TaskTokenMetrics in lib/crewai/src/crewai/types/usage_metrics.py
- Capture it in _attach_task_token_metrics (around line 2155): agent_id=str(getattr(agent, 'id', '')),
- Update lines 1732-1760 to aggregate by task_metrics.agent_id instead of performing the agent_name lookup; a sketch follows below
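A rough sketch of the agent_id-keyed aggregation (the dataclass below stands in for the real Pydantic model, and any field beyond those discussed above is an assumption):

from collections import defaultdict
from dataclasses import dataclass

@dataclass
class TaskTokenMetricsSketch:
    task_name: str
    agent_name: str
    agent_id: str | None  # proposed new field
    total_tokens: int = 0

def aggregate_by_agent(per_task: dict[str, TaskTokenMetricsSketch]) -> dict[str, int]:
    # Key the sums by agent_id so two agents sharing a role never share a bucket.
    sums: dict[str, int] = defaultdict(int)
    for metrics in per_task.values():
        key = metrics.agent_id or metrics.agent_name  # name-based fallback for legacy records
        sums[key] += metrics.total_tokens
    return dict(sums)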
🤖 Prompt for AI Agents
In lib/crewai/src/crewai/crew.py around lines 1732-1760, the code aggregates
task token metrics by matching agent_name to agent_info_map which breaks when
multiple agents share the same role; change the aggregation to use a concrete
agent_id stored on each TaskTokenMetrics instead. Add an agent_id: str | None
field to TaskTokenMetrics in lib/crewai/src/crewai/types/usage_metrics.py,
populate it where TaskTokenMetrics are created in _attach_task_token_metrics
(around line ~2155) by capturing getattr(agent, "id", "") or None, then update
the loop at 1732-1760 to use task_metrics.agent_id as the key for
agent_token_sums (with a safe fallback to the existing name-based lookup only if
agent_id is empty) and adjust typings/imports accordingly.
Summary by CodeRabbit
New Features
Documentation
Tests
✏️ Tip: You can customize this high-level summary in your review settings.