Skip to content

Commit c28dfbf

Browse files
committed
Working span linking
1 parent bd20f13 commit c28dfbf

File tree

2 files changed

+116
-11
lines changed

2 files changed

+116
-11
lines changed

ddtrace/contrib/internal/crewai/patch.py

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ def traced_kickoff(crewai, pin, func, instance, args, kwargs):
4444
span.set_exc_info(*sys.exc_info())
4545
raise
4646
finally:
47-
kwargs["instance"] = instance
47+
kwargs["_dd.instance"] = instance
4848
integration.llmobs_set_tags(span, args=args, kwargs=kwargs, response=result, operation="crew")
4949
span.finish()
5050
return result
@@ -71,7 +71,7 @@ def traced_task_execute(crewai, pin, func, instance, args, kwargs):
7171
finally:
7272
if getattr(instance, "_ddtrace_ctx", None):
7373
delattr(instance, "_ddtrace_ctx")
74-
kwargs["instance"] = instance
74+
kwargs["_dd.instance"] = instance
7575
integration.llmobs_set_tags(span, args=args, kwargs=kwargs, response=result, operation="task")
7676
span.finish()
7777
return result
@@ -107,7 +107,7 @@ def traced_agent_execute(crewai, pin, func, instance, args, kwargs):
107107
span.set_exc_info(*sys.exc_info())
108108
raise
109109
finally:
110-
kwargs["instance"] = instance
110+
kwargs["_dd.instance"] = instance
111111
integration.llmobs_set_tags(span, args=args, kwargs=kwargs, response=result, operation="agent")
112112
span.finish()
113113
return result
@@ -126,7 +126,7 @@ def traced_tool_run(crewai, pin, func, instance, args, kwargs):
126126
span.set_exc_info(*sys.exc_info())
127127
raise
128128
finally:
129-
kwargs["instance"] = instance
129+
kwargs["_dd.instance"] = instance
130130
integration.llmobs_set_tags(span, args=args, kwargs=kwargs, response=result, operation="tool")
131131
span.finish()
132132
return result
@@ -136,20 +136,36 @@ def traced_tool_run(crewai, pin, func, instance, args, kwargs):
136136
async def traced_flow_kickoff(crewai, pin, func, instance, args, kwargs):
137137
integration = crewai._datadog_integration
138138
span_name = getattr(type(instance), "__name__", "CrewAI Flow")
139-
with integration.trace(pin, "CrewAI Flow", span_name=span_name, operation="flow", submit_to_llmobs=False):
139+
with integration.trace(pin, "CrewAI Flow", span_name=span_name, operation="flow", submit_to_llmobs=True) as span:
140140
result = await func(*args, **kwargs)
141+
integration.llmobs_set_tags(span, args=args, kwargs=kwargs, response=result, operation="flow")
141142
return result
142143

143144

144145
@with_traced_module
145146
async def traced_flow_method(crewai, pin, func, instance, args, kwargs):
146147
integration = crewai._datadog_integration
147148
span_name = get_argument_value(args, kwargs, 0, "method_name", optional=True) or "Flow Method"
148-
with integration.trace(pin, "CrewAI Flow", span_name=span_name, operation="flow_method", submit_to_llmobs=False):
149+
with integration.trace(
150+
pin, "CrewAI Flow", span_name=span_name, operation="flow_method", submit_to_llmobs=True
151+
) as span:
149152
result = await func(*args, **kwargs)
153+
kwargs["_dd.instance"] = instance
154+
integration.llmobs_set_tags(span, args=args, kwargs=kwargs, response=result, operation="flow_method")
150155
return result
151156

152157

158+
@with_traced_module
159+
def patched_find_triggered_methods(crewai, pin, func, instance, args, kwargs):
160+
integration = crewai._datadog_integration
161+
result = func(*args, **kwargs)
162+
if get_argument_value(args, kwargs, 1, "router_only", optional=True) is False:
163+
# TODO: Work for routers too?
164+
current_span = pin.tracer.current_span()
165+
integration._llmobs_set_span_link_on_flow(current_span, args, kwargs, instance)
166+
return result
167+
168+
153169
def patch():
154170
if getattr(crewai, "_datadog_patch", False):
155171
return
@@ -168,6 +184,7 @@ def patch():
168184
wrap(crewai.tools.structured_tool, "CrewStructuredTool.invoke", traced_tool_run(crewai))
169185
wrap(crewai, "Flow.kickoff_async", traced_flow_kickoff(crewai))
170186
wrap(crewai, "Flow._execute_method", traced_flow_method(crewai))
187+
wrap(crewai, "Flow._find_triggered_methods", patched_find_triggered_methods(crewai))
171188

172189

