77from ddtrace .contrib .internal .trace_utils import unwrap
88from ddtrace .contrib .internal .trace_utils import with_traced_module
99from ddtrace .contrib .internal .trace_utils import wrap
10+ from ddtrace .internal .utils import get_argument_value
1011from ddtrace .llmobs ._integrations import CrewAIIntegration
1112from ddtrace .trace import Pin
1213
@@ -131,6 +132,24 @@ def traced_tool_run(crewai, pin, func, instance, args, kwargs):
131132 return result
132133
133134
135+ @with_traced_module
136+ async def traced_flow_kickoff (crewai , pin , func , instance , args , kwargs ):
137+ integration = crewai ._datadog_integration
138+ span_name = getattr (type (instance ), "__name__" , "CrewAI Flow" )
139+ with integration .trace (pin , "CrewAI Flow" , span_name = span_name , operation = "flow" , submit_to_llmobs = False ):
140+ result = await func (* args , ** kwargs )
141+ return result
142+
143+
144+ @with_traced_module
145+ async def traced_flow_method (crewai , pin , func , instance , args , kwargs ):
146+ integration = crewai ._datadog_integration
147+ span_name = get_argument_value (args , kwargs , 0 , "method_name" , optional = True ) or "Flow Method"
148+ with integration .trace (pin , "CrewAI Flow" , span_name = span_name , operation = "flow_method" , submit_to_llmobs = False ):
149+ result = await func (* args , ** kwargs )
150+ return result
151+
152+
134153def patch ():
135154 if getattr (crewai , "_datadog_patch" , False ):
136155 return
@@ -147,6 +166,8 @@ def patch():
147166 wrap (crewai , "Task.execute_async" , traced_task_execute_async (crewai ))
148167 wrap (crewai , "Agent.execute_task" , traced_agent_execute (crewai ))
149168 wrap (crewai .tools .structured_tool , "CrewStructuredTool.invoke" , traced_tool_run (crewai ))
169+ wrap (crewai , "Flow.kickoff_async" , traced_flow_kickoff (crewai ))
170+ wrap (crewai , "Flow._execute_method" , traced_flow_method (crewai ))
150171
151172
152173def unpatch ():
0 commit comments