173190
def unpatch():

ddtrace/llmobs/_integrations/crewai.py

Lines changed: 93 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from typing import Dict
44
from typing import List
55
from typing import Optional
6+
from weakref import WeakKeyDictionary
67

78
from ddtrace.internal import core
89
from ddtrace.internal.logger import get_logger
@@ -26,13 +27,24 @@
2627
log = get_logger(__name__)
2728

2829

30+
OP_NAMES_TO_SPAN_KIND = {
31+
"crew": "workflow",
32+
"task": "task",
33+
"agent": "agent",
34+
"tool": "tool",
35+
"flow": "workflow",
36+
"flow_method": "task",
37+
}
38+
39+
2940
class CrewAIIntegration(BaseLLMIntegration):
3041
_integration_name = "crewai"
3142
# the CrewAI integration's task span linking relies on keeping track of an internal Datadog crew ID,
3243
# which follows the format "crew_{trace_id}_{root_span_id}".
3344
_crews_to_task_span_ids: Dict[str, List[str]] = {} # maps crew ID to list of task span_ids
3445
_crews_to_tasks: Dict[str, Dict[str, Any]] = {} # maps crew ID to dictionary of task_id to span_id and span_links
3546
_planning_crew_ids: List[str] = [] # list of crew IDs that correspond to planning crew instances
47+
_flow_span_to_method_to_span_dict: WeakKeyDictionary[Span, Dict[str, Dict[str, Any]]] = WeakKeyDictionary()
3648

3749
def trace(self, pin: Pin, operation_id: str, submit_to_llmobs: bool = False, **kwargs: Dict[str, Any]) -> Span:
3850
if kwargs.get("_ddtrace_ctx"):
@@ -56,6 +68,15 @@ def trace(self, pin: Pin, operation_id: str, submit_to_llmobs: bool = False, **k
5668
self._crews_to_task_span_ids.get(crew_id, []).append(str(span.span_id))
5769
task_node = self._crews_to_tasks.get(crew_id, {}).setdefault(str(task_id), {})
5870
task_node["span_id"] = str(span.span_id)
71+
if kwargs.get("operation") == "flow":
72+
self._flow_span_to_method_to_span_dict[span] = {}
73+
if kwargs.get("operation") == "flow_method":
74+
span_name = kwargs.get("span_name", "")
75+
method_name: str = span_name if isinstance(span_name, str) else ""
76+
if span._parent is None:
77+
return span
78+
span_dict = self._flow_span_to_method_to_span_dict.get(span._parent, {}).setdefault(method_name, {})
79+
span_dict["span_id"] = str(span.span_id)
5980
return span
6081

6182
def _get_current_ctx(self, pin):
@@ -74,7 +95,7 @@ def _llmobs_set_tags(
7495
response: Optional[Any] = None,
7596
operation: str = "",
7697
) -> None:
77-
span._set_ctx_item(SPAN_KIND, "workflow" if operation == "crew" else operation)
98+
span._set_ctx_item(SPAN_KIND, OP_NAMES_TO_SPAN_KIND.get(operation, "task"))
7899
if operation == "crew":
79100
crew_id = _get_crew_id(span, "crew")
80101
self._llmobs_set_tags_crew(span, args, kwargs, response)
@@ -88,9 +109,13 @@ def _llmobs_set_tags(
88109
self._llmobs_set_tags_agent(span, args, kwargs, response)
89110
elif operation == "tool":
90111
self._llmobs_set_tags_tool(span, args, kwargs, response)
112+
elif operation == "flow":
113+
self._llmobs_set_tags_flow(span, args, kwargs, response)
114+
elif operation == "flow_method":
115+
self._llmobs_set_tags_flow_method(span, args, kwargs, response)
91116

92117
def _llmobs_set_tags_crew(self, span, args, kwargs, response):
93-
crew_instance = kwargs.get("instance")
118+
crew_instance = kwargs.pop("_dd.instance", None)
94119
crew_id = _get_crew_id(span, "crew")
95120
task_span_ids = self._crews_to_task_span_ids.get(crew_id, [])
96121
if task_span_ids:
@@ -117,7 +142,7 @@ def _llmobs_set_tags_crew(self, span, args, kwargs, response):
117142

118143
def _llmobs_set_tags_task(self, span, args, kwargs, response):
119144
crew_id = _get_crew_id(span, "task")
120-
task_instance = kwargs.get("instance")
145+
task_instance = kwargs.pop("_dd.instance", None)
121146
task_id = getattr(task_instance, "id", None)
122147
task_name = getattr(task_instance, "name", "")
123148
task_description = getattr(task_instance, "description", "")
@@ -151,7 +176,7 @@ def _llmobs_set_tags_agent(self, span, args, kwargs, response):
151176
"""Set span links and metadata for agent spans.
152177
Agent spans are 1:1 with its parent (task/tool) span, so we link them directly here, even on the parent itself.
153178
"""
154-
agent_instance = kwargs.get("instance")
179+
agent_instance = kwargs.get("_dd.instance", None)
155180
self._tag_agent_manifest(span, agent_instance)
156181
agent_role = getattr(agent_instance, "role", "")
157182
task_description = getattr(kwargs.get("task"), "description", "")
@@ -183,7 +208,7 @@ def _llmobs_set_tags_agent(self, span, args, kwargs, response):
183208
span._set_ctx_item(OUTPUT_VALUE, response)
184209

185210
def _llmobs_set_tags_tool(self, span, args, kwargs, response):
186-
tool_instance = kwargs.get("instance")
211+
tool_instance = kwargs.pop("_dd.instance", None)
187212
tool_name = getattr(tool_instance, "name", "")
188213
description = _extract_tool_description_field(getattr(tool_instance, "description", ""))
189214
span._set_ctx_items(
@@ -247,6 +272,69 @@ def _get_agent_tools(self, tools):
247272
formatted_tools.append(tool_dict)
248273
return formatted_tools
249274

275+
def _llmobs_set_tags_flow(self, span, args, kwargs, response):
276+
span._set_ctx_items({NAME: span.name or "CrewAI Flow", OUTPUT_VALUE: str(response)})
277+
return
278+
279+
def _llmobs_set_tags_flow_method(self, span, args, kwargs, response):
280+
flow_instance = kwargs.pop("_dd.instance", None)
281+
input_dict = {"args": [str(arg) for arg in args[2:]], "kwargs": {k: str(v) for k, v in kwargs.items()}}
282+
span_links = (
283+
self._flow_span_to_method_to_span_dict.get(span._parent, {}).get(span.name, {}).get("span_links", [])
284+
)
285+
if span.name in getattr(flow_instance, "_start_methods", []):
286+
span_links.append(
287+
{
288+
"span_id": str(span._parent.span_id),
289+
"trace_id": format_trace_id(span.trace_id),
290+
"attributes": {"from": "input", "to": "input"},
291+
}
292+
)
293+
span._set_ctx_items(
294+
{
295+
NAME: span.name or "Flow Method",
296+
INPUT_VALUE: input_dict,
297+
OUTPUT_VALUE: str(response),
298+
SPAN_LINKS: span_links,
299+
}
300+
)
301+
return
302+
303+
def _llmobs_set_span_link_on_flow(self, flow_span, args, kwargs, flow_instance):
304+
trigger_method = get_argument_value(args, kwargs, 0, "trigger_method", optional=True)
305+
if not self.llmobs_enabled or not trigger_method:
306+
return
307+
trigger_span_dict = self._flow_span_to_method_to_span_dict.get(flow_span, {}).get(trigger_method)
308+
if not trigger_span_dict:
309+
return
310+
listeners = getattr(flow_instance, "_listeners", [])
311+
# Check trigger method against each listener methods' triggers
312+
for listener_name, (_, listener_triggers) in listeners.items():
313+
if trigger_method not in listener_triggers:
314+
continue
315+
span_dict = self._flow_span_to_method_to_span_dict.get(flow_span, {}).setdefault(listener_name, {})
316+
span_dict["trace_id"] = format_trace_id(flow_span.trace_id)
317+
span_links = span_dict.setdefault("span_links", [])
318+
span_links.append(
319+
{
320+
"span_id": str(trigger_span_dict["span_id"]),
321+
"trace_id": format_trace_id(flow_span.trace_id),
322+
"attributes": {"from": "output", "to": "input"},
323+
}
324+
)
325+
# If no listeners are triggered/AND_triggered, then link trigger span to its parent.
326+
if not any(trigger_method in listener_triggers for _, (_, listener_triggers) in listeners.items()):
327+
flow_span_span_links = flow_span._get_ctx_item(SPAN_LINKS) or []
328+
flow_span_span_links.append(
329+
{
330+
"span_id": str(trigger_span_dict["span_id"]),
331+
"trace_id": format_trace_id(flow_span.trace_id),
332+
"attributes": {"from": "output", "to": "output"},
333+
}
334+
)
335+
flow_span._set_ctx_item(SPAN_LINKS, flow_span_span_links)
336+
return
337+
250338
def _llmobs_set_span_link_on_task(self, span, args, kwargs):
251339
"""Set span links for the next queued task in a CrewAI workflow.
252340
This happens between task executions, (the current span is the crew span and the task span hasn't started yet)

0 commit comments

Comments
 (0)