From bd20f1386289dce902662375eaf2fba1fbebc9e8 Mon Sep 17 00:00:00 2001 From: Yun Kim Date: Mon, 21 Jul 2025 15:52:44 -0400 Subject: [PATCH 01/15] WIP crewAI APM tracing --- ddtrace/contrib/internal/crewai/patch.py | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/ddtrace/contrib/internal/crewai/patch.py b/ddtrace/contrib/internal/crewai/patch.py index bdb458408b7..136a7123394 100644 --- a/ddtrace/contrib/internal/crewai/patch.py +++ b/ddtrace/contrib/internal/crewai/patch.py @@ -7,6 +7,7 @@ from ddtrace.contrib.internal.trace_utils import unwrap from ddtrace.contrib.internal.trace_utils import with_traced_module from ddtrace.contrib.internal.trace_utils import wrap +from ddtrace.internal.utils import get_argument_value from ddtrace.llmobs._integrations import CrewAIIntegration from ddtrace.trace import Pin @@ -131,6 +132,24 @@ def traced_tool_run(crewai, pin, func, instance, args, kwargs): return result +@with_traced_module +async def traced_flow_kickoff(crewai, pin, func, instance, args, kwargs): + integration = crewai._datadog_integration + span_name = getattr(type(instance), "__name__", "CrewAI Flow") + with integration.trace(pin, "CrewAI Flow", span_name=span_name, operation="flow", submit_to_llmobs=False): + result = await func(*args, **kwargs) + return result + + +@with_traced_module +async def traced_flow_method(crewai, pin, func, instance, args, kwargs): + integration = crewai._datadog_integration + span_name = get_argument_value(args, kwargs, 0, "method_name", optional=True) or "Flow Method" + with integration.trace(pin, "CrewAI Flow", span_name=span_name, operation="flow_method", submit_to_llmobs=False): + result = await func(*args, **kwargs) + return result + + def patch(): if getattr(crewai, "_datadog_patch", False): return @@ -147,6 +166,8 @@ def patch(): wrap(crewai, "Task.execute_async", traced_task_execute_async(crewai)) wrap(crewai, "Agent.execute_task", traced_agent_execute(crewai)) wrap(crewai.tools.structured_tool, "CrewStructuredTool.invoke", traced_tool_run(crewai)) + wrap(crewai, "Flow.kickoff_async", traced_flow_kickoff(crewai)) + wrap(crewai, "Flow._execute_method", traced_flow_method(crewai)) def unpatch(): From c28dfbf729b2efefedcd1cd73402d16c30e69742 Mon Sep 17 00:00:00 2001 From: Yun Kim Date: Mon, 21 Jul 2025 19:12:28 -0400 Subject: [PATCH 02/15] Working span linking --- ddtrace/contrib/internal/crewai/patch.py | 29 +++++-- ddtrace/llmobs/_integrations/crewai.py | 98 ++++++++++++++++++++++-- 2 files changed, 116 insertions(+), 11 deletions(-) diff --git a/ddtrace/contrib/internal/crewai/patch.py b/ddtrace/contrib/internal/crewai/patch.py index 136a7123394..48bdfd4b6d4 100644 --- a/ddtrace/contrib/internal/crewai/patch.py +++ b/ddtrace/contrib/internal/crewai/patch.py @@ -44,7 +44,7 @@ def traced_kickoff(crewai, pin, func, instance, args, kwargs): span.set_exc_info(*sys.exc_info()) raise finally: - kwargs["instance"] = instance + kwargs["_dd.instance"] = instance integration.llmobs_set_tags(span, args=args, kwargs=kwargs, response=result, operation="crew") span.finish() return result @@ -71,7 +71,7 @@ def traced_task_execute(crewai, pin, func, instance, args, kwargs): finally: if getattr(instance, "_ddtrace_ctx", None): delattr(instance, "_ddtrace_ctx") - kwargs["instance"] = instance + kwargs["_dd.instance"] = instance integration.llmobs_set_tags(span, args=args, kwargs=kwargs, response=result, operation="task") span.finish() return result @@ -107,7 +107,7 @@ def traced_agent_execute(crewai, pin, func, instance, args, kwargs): span.set_exc_info(*sys.exc_info()) raise finally: - kwargs["instance"] = instance + kwargs["_dd.instance"] = instance integration.llmobs_set_tags(span, args=args, kwargs=kwargs, response=result, operation="agent") span.finish() return result @@ -126,7 +126,7 @@ def traced_tool_run(crewai, pin, func, instance, args, kwargs): span.set_exc_info(*sys.exc_info()) raise finally: - kwargs["instance"] = instance + kwargs["_dd.instance"] = instance integration.llmobs_set_tags(span, args=args, kwargs=kwargs, response=result, operation="tool") span.finish() return result @@ -136,8 +136,9 @@ def traced_tool_run(crewai, pin, func, instance, args, kwargs): async def traced_flow_kickoff(crewai, pin, func, instance, args, kwargs): integration = crewai._datadog_integration span_name = getattr(type(instance), "__name__", "CrewAI Flow") - with integration.trace(pin, "CrewAI Flow", span_name=span_name, operation="flow", submit_to_llmobs=False): + with integration.trace(pin, "CrewAI Flow", span_name=span_name, operation="flow", submit_to_llmobs=True) as span: result = await func(*args, **kwargs) + integration.llmobs_set_tags(span, args=args, kwargs=kwargs, response=result, operation="flow") return result @@ -145,11 +146,26 @@ async def traced_flow_kickoff(crewai, pin, func, instance, args, kwargs): async def traced_flow_method(crewai, pin, func, instance, args, kwargs): integration = crewai._datadog_integration span_name = get_argument_value(args, kwargs, 0, "method_name", optional=True) or "Flow Method" - with integration.trace(pin, "CrewAI Flow", span_name=span_name, operation="flow_method", submit_to_llmobs=False): + with integration.trace( + pin, "CrewAI Flow", span_name=span_name, operation="flow_method", submit_to_llmobs=True + ) as span: result = await func(*args, **kwargs) + kwargs["_dd.instance"] = instance + integration.llmobs_set_tags(span, args=args, kwargs=kwargs, response=result, operation="flow_method") return result +@with_traced_module +def patched_find_triggered_methods(crewai, pin, func, instance, args, kwargs): + integration = crewai._datadog_integration + result = func(*args, **kwargs) + if get_argument_value(args, kwargs, 1, "router_only", optional=True) is False: + # TODO: Work for routers too? + current_span = pin.tracer.current_span() + integration._llmobs_set_span_link_on_flow(current_span, args, kwargs, instance) + return result + + def patch(): if getattr(crewai, "_datadog_patch", False): return @@ -168,6 +184,7 @@ def patch(): wrap(crewai.tools.structured_tool, "CrewStructuredTool.invoke", traced_tool_run(crewai)) wrap(crewai, "Flow.kickoff_async", traced_flow_kickoff(crewai)) wrap(crewai, "Flow._execute_method", traced_flow_method(crewai)) + wrap(crewai, "Flow._find_triggered_methods", patched_find_triggered_methods(crewai)) def unpatch(): diff --git a/ddtrace/llmobs/_integrations/crewai.py b/ddtrace/llmobs/_integrations/crewai.py index b73366d4e71..0afbcbf4f59 100644 --- a/ddtrace/llmobs/_integrations/crewai.py +++ b/ddtrace/llmobs/_integrations/crewai.py @@ -3,6 +3,7 @@ from typing import Dict from typing import List from typing import Optional +from weakref import WeakKeyDictionary from ddtrace.internal import core from ddtrace.internal.logger import get_logger @@ -26,6 +27,16 @@ log = get_logger(__name__) +OP_NAMES_TO_SPAN_KIND = { + "crew": "workflow", + "task": "task", + "agent": "agent", + "tool": "tool", + "flow": "workflow", + "flow_method": "task", +} + + class CrewAIIntegration(BaseLLMIntegration): _integration_name = "crewai" # the CrewAI integration's task span linking relies on keeping track of an internal Datadog crew ID, @@ -33,6 +44,7 @@ class CrewAIIntegration(BaseLLMIntegration): _crews_to_task_span_ids: Dict[str, List[str]] = {} # maps crew ID to list of task span_ids _crews_to_tasks: Dict[str, Dict[str, Any]] = {} # maps crew ID to dictionary of task_id to span_id and span_links _planning_crew_ids: List[str] = [] # list of crew IDs that correspond to planning crew instances + _flow_span_to_method_to_span_dict: WeakKeyDictionary[Span, Dict[str, Dict[str, Any]]] = WeakKeyDictionary() def trace(self, pin: Pin, operation_id: str, submit_to_llmobs: bool = False, **kwargs: Dict[str, Any]) -> Span: if kwargs.get("_ddtrace_ctx"): @@ -56,6 +68,15 @@ def trace(self, pin: Pin, operation_id: str, submit_to_llmobs: bool = False, **k self._crews_to_task_span_ids.get(crew_id, []).append(str(span.span_id)) task_node = self._crews_to_tasks.get(crew_id, {}).setdefault(str(task_id), {}) task_node["span_id"] = str(span.span_id) + if kwargs.get("operation") == "flow": + self._flow_span_to_method_to_span_dict[span] = {} + if kwargs.get("operation") == "flow_method": + span_name = kwargs.get("span_name", "") + method_name: str = span_name if isinstance(span_name, str) else "" + if span._parent is None: + return span + span_dict = self._flow_span_to_method_to_span_dict.get(span._parent, {}).setdefault(method_name, {}) + span_dict["span_id"] = str(span.span_id) return span def _get_current_ctx(self, pin): @@ -74,7 +95,7 @@ def _llmobs_set_tags( response: Optional[Any] = None, operation: str = "", ) -> None: - span._set_ctx_item(SPAN_KIND, "workflow" if operation == "crew" else operation) + span._set_ctx_item(SPAN_KIND, OP_NAMES_TO_SPAN_KIND.get(operation, "task")) if operation == "crew": crew_id = _get_crew_id(span, "crew") self._llmobs_set_tags_crew(span, args, kwargs, response) @@ -88,9 +109,13 @@ def _llmobs_set_tags( self._llmobs_set_tags_agent(span, args, kwargs, response) elif operation == "tool": self._llmobs_set_tags_tool(span, args, kwargs, response) + elif operation == "flow": + self._llmobs_set_tags_flow(span, args, kwargs, response) + elif operation == "flow_method": + self._llmobs_set_tags_flow_method(span, args, kwargs, response) def _llmobs_set_tags_crew(self, span, args, kwargs, response): - crew_instance = kwargs.get("instance") + crew_instance = kwargs.pop("_dd.instance", None) crew_id = _get_crew_id(span, "crew") task_span_ids = self._crews_to_task_span_ids.get(crew_id, []) if task_span_ids: @@ -117,7 +142,7 @@ def _llmobs_set_tags_crew(self, span, args, kwargs, response): def _llmobs_set_tags_task(self, span, args, kwargs, response): crew_id = _get_crew_id(span, "task") - task_instance = kwargs.get("instance") + task_instance = kwargs.pop("_dd.instance", None) task_id = getattr(task_instance, "id", None) task_name = getattr(task_instance, "name", "") task_description = getattr(task_instance, "description", "") @@ -151,7 +176,7 @@ def _llmobs_set_tags_agent(self, span, args, kwargs, response): """Set span links and metadata for agent spans. Agent spans are 1:1 with its parent (task/tool) span, so we link them directly here, even on the parent itself. """ - agent_instance = kwargs.get("instance") + agent_instance = kwargs.get("_dd.instance", None) self._tag_agent_manifest(span, agent_instance) agent_role = getattr(agent_instance, "role", "") task_description = getattr(kwargs.get("task"), "description", "") @@ -183,7 +208,7 @@ def _llmobs_set_tags_agent(self, span, args, kwargs, response): span._set_ctx_item(OUTPUT_VALUE, response) def _llmobs_set_tags_tool(self, span, args, kwargs, response): - tool_instance = kwargs.get("instance") + tool_instance = kwargs.pop("_dd.instance", None) tool_name = getattr(tool_instance, "name", "") description = _extract_tool_description_field(getattr(tool_instance, "description", "")) span._set_ctx_items( @@ -247,6 +272,69 @@ def _get_agent_tools(self, tools): formatted_tools.append(tool_dict) return formatted_tools + def _llmobs_set_tags_flow(self, span, args, kwargs, response): + span._set_ctx_items({NAME: span.name or "CrewAI Flow", OUTPUT_VALUE: str(response)}) + return + + def _llmobs_set_tags_flow_method(self, span, args, kwargs, response): + flow_instance = kwargs.pop("_dd.instance", None) + input_dict = {"args": [str(arg) for arg in args[2:]], "kwargs": {k: str(v) for k, v in kwargs.items()}} + span_links = ( + self._flow_span_to_method_to_span_dict.get(span._parent, {}).get(span.name, {}).get("span_links", []) + ) + if span.name in getattr(flow_instance, "_start_methods", []): + span_links.append( + { + "span_id": str(span._parent.span_id), + "trace_id": format_trace_id(span.trace_id), + "attributes": {"from": "input", "to": "input"}, + } + ) + span._set_ctx_items( + { + NAME: span.name or "Flow Method", + INPUT_VALUE: input_dict, + OUTPUT_VALUE: str(response), + SPAN_LINKS: span_links, + } + ) + return + + def _llmobs_set_span_link_on_flow(self, flow_span, args, kwargs, flow_instance): + trigger_method = get_argument_value(args, kwargs, 0, "trigger_method", optional=True) + if not self.llmobs_enabled or not trigger_method: + return + trigger_span_dict = self._flow_span_to_method_to_span_dict.get(flow_span, {}).get(trigger_method) + if not trigger_span_dict: + return + listeners = getattr(flow_instance, "_listeners", []) + # Check trigger method against each listener methods' triggers + for listener_name, (_, listener_triggers) in listeners.items(): + if trigger_method not in listener_triggers: + continue + span_dict = self._flow_span_to_method_to_span_dict.get(flow_span, {}).setdefault(listener_name, {}) + span_dict["trace_id"] = format_trace_id(flow_span.trace_id) + span_links = span_dict.setdefault("span_links", []) + span_links.append( + { + "span_id": str(trigger_span_dict["span_id"]), + "trace_id": format_trace_id(flow_span.trace_id), + "attributes": {"from": "output", "to": "input"}, + } + ) + # If no listeners are triggered/AND_triggered, then link trigger span to its parent. + if not any(trigger_method in listener_triggers for _, (_, listener_triggers) in listeners.items()): + flow_span_span_links = flow_span._get_ctx_item(SPAN_LINKS) or [] + flow_span_span_links.append( + { + "span_id": str(trigger_span_dict["span_id"]), + "trace_id": format_trace_id(flow_span.trace_id), + "attributes": {"from": "output", "to": "output"}, + } + ) + flow_span._set_ctx_item(SPAN_LINKS, flow_span_span_links) + return + def _llmobs_set_span_link_on_task(self, span, args, kwargs): """Set span links for the next queued task in a CrewAI workflow. This happens between task executions, (the current span is the crew span and the task span hasn't started yet) From 2076a0169825c144b2b820b62c5f069b4ab9d2e3 Mon Sep 17 00:00:00 2001 From: Yun Kim Date: Wed, 30 Jul 2025 10:20:07 -0400 Subject: [PATCH 03/15] Simplify span linking --- ddtrace/llmobs/_integrations/crewai.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/ddtrace/llmobs/_integrations/crewai.py b/ddtrace/llmobs/_integrations/crewai.py index 0afbcbf4f59..1a1b5ee9155 100644 --- a/ddtrace/llmobs/_integrations/crewai.py +++ b/ddtrace/llmobs/_integrations/crewai.py @@ -273,7 +273,8 @@ def _get_agent_tools(self, tools): return formatted_tools def _llmobs_set_tags_flow(self, span, args, kwargs, response): - span._set_ctx_items({NAME: span.name or "CrewAI Flow", OUTPUT_VALUE: str(response)}) + inputs = get_argument_value(args, kwargs, 0, "inputs", optional=True) or {} + span._set_ctx_items({NAME: span.name or "CrewAI Flow", INPUT_VALUE: inputs, OUTPUT_VALUE: str(response)}) return def _llmobs_set_tags_flow_method(self, span, args, kwargs, response): @@ -301,17 +302,20 @@ def _llmobs_set_tags_flow_method(self, span, args, kwargs, response): return def _llmobs_set_span_link_on_flow(self, flow_span, args, kwargs, flow_instance): + """Set span links for the next queued listener method(s) in a CrewAI flow.""" trigger_method = get_argument_value(args, kwargs, 0, "trigger_method", optional=True) if not self.llmobs_enabled or not trigger_method: return trigger_span_dict = self._flow_span_to_method_to_span_dict.get(flow_span, {}).get(trigger_method) if not trigger_span_dict: return - listeners = getattr(flow_instance, "_listeners", []) + listeners = getattr(flow_instance, "_listeners", {}) + triggered = False # Check trigger method against each listener methods' triggers for listener_name, (_, listener_triggers) in listeners.items(): if trigger_method not in listener_triggers: continue + triggered = True span_dict = self._flow_span_to_method_to_span_dict.get(flow_span, {}).setdefault(listener_name, {}) span_dict["trace_id"] = format_trace_id(flow_span.trace_id) span_links = span_dict.setdefault("span_links", []) @@ -322,8 +326,7 @@ def _llmobs_set_span_link_on_flow(self, flow_span, args, kwargs, flow_instance): "attributes": {"from": "output", "to": "input"}, } ) - # If no listeners are triggered/AND_triggered, then link trigger span to its parent. - if not any(trigger_method in listener_triggers for _, (_, listener_triggers) in listeners.items()): + if triggered is False: flow_span_span_links = flow_span._get_ctx_item(SPAN_LINKS) or [] flow_span_span_links.append( { From 2d570086ef3b840b025aa5e91355858a22367f0a Mon Sep 17 00:00:00 2001 From: Yun Kim Date: Wed, 30 Jul 2025 15:45:14 -0400 Subject: [PATCH 04/15] Add tests --- ddtrace/contrib/internal/crewai/patch.py | 5 +- ddtrace/contrib/internal/selenium/patch.py | 2 +- tests/contrib/crewai/conftest.py | 111 ++++++++++++++++++ tests/contrib/crewai/test_crewai.py | 20 ++++ tests/contrib/crewai/test_crewai_llmobs.py | 109 ++++++++++++----- tests/contrib/crewai/utils.py | 100 ++++++++++++++++ tests/contrib/google_generativeai/conftest.py | 2 +- .../contrib/langchain/test_langchain_patch.py | 2 +- ....crewai.test_crewai.test_complex_flow.json | 101 ++++++++++++++++ ...b.crewai.test_crewai.test_simple_flow.json | 56 +++++++++ 10 files changed, 476 insertions(+), 32 deletions(-) create mode 100644 tests/contrib/crewai/utils.py create mode 100644 tests/snapshots/tests.contrib.crewai.test_crewai.test_complex_flow.json create mode 100644 tests/snapshots/tests.contrib.crewai.test_crewai.test_simple_flow.json diff --git a/ddtrace/contrib/internal/crewai/patch.py b/ddtrace/contrib/internal/crewai/patch.py index 48bdfd4b6d4..9a645429166 100644 --- a/ddtrace/contrib/internal/crewai/patch.py +++ b/ddtrace/contrib/internal/crewai/patch.py @@ -147,7 +147,7 @@ async def traced_flow_method(crewai, pin, func, instance, args, kwargs): integration = crewai._datadog_integration span_name = get_argument_value(args, kwargs, 0, "method_name", optional=True) or "Flow Method" with integration.trace( - pin, "CrewAI Flow", span_name=span_name, operation="flow_method", submit_to_llmobs=True + pin, "CrewAI Flow Method", span_name=span_name, operation="flow_method", submit_to_llmobs=True ) as span: result = await func(*args, **kwargs) kwargs["_dd.instance"] = instance @@ -199,5 +199,8 @@ def unpatch(): unwrap(crewai.Task, "execute_async") unwrap(crewai.Agent, "execute_task") unwrap(crewai.tools.structured_tool.CrewStructuredTool, "invoke") + unwrap(crewai.Flow, "kickoff_async") + unwrap(crewai.Flow, "_execute_method") + unwrap(crewai.Flow, "_find_triggered_methods") delattr(crewai, "_datadog_integration") diff --git a/ddtrace/contrib/internal/selenium/patch.py b/ddtrace/contrib/internal/selenium/patch.py index 869a17674b1..40dfb7dc6c3 100644 --- a/ddtrace/contrib/internal/selenium/patch.py +++ b/ddtrace/contrib/internal/selenium/patch.py @@ -1,7 +1,7 @@ import os import time -import typing as t from typing import Dict +import typing as t from wrapt.importer import when_imported diff --git a/tests/contrib/crewai/conftest.py b/tests/contrib/crewai/conftest.py index 71f503a459a..6e5ce7da14b 100644 --- a/tests/contrib/crewai/conftest.py +++ b/tests/contrib/crewai/conftest.py @@ -1,9 +1,14 @@ import os +import time from crewai import Agent from crewai import Crew +from crewai import Flow from crewai import Process from crewai import Task +from crewai.flow.flow import and_ +from crewai.flow.flow import listen +from crewai.flow.flow import start from crewai.tasks.conditional_task import ConditionalTask from crewai.tools import tool import pytest @@ -13,6 +18,10 @@ from ddtrace.contrib.internal.crewai.patch import unpatch from ddtrace.llmobs import LLMObs as llmobs_service from ddtrace.trace import Pin +from tests.contrib.crewai.utils import budget_text +from tests.contrib.crewai.utils import fun_fact_text +from tests.contrib.crewai.utils import itinerary_text +from tests.contrib.crewai.utils import welcome_email_text from tests.llmobs._utils import TestLLMObsSpanWriter from tests.utils import DummyTracer from tests.utils import DummyWriter @@ -159,6 +168,108 @@ def hierarchical_crew(crewai): ) +@pytest.fixture +def simple_flow(crewai): + class ExFlow(Flow[dict]): + model = "gpt-4o-mini" + + @start() + def generate_city(self): + time.sleep(0.05) + return "New York City" + + @listen(generate_city) + def generate_fun_fact(self, random_city): + time.sleep(0.06) + return fun_fact_text + + yield ExFlow() + + +@pytest.fixture +def simple_flow_async(crewai): + class ExFlow(Flow[dict]): + model = "gpt-4o-mini" + + @start() + async def generate_city(self): + time.sleep(0.05) + return "New York City" + + @listen(generate_city) + async def generate_fun_fact(self, random_city): + time.sleep(0.06) + return fun_fact_text + + yield ExFlow() + + +@pytest.fixture +def complex_flow(crewai): + class ExFlow(Flow[dict]): + model = "gpt-4o-mini" + + @start() + def generate_city(self): + time.sleep(0.05) + return "New York City" + + @start() + def generate_welcome_email(self): + time.sleep(0.05) + return welcome_email_text + + @listen(generate_city) + def generate_fun_fact(self, random_city): + time.sleep(0.06) + return fun_fact_text + + @listen(generate_city) + def generate_budget(self, random_city): + time.sleep(0.04) + return budget_text + + @listen(and_(generate_budget, generate_city, generate_fun_fact, generate_welcome_email)) + def generate_itinerary(self): + time.sleep(0.05) + return itinerary_text + + yield ExFlow() + + +@pytest.fixture +def complex_flow_async(crewai): + class ExFlow(Flow[dict]): + model = "gpt-4o-mini" + + @start() + async def generate_city(self): + time.sleep(0.05) + return "New York City" + + @start() + async def generate_welcome_email(self): + time.sleep(0.05) + return welcome_email_text + + @listen(generate_city) + async def generate_fun_fact(self, random_city): + time.sleep(0.06) + return fun_fact_text + + @listen(generate_city) + async def generate_budget(self, random_city): + time.sleep(0.04) + return budget_text + + @listen(and_(generate_budget, generate_city, generate_fun_fact, generate_welcome_email)) + async def generate_itinerary(self): + time.sleep(0.05) + return itinerary_text + + yield ExFlow() + + @pytest.fixture def crewai(monkeypatch): monkeypatch.setenv("OPENAI_API_KEY", "") diff --git a/tests/contrib/crewai/test_crewai.py b/tests/contrib/crewai/test_crewai.py index 589aa93cd2b..f9172b2b269 100644 --- a/tests/contrib/crewai/test_crewai.py +++ b/tests/contrib/crewai/test_crewai.py @@ -107,3 +107,23 @@ async def test_hierarchical_crew_async(crewai, hierarchical_crew, request_vcr): async def test_hierarchical_crew_async_for_each(crewai, hierarchical_crew, request_vcr): with request_vcr.use_cassette("test_hierarchical_crew.yaml"): await hierarchical_crew.kickoff_for_each_async(inputs=[{"ages": [10, 12, 14, 16, 18]}]) + + +@pytest.mark.snapshot(token="tests.contrib.crewai.test_crewai.test_simple_flow") +def test_simple_flow(crewai, simple_flow): + simple_flow.kickoff(inputs={"continent": "North America"}) + + +@pytest.mark.snapshot(token="tests.contrib.crewai.test_crewai.test_simple_flow") +async def test_simple_flow_async(crewai, simple_flow_async): + await simple_flow_async.kickoff_async(inputs={"continent": "North America"}) + + +@pytest.mark.snapshot(token="tests.contrib.crewai.test_crewai.test_complex_flow") +def test_complex_flow(crewai, complex_flow): + complex_flow.kickoff(inputs={"continent": "North America"}) + + +@pytest.mark.snapshot(token="tests.contrib.crewai.test_crewai.test_complex_flow") +async def test_complex_flow_async(crewai, complex_flow_async): + await complex_flow_async.kickoff_async(inputs={"continent": "North America"}) diff --git a/tests/contrib/crewai/test_crewai_llmobs.py b/tests/contrib/crewai/test_crewai_llmobs.py index 7ef83d00677..91d54fd4972 100644 --- a/tests/contrib/crewai/test_crewai_llmobs.py +++ b/tests/contrib/crewai/test_crewai_llmobs.py @@ -65,6 +65,14 @@ }, } +expected_span_args = { + "input_value": mock.ANY, + "output_value": mock.ANY, + "metadata": mock.ANY, + "tags": {"service": "tests.contrib.crewai", "ml_app": ""}, + "span_links": True, +} + def expected_agent_span_args(role): return { @@ -79,13 +87,6 @@ def expected_agent_span_args(role): def _assert_basic_crew_events(llmobs_events, spans): llmobs_events.sort(key=lambda span: span["start_ns"]) assert len(spans) == len(llmobs_events) == 5 - expected_span_args = { - "input_value": mock.ANY, - "output_value": mock.ANY, - "metadata": mock.ANY, - "tags": {"service": "tests.contrib.crewai", "ml_app": ""}, - "span_links": True, - } for llmobs_span, span, kind in zip(llmobs_events, spans, ("workflow", "task", "agent", "task", "agent")): extra_args = expected_agent_span_args(llmobs_span["name"]) if kind == "agent" else expected_span_args assert llmobs_span == _expected_llmobs_non_llm_span_event(span, span_kind=kind, **extra_args) @@ -108,13 +109,6 @@ def _assert_basic_crew_links(llmobs_events): def _assert_tool_crew_events(llmobs_events, spans): llmobs_events.sort(key=lambda span: span["start_ns"]) assert len(spans) == len(llmobs_events) == 4 - expected_span_args = { - "input_value": mock.ANY, - "output_value": mock.ANY, - "metadata": mock.ANY, - "tags": {"service": "tests.contrib.crewai", "ml_app": ""}, - "span_links": True, - } assert llmobs_events[0] == _expected_llmobs_non_llm_span_event(spans[0], span_kind="workflow", **expected_span_args) assert llmobs_events[1] == _expected_llmobs_non_llm_span_event(spans[1], span_kind="task", **expected_span_args) assert llmobs_events[2] == _expected_llmobs_non_llm_span_event( @@ -144,13 +138,6 @@ def _assert_tool_crew_links(llmobs_events): def _assert_async_crew_events(llmobs_events, spans): llmobs_events.sort(key=lambda span: span["start_ns"]) assert len(spans) == len(llmobs_events) == 6 - expected_span_args = { - "input_value": mock.ANY, - "output_value": mock.ANY, - "metadata": mock.ANY, - "tags": {"service": "tests.contrib.crewai", "ml_app": ""}, - "span_links": True, - } assert llmobs_events[0] == _expected_llmobs_non_llm_span_event(spans[0], span_kind="workflow", **expected_span_args) assert llmobs_events[1] == _expected_llmobs_non_llm_span_event(spans[1], span_kind="task", **expected_span_args) assert llmobs_events[2] == _expected_llmobs_non_llm_span_event( @@ -188,13 +175,6 @@ def _assert_async_crew_links(llmobs_events): def _assert_hierarchical_crew_events(llmobs_events, spans): llmobs_events.sort(key=lambda span: span["start_ns"]) assert len(spans) == len(llmobs_events) == 12 - expected_span_args = { - "input_value": mock.ANY, - "output_value": mock.ANY, - "metadata": mock.ANY, - "tags": {"service": "tests.contrib.crewai", "ml_app": ""}, - "span_links": True, - } expected_span_kinds = ( "workflow", "task", @@ -244,6 +224,47 @@ def _assert_hierarchical_crew_links(llmobs_events): _assert_span_link(llmobs_events[11], llmobs_events[10], "output", "output") +def _assert_simple_flow_events(llmobs_events, spans): + llmobs_events.sort(key=lambda span: span["start_ns"]) + assert len(spans) == len(llmobs_events) == 3 + assert llmobs_events[0] == _expected_llmobs_non_llm_span_event(spans[0], span_kind="workflow", **expected_span_args) + assert llmobs_events[1] == _expected_llmobs_non_llm_span_event(spans[1], span_kind="task", **expected_span_args) + assert llmobs_events[2] == _expected_llmobs_non_llm_span_event(spans[2], span_kind="task", **expected_span_args) + + +def _assert_simple_flow_links(llmobs_events): + llmobs_events.sort(key=lambda span: span["start_ns"]) + _assert_span_link(llmobs_events[0], llmobs_events[1], "input", "input") + _assert_span_link(llmobs_events[1], llmobs_events[2], "output", "input") + _assert_span_link(llmobs_events[2], llmobs_events[0], "output", "output") + + +def _assert_complex_flow_events(llmobs_events, spans): + llmobs_events.sort(key=lambda span: span["start_ns"]) + assert len(spans) == len(llmobs_events) == 6 + assert llmobs_events[0] == _expected_llmobs_non_llm_span_event(spans[0], span_kind="workflow", **expected_span_args) + assert llmobs_events[1] == _expected_llmobs_non_llm_span_event(spans[1], span_kind="task", **expected_span_args) + assert llmobs_events[2] == _expected_llmobs_non_llm_span_event(spans[2], span_kind="task", **expected_span_args) + assert llmobs_events[3] == _expected_llmobs_non_llm_span_event(spans[3], span_kind="task", **expected_span_args) + assert llmobs_events[4] == _expected_llmobs_non_llm_span_event(spans[4], span_kind="task", **expected_span_args) + assert llmobs_events[5] == _expected_llmobs_non_llm_span_event(spans[5], span_kind="task", **expected_span_args) + + +def _assert_complex_flow_links(llmobs_events): + llmobs_events.sort(key=lambda span: span["start_ns"]) + _assert_span_link(llmobs_events[0], llmobs_events[1], "input", "input") + _assert_span_link(llmobs_events[0], llmobs_events[2], "input", "input") + _assert_span_link(llmobs_events[5], llmobs_events[0], "output", "output") + + _assert_span_link(llmobs_events[1], llmobs_events[3], "output", "input") + _assert_span_link(llmobs_events[1], llmobs_events[4], "output", "input") + _assert_span_link(llmobs_events[1], llmobs_events[5], "output", "input") + + _assert_span_link(llmobs_events[2], llmobs_events[5], "output", "input") + _assert_span_link(llmobs_events[3], llmobs_events[5], "output", "input") + _assert_span_link(llmobs_events[4], llmobs_events[5], "output", "input") + + def test_basic_crew(crewai, basic_crew, request_vcr, mock_tracer, llmobs_events): with request_vcr.use_cassette("test_basic_crew.yaml"): basic_crew.kickoff(inputs={"topic": "AI"}) @@ -386,3 +407,35 @@ async def test_hierarchical_crew_async_for_each(crewai, hierarchical_crew, reque spans = mock_tracer.pop_traces()[0] _assert_hierarchical_crew_events(llmobs_events, spans) _assert_hierarchical_crew_links(llmobs_events) + + +def test_simple_flow(crewai, simple_flow, mock_tracer, llmobs_events): + simple_flow.kickoff(inputs={"continent": "North America"}) + spans = mock_tracer.pop_traces()[0] + assert len(spans) == 3 + _assert_simple_flow_events(llmobs_events, spans) + _assert_simple_flow_links(llmobs_events) + + +async def test_simple_flow_async(crewai, simple_flow_async, mock_tracer, llmobs_events): + await simple_flow_async.kickoff_async(inputs={"continent": "North America"}) + spans = mock_tracer.pop_traces()[0] + assert len(spans) == 3 + _assert_simple_flow_events(llmobs_events, spans) + _assert_simple_flow_links(llmobs_events) + + +def test_complex_flow(crewai, complex_flow, mock_tracer, llmobs_events): + complex_flow.kickoff(inputs={"continent": "North America"}) + spans = mock_tracer.pop_traces()[0] + assert len(spans) == 6 + _assert_complex_flow_events(llmobs_events, spans) + _assert_complex_flow_links(llmobs_events) + + +async def test_complex_flow_async(crewai, complex_flow_async, mock_tracer, llmobs_events): + await complex_flow_async.kickoff_async(inputs={"continent": "North America"}) + spans = mock_tracer.pop_traces()[0] + assert len(spans) == 6 + _assert_complex_flow_events(llmobs_events, spans) + _assert_complex_flow_links(llmobs_events) diff --git a/tests/contrib/crewai/utils.py b/tests/contrib/crewai/utils.py new file mode 100644 index 00000000000..816132ba8e1 --- /dev/null +++ b/tests/contrib/crewai/utils.py @@ -0,0 +1,100 @@ +welcome_email_text = """ +Subject: Welcome to [Your Company Name] – Let’s Start Planning Your Perfect Trip! + +Dear [Prospective Client's Name], + +We’re thrilled to welcome you to the [Your Company Name] family! Thank you for considering us for your upcoming trip. +We’re excited to help you create unforgettable memories tailored just for you. + +At [Your Company Name], we believe that every journey should be unique and special. +Our team of experienced travel planners is dedicated to understanding your preferences and crafting an itinerary that +perfectly aligns with your vision. Whether you’re dreaming of a relaxing beach getaway, an adventurous mountain +expedition, or an immersive cultural experience, we’re here to turn your dreams into reality. + +To get started, we’d love to learn more about your travel interests, dates, and any specific destinations you have in +mind. Please feel free to reply to this email or give us a call at [Your Phone Number]. + +Once again, welcome aboard! We look forward to helping you plan an amazing trip that exceeds your expectations. + +Warm regards, + +[Your Name] +[Your Position] +[Your Company Name] +[Your Phone Number] +[Your Email Address] +[Your Website URL] +""" + +fun_fact_text = """ +Sure! Did you know that New York City has its own secret underground city? +Below the bustling streets, there are abandoned subway stations, old tunnels, and even a hidden speakeasy! +One famous example is the City Hall subway station, which closed in 1945 but still showcases stunning architecture +and vintage designs. It’s occasionally open for tours, giving a glimpse into this secret part of NYC's history! +""" + +budget_text = """ +Trip Budget for New York City + +Accommodation (3 nights): $600 +Food (3 days): $150 +Transportation (subway/taxis): $100 +Attractions (museums, shows): $200 +Miscellaneous (shopping, souvenirs): $100 +Total Budget: $1,250 +""" + +itinerary_text = f""" +3-Day New York City Itinerary on a $1,250 Budget +Day 1: Arrival and Exploring Manhattan +Accommodation Check-in + +Stay: Budget hotel or hostel (around $200/night). Suggestions include HI New York City Hostel or pod 51 Hotel. +Morning: + +Breakfast: Grab a bagel and coffee at Ess-a-Bagel (Approx. $10). +Activity: Visit the iconic Central Park. Walk, take photos, and enjoy the scenery. Free! +Afternoon: + +Lunch: Delicious slices at Joe's Pizza — a NYC classic (Approx. $10). +Activity: Explore the MoMA. Admission is free on Fridays from 5:30 PM to 9 PM, so plan your visit for that time! +Evening: + +Dinner: Try Shake Shack for some burgers and shakes (Approx. $15). +Attraction: End your day with a stroll through Times Square. Immerse yourself in the lights and energy. Free! +Day 2: Brooklyn and More Cultural Experiences +Morning: + +Breakfast: Head to Balthazar Bakery for pastries and coffee (Approx. $15). +Activity: Walk across the Brooklyn Bridge for stunning views of the skyline. Free! +Afternoon: + +Lunch: Try Juliana's Pizza in Brooklyn (Approx. $20). +Activity: Explore DUMBO for its arts scene and waterfront parks. Free to explore! +Evening: + +Dinner: Feast on ethnic food at Smorgasburg Brooklyn (Approx. $20) if visiting on a Saturday. +Attraction: Catch a Broadway show. Approx. $70 for a matinee show. +Day 3: Culture and Farewell +Morning: + +Breakfast: Enjoy breakfast at Cafe Mogador (Approx. $15). +Activity: Visit the American Museum of Natural History (Admission is $23, but pay what you wish). +Afternoon: + +Lunch: Grab lunch at a street vendor or food truck (Approx. $10). +Activity: Explore Guggenheim Museum (Admission is $25, but check for free admission days). +Evening: + +Dinner: Treat yourself to dinner at The Halal Guys for delicious street food (Approx. $10). +Attraction: Take a walk through The High Line, a beautiful elevated park (Free!). + +Budget breakdown: +{budget_text} + +Fun fact: +{fun_fact_text} + +Welcome email: +{welcome_email_text} +""" diff --git a/tests/contrib/google_generativeai/conftest.py b/tests/contrib/google_generativeai/conftest.py index 5ad3f1d8955..5ab99b095d9 100644 --- a/tests/contrib/google_generativeai/conftest.py +++ b/tests/contrib/google_generativeai/conftest.py @@ -77,8 +77,8 @@ def genai(ddtrace_global_config, ddtrace_config_google_generativeai, mock_client dict(GOOGLE_GENERATIVEAI_API_KEY=os.getenv("GOOGLE_GENERATIVEAI_API_KEY", "")) ): patch() - import google.generativeai as genai from google.generativeai import client as client_lib + import google.generativeai as genai client_lib._client_manager.clients["generative"] = mock_client client_lib._client_manager.clients["generative_async"] = mock_client_async diff --git a/tests/contrib/langchain/test_langchain_patch.py b/tests/contrib/langchain/test_langchain_patch.py index 51e758918ed..4c41056dd57 100644 --- a/tests/contrib/langchain/test_langchain_patch.py +++ b/tests/contrib/langchain/test_langchain_patch.py @@ -48,9 +48,9 @@ def assert_module_patched(self, langchain): def assert_not_module_patched(self, langchain): try: - import langchain_community as gated_langchain from langchain_community import embeddings # noqa: F401 from langchain_community import vectorstores # noqa: F401 + import langchain_community as gated_langchain except ImportError: gated_langchain = None import langchain_core diff --git a/tests/snapshots/tests.contrib.crewai.test_crewai.test_complex_flow.json b/tests/snapshots/tests.contrib.crewai.test_crewai.test_complex_flow.json new file mode 100644 index 00000000000..2faa2eaf025 --- /dev/null +++ b/tests/snapshots/tests.contrib.crewai.test_crewai.test_complex_flow.json @@ -0,0 +1,101 @@ +[[ + { + "name": "ExFlow", + "service": "tests.contrib.crewai", + "resource": "CrewAI Flow", + "trace_id": 0, + "span_id": 1, + "parent_id": 0, + "type": "", + "error": 0, + "meta": { + "_dd.p.dm": "-0", + "_dd.p.tid": "688a751500000000", + "language": "python", + "runtime-id": "56162dabc987448fb1a8d3639a92aef8" + }, + "metrics": { + "_dd.measured": 1, + "_dd.top_level": 1, + "_dd.tracer_kr": 1.0, + "_sampling_priority_v1": 1, + "process_id": 34543 + }, + "duration": 275395000, + "start": 1753904405969657000 + }, + { + "name": "generate_city", + "service": "tests.contrib.crewai", + "resource": "CrewAI Flow Method", + "trace_id": 0, + "span_id": 2, + "parent_id": 1, + "type": "", + "error": 0, + "metrics": { + "_dd.measured": 1 + }, + "duration": 56131000, + "start": 1753904405969837000 + }, + { + "name": "generate_welcome_email", + "service": "tests.contrib.crewai", + "resource": "CrewAI Flow Method", + "trace_id": 0, + "span_id": 3, + "parent_id": 1, + "type": "", + "error": 0, + "metrics": { + "_dd.measured": 1 + }, + "duration": 51731000, + "start": 1753904406026550000 + }, + { + "name": "generate_fun_fact", + "service": "tests.contrib.crewai", + "resource": "CrewAI Flow Method", + "trace_id": 0, + "span_id": 4, + "parent_id": 1, + "type": "", + "error": 0, + "metrics": { + "_dd.measured": 1 + }, + "duration": 67431000, + "start": 1753904406078962000 + }, + { + "name": "generate_budget", + "service": "tests.contrib.crewai", + "resource": "CrewAI Flow Method", + "trace_id": 0, + "span_id": 5, + "parent_id": 1, + "type": "", + "error": 0, + "metrics": { + "_dd.measured": 1 + }, + "duration": 42643000, + "start": 1753904406146863000 + }, + { + "name": "generate_itinerary", + "service": "tests.contrib.crewai", + "resource": "CrewAI Flow Method", + "trace_id": 0, + "span_id": 6, + "parent_id": 1, + "type": "", + "error": 0, + "metrics": { + "_dd.measured": 1 + }, + "duration": 53739000, + "start": 1753904406190596000 + }]] diff --git a/tests/snapshots/tests.contrib.crewai.test_crewai.test_simple_flow.json b/tests/snapshots/tests.contrib.crewai.test_crewai.test_simple_flow.json new file mode 100644 index 00000000000..5bd14c7ff9d --- /dev/null +++ b/tests/snapshots/tests.contrib.crewai.test_crewai.test_simple_flow.json @@ -0,0 +1,56 @@ +[[ + { + "name": "ExFlow", + "service": "tests.contrib.crewai", + "resource": "CrewAI Flow", + "trace_id": 0, + "span_id": 1, + "parent_id": 0, + "type": "", + "error": 0, + "meta": { + "_dd.p.dm": "-0", + "_dd.p.tid": "688a6cac00000000", + "language": "python", + "runtime-id": "5dd5230615214af6a668a6124e22e4b4" + }, + "metrics": { + "_dd.measured": 1, + "_dd.top_level": 1, + "_dd.tracer_kr": 1.0, + "_sampling_priority_v1": 1, + "process_id": 26333 + }, + "duration": 120768000, + "start": 1753902252890209000 + }, + { + "name": "generate_city", + "service": "tests.contrib.crewai", + "resource": "CrewAI Flow Method", + "trace_id": 0, + "span_id": 2, + "parent_id": 1, + "type": "", + "error": 0, + "metrics": { + "_dd.measured": 1 + }, + "duration": 55178000, + "start": 1753902252890303000 + }, + { + "name": "generate_fun_fact", + "service": "tests.contrib.crewai", + "resource": "CrewAI Flow Method", + "trace_id": 0, + "span_id": 3, + "parent_id": 1, + "type": "", + "error": 0, + "metrics": { + "_dd.measured": 1 + }, + "duration": 65128000, + "start": 1753902252945642000 + }]] From 10e81c96e2b7028c966b3fbbcaca8bb66b352992 Mon Sep 17 00:00:00 2001 From: Yun Kim Date: Wed, 30 Jul 2025 16:18:30 -0400 Subject: [PATCH 05/15] Add initial state for each flow method --- ddtrace/contrib/internal/crewai/patch.py | 2 ++ ddtrace/llmobs/_integrations/crewai.py | 7 ++++++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/ddtrace/contrib/internal/crewai/patch.py b/ddtrace/contrib/internal/crewai/patch.py index 9a645429166..a23ea46e57b 100644 --- a/ddtrace/contrib/internal/crewai/patch.py +++ b/ddtrace/contrib/internal/crewai/patch.py @@ -149,8 +149,10 @@ async def traced_flow_method(crewai, pin, func, instance, args, kwargs): with integration.trace( pin, "CrewAI Flow Method", span_name=span_name, operation="flow_method", submit_to_llmobs=True ) as span: + initial_flow_state = {**getattr(instance, "state", {})} result = await func(*args, **kwargs) kwargs["_dd.instance"] = instance + kwargs["_dd.flow_state"] = initial_flow_state integration.llmobs_set_tags(span, args=args, kwargs=kwargs, response=result, operation="flow_method") return result diff --git a/ddtrace/llmobs/_integrations/crewai.py b/ddtrace/llmobs/_integrations/crewai.py index 1a1b5ee9155..08d978b023c 100644 --- a/ddtrace/llmobs/_integrations/crewai.py +++ b/ddtrace/llmobs/_integrations/crewai.py @@ -279,7 +279,12 @@ def _llmobs_set_tags_flow(self, span, args, kwargs, response): def _llmobs_set_tags_flow_method(self, span, args, kwargs, response): flow_instance = kwargs.pop("_dd.instance", None) - input_dict = {"args": [str(arg) for arg in args[2:]], "kwargs": {k: str(v) for k, v in kwargs.items()}} + initial_flow_state = kwargs.pop("_dd.flow_state", {}) + input_dict = { + "args": [str(arg) for arg in args[2:]], + "kwargs": {k: str(v) for k, v in kwargs.items()}, + "flow_state": initial_flow_state, + } span_links = ( self._flow_span_to_method_to_span_dict.get(span._parent, {}).get(span.name, {}).get("span_links", []) ) From 2d65ba72b01b00007a95afe7cfb5131cf21eb76d Mon Sep 17 00:00:00 2001 From: Yun Kim Date: Wed, 30 Jul 2025 16:21:29 -0400 Subject: [PATCH 06/15] Add reno --- releasenotes/notes/feat-crewai-flow-d5cb250484f1d3c1.yaml | 4 ++++ 1 file changed, 4 insertions(+) create mode 100644 releasenotes/notes/feat-crewai-flow-d5cb250484f1d3c1.yaml diff --git a/releasenotes/notes/feat-crewai-flow-d5cb250484f1d3c1.yaml b/releasenotes/notes/feat-crewai-flow-d5cb250484f1d3c1.yaml new file mode 100644 index 00000000000..45f741d2492 --- /dev/null +++ b/releasenotes/notes/feat-crewai-flow-d5cb250484f1d3c1.yaml @@ -0,0 +1,4 @@ +--- +features: + - | + crewai: Introduces APM and LLM Observability tracing support for CrewAI Flow ``kickoff/kickoff_async`` calls, including tracing internal flow method execution. From ac26d2528751aead47ce4e89315f4171293b9b1a Mon Sep 17 00:00:00 2001 From: Yun Kim Date: Wed, 30 Jul 2025 16:22:53 -0400 Subject: [PATCH 07/15] Revert unrelated fmt changes --- ddtrace/contrib/internal/selenium/patch.py | 2 +- tests/contrib/google_generativeai/conftest.py | 2 +- tests/contrib/langchain/test_langchain_patch.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/ddtrace/contrib/internal/selenium/patch.py b/ddtrace/contrib/internal/selenium/patch.py index 40dfb7dc6c3..869a17674b1 100644 --- a/ddtrace/contrib/internal/selenium/patch.py +++ b/ddtrace/contrib/internal/selenium/patch.py @@ -1,7 +1,7 @@ import os import time -from typing import Dict import typing as t +from typing import Dict from wrapt.importer import when_imported diff --git a/tests/contrib/google_generativeai/conftest.py b/tests/contrib/google_generativeai/conftest.py index 5ab99b095d9..5ad3f1d8955 100644 --- a/tests/contrib/google_generativeai/conftest.py +++ b/tests/contrib/google_generativeai/conftest.py @@ -77,8 +77,8 @@ def genai(ddtrace_global_config, ddtrace_config_google_generativeai, mock_client dict(GOOGLE_GENERATIVEAI_API_KEY=os.getenv("GOOGLE_GENERATIVEAI_API_KEY", "")) ): patch() - from google.generativeai import client as client_lib import google.generativeai as genai + from google.generativeai import client as client_lib client_lib._client_manager.clients["generative"] = mock_client client_lib._client_manager.clients["generative_async"] = mock_client_async diff --git a/tests/contrib/langchain/test_langchain_patch.py b/tests/contrib/langchain/test_langchain_patch.py index 4c41056dd57..51e758918ed 100644 --- a/tests/contrib/langchain/test_langchain_patch.py +++ b/tests/contrib/langchain/test_langchain_patch.py @@ -48,9 +48,9 @@ def assert_module_patched(self, langchain): def assert_not_module_patched(self, langchain): try: + import langchain_community as gated_langchain from langchain_community import embeddings # noqa: F401 from langchain_community import vectorstores # noqa: F401 - import langchain_community as gated_langchain except ImportError: gated_langchain = None import langchain_core From d898872ae948b49c2b4a956f0f09b4302ddc6d1f Mon Sep 17 00:00:00 2001 From: Yun Kim Date: Mon, 4 Aug 2025 14:03:23 -0400 Subject: [PATCH 08/15] Make routers (and AND conditions) work again --- ddtrace/contrib/internal/crewai/patch.py | 13 +++-- ddtrace/llmobs/_integrations/crewai.py | 64 +++++++++++++++++---- tests/contrib/crewai/conftest.py | 65 ++++++++++++++++++++++ tests/contrib/crewai/test_crewai_llmobs.py | 35 ++++++++++-- 4 files changed, 156 insertions(+), 21 deletions(-) diff --git a/ddtrace/contrib/internal/crewai/patch.py b/ddtrace/contrib/internal/crewai/patch.py index a23ea46e57b..f228fc4c07a 100644 --- a/ddtrace/contrib/internal/crewai/patch.py +++ b/ddtrace/contrib/internal/crewai/patch.py @@ -147,7 +147,12 @@ async def traced_flow_method(crewai, pin, func, instance, args, kwargs): integration = crewai._datadog_integration span_name = get_argument_value(args, kwargs, 0, "method_name", optional=True) or "Flow Method" with integration.trace( - pin, "CrewAI Flow Method", span_name=span_name, operation="flow_method", submit_to_llmobs=True + pin, + "CrewAI Flow Method", + span_name=span_name, + operation="flow_method", + submit_to_llmobs=True, + flow_instance=instance, ) as span: initial_flow_state = {**getattr(instance, "state", {})} result = await func(*args, **kwargs) @@ -161,10 +166,8 @@ async def traced_flow_method(crewai, pin, func, instance, args, kwargs): def patched_find_triggered_methods(crewai, pin, func, instance, args, kwargs): integration = crewai._datadog_integration result = func(*args, **kwargs) - if get_argument_value(args, kwargs, 1, "router_only", optional=True) is False: - # TODO: Work for routers too? - current_span = pin.tracer.current_span() - integration._llmobs_set_span_link_on_flow(current_span, args, kwargs, instance) + current_span = pin.tracer.current_span() + integration._llmobs_set_span_link_on_flow(current_span, args, kwargs, instance) return result diff --git a/ddtrace/llmobs/_integrations/crewai.py b/ddtrace/llmobs/_integrations/crewai.py index 08d978b023c..4a8f3d54e94 100644 --- a/ddtrace/llmobs/_integrations/crewai.py +++ b/ddtrace/llmobs/_integrations/crewai.py @@ -76,7 +76,9 @@ def trace(self, pin: Pin, operation_id: str, submit_to_llmobs: bool = False, **k if span._parent is None: return span span_dict = self._flow_span_to_method_to_span_dict.get(span._parent, {}).setdefault(method_name, {}) - span_dict["span_id"] = str(span.span_id) + is_router_method_name = span.name in getattr(kwargs.get("flow_instance"), "_routers", []) + # Need to differentiate between the same routers being stored by both their method name and result + span_dict.update({"span_id": str(span.span_id), "is_router_method_name": is_router_method_name}) return span def _get_current_ctx(self, pin): @@ -296,6 +298,12 @@ def _llmobs_set_tags_flow_method(self, span, args, kwargs, response): "attributes": {"from": "input", "to": "input"}, } ) + + if span.name in getattr(flow_instance, "_routers", []): + # Routers use their result (not method name) to link to the next method, so do the same for spans + span_dict = self._flow_span_to_method_to_span_dict.get(span._parent, {}).setdefault(str(response), {}) + span_dict.update({"span_id": str(span.span_id), "trace_id": format_trace_id(span.trace_id)}) + span._set_ctx_items( { NAME: span.name or "Flow Method", @@ -307,30 +315,62 @@ def _llmobs_set_tags_flow_method(self, span, args, kwargs, response): return def _llmobs_set_span_link_on_flow(self, flow_span, args, kwargs, flow_instance): - """Set span links for the next queued listener method(s) in a CrewAI flow.""" + """ + Set span links for the next queued listener method(s) in a CrewAI flow. + + Notes: + - Router methods passed by name are skipped (span links are based on router results) + - AND conditions: + - temporary output->output span links added by default for all trigger methods + - once all trigger methods have run for the listener, remove temporary output->output links and + add span links from trigger spans to listener span + """ trigger_method = get_argument_value(args, kwargs, 0, "trigger_method", optional=True) if not self.llmobs_enabled or not trigger_method: return - trigger_span_dict = self._flow_span_to_method_to_span_dict.get(flow_span, {}).get(trigger_method) - if not trigger_span_dict: + flow_methods_to_spans = self._flow_span_to_method_to_span_dict.get(flow_span, {}) + trigger_span_dict = flow_methods_to_spans.get(trigger_method) + if not trigger_span_dict or trigger_span_dict.get("is_router_method_name", False): return listeners = getattr(flow_instance, "_listeners", {}) triggered = False - # Check trigger method against each listener methods' triggers - for listener_name, (_, listener_triggers) in listeners.items(): + for listener_name, (condition_type, listener_triggers) in listeners.items(): if trigger_method not in listener_triggers: continue - triggered = True - span_dict = self._flow_span_to_method_to_span_dict.get(flow_span, {}).setdefault(listener_name, {}) + span_dict = flow_methods_to_spans.setdefault(listener_name, {}) span_dict["trace_id"] = format_trace_id(flow_span.trace_id) span_links = span_dict.setdefault("span_links", []) - span_links.append( - { + if condition_type != "AND": + triggered = True + span_links.append({ "span_id": str(trigger_span_dict["span_id"]), "trace_id": format_trace_id(flow_span.trace_id), "attributes": {"from": "output", "to": "input"}, - } - ) + }) + continue + if any( + flow_methods_to_spans.get(_trigger_method, {}).get("span_id") is None + for _trigger_method in listener_triggers + ): # skip if not all trigger methods have run (span ID must exist) for AND listener + continue + triggered = True + for method in listener_triggers: + method_span_dict = flow_methods_to_spans.get(method, {}) + if not method_span_dict: + continue + span_links.append( + { + "span_id": str(method_span_dict["span_id"]), + "trace_id": format_trace_id(flow_span.trace_id), + "attributes": {"from": "output", "to": "input"}, + } + ) + flow_span_span_links = flow_span._get_ctx_item(SPAN_LINKS) or [] + updated_links = [ + link for link in flow_span_span_links if link["span_id"] != str(method_span_dict["span_id"]) + ] + flow_span._set_ctx_item(SPAN_LINKS, updated_links) + if triggered is False: flow_span_span_links = flow_span._get_ctx_item(SPAN_LINKS) or [] flow_span_span_links.append( diff --git a/tests/contrib/crewai/conftest.py b/tests/contrib/crewai/conftest.py index 6e5ce7da14b..47b6d3b6bc6 100644 --- a/tests/contrib/crewai/conftest.py +++ b/tests/contrib/crewai/conftest.py @@ -8,6 +8,7 @@ from crewai import Task from crewai.flow.flow import and_ from crewai.flow.flow import listen +from crewai.flow.flow import router from crewai.flow.flow import start from crewai.tasks.conditional_task import ConditionalTask from crewai.tools import tool @@ -270,6 +271,70 @@ async def generate_itinerary(self): yield ExFlow() +@pytest.fixture +def router_flow(crewai): + class ExFlow(Flow[dict]): + model = "gpt-4o-mini" + + @start() + def generate_city(self): + time.sleep(0.05) + random_city = "New York City" + self.state["city"] = random_city + return random_city + + @router(generate_city) + def discriminate_city(self): + time.sleep(0.05) + if self.state["city"] != "New York City": + return "YIKES" + return "LFG" + + @listen("YIKES") + def say_oop(self): + time.sleep(0.03) + return "Oop, have a fun trip!" + + @listen("LFG") + def generate_fun_fact(self): + time.sleep(0.06) + return fun_fact_text + + yield ExFlow() + + +@pytest.fixture +def router_flow_async(crewai): + class ExFlow(Flow[dict]): + model = "gpt-4o-mini" + + @start() + async def generate_city(self): + time.sleep(0.05) + random_city = "New York City" + self.state["city"] = random_city + return random_city + + @router(generate_city) + async def discriminate_city(self): + time.sleep(0.05) + if self.state["city"] != "New York City": + return "YIKES" + return "LFG" + + @listen("YIKES") + async def say_oop(self): + time.sleep(0.03) + return "Oop, have a fun trip!" + + @listen("LFG") + async def generate_fun_fact(self): + time.sleep(0.06) + return fun_fact_text + + yield ExFlow() + + @pytest.fixture def crewai(monkeypatch): monkeypatch.setenv("OPENAI_API_KEY", "") diff --git a/tests/contrib/crewai/test_crewai_llmobs.py b/tests/contrib/crewai/test_crewai_llmobs.py index 91d54fd4972..8455218698f 100644 --- a/tests/contrib/crewai/test_crewai_llmobs.py +++ b/tests/contrib/crewai/test_crewai_llmobs.py @@ -265,6 +265,23 @@ def _assert_complex_flow_links(llmobs_events): _assert_span_link(llmobs_events[4], llmobs_events[5], "output", "input") +def _assert_router_flow_events(llmobs_events, spans): + llmobs_events.sort(key=lambda span: span["start_ns"]) + assert len(spans) == len(llmobs_events) == 4 + assert llmobs_events[0] == _expected_llmobs_non_llm_span_event(spans[0], span_kind="workflow", **expected_span_args) + assert llmobs_events[1] == _expected_llmobs_non_llm_span_event(spans[1], span_kind="task", **expected_span_args) + assert llmobs_events[2] == _expected_llmobs_non_llm_span_event(spans[2], span_kind="task", **expected_span_args) + assert llmobs_events[3] == _expected_llmobs_non_llm_span_event(spans[3], span_kind="task", **expected_span_args) + + +def _assert_router_flow_links(llmobs_events): + llmobs_events.sort(key=lambda span: span["start_ns"]) + _assert_span_link(llmobs_events[0], llmobs_events[1], "input", "input") + _assert_span_link(llmobs_events[1], llmobs_events[2], "output", "input") + _assert_span_link(llmobs_events[2], llmobs_events[3], "output", "input") + _assert_span_link(llmobs_events[3], llmobs_events[0], "output", "output") + + def test_basic_crew(crewai, basic_crew, request_vcr, mock_tracer, llmobs_events): with request_vcr.use_cassette("test_basic_crew.yaml"): basic_crew.kickoff(inputs={"topic": "AI"}) @@ -412,7 +429,6 @@ async def test_hierarchical_crew_async_for_each(crewai, hierarchical_crew, reque def test_simple_flow(crewai, simple_flow, mock_tracer, llmobs_events): simple_flow.kickoff(inputs={"continent": "North America"}) spans = mock_tracer.pop_traces()[0] - assert len(spans) == 3 _assert_simple_flow_events(llmobs_events, spans) _assert_simple_flow_links(llmobs_events) @@ -420,7 +436,6 @@ def test_simple_flow(crewai, simple_flow, mock_tracer, llmobs_events): async def test_simple_flow_async(crewai, simple_flow_async, mock_tracer, llmobs_events): await simple_flow_async.kickoff_async(inputs={"continent": "North America"}) spans = mock_tracer.pop_traces()[0] - assert len(spans) == 3 _assert_simple_flow_events(llmobs_events, spans) _assert_simple_flow_links(llmobs_events) @@ -428,7 +443,6 @@ async def test_simple_flow_async(crewai, simple_flow_async, mock_tracer, llmobs_ def test_complex_flow(crewai, complex_flow, mock_tracer, llmobs_events): complex_flow.kickoff(inputs={"continent": "North America"}) spans = mock_tracer.pop_traces()[0] - assert len(spans) == 6 _assert_complex_flow_events(llmobs_events, spans) _assert_complex_flow_links(llmobs_events) @@ -436,6 +450,19 @@ def test_complex_flow(crewai, complex_flow, mock_tracer, llmobs_events): async def test_complex_flow_async(crewai, complex_flow_async, mock_tracer, llmobs_events): await complex_flow_async.kickoff_async(inputs={"continent": "North America"}) spans = mock_tracer.pop_traces()[0] - assert len(spans) == 6 _assert_complex_flow_events(llmobs_events, spans) _assert_complex_flow_links(llmobs_events) + + +def test_router_flow(crewai, router_flow, mock_tracer, llmobs_events): + router_flow.kickoff() + spans = mock_tracer.pop_traces()[0] + _assert_router_flow_events(llmobs_events, spans) + _assert_router_flow_links(llmobs_events) + + +async def test_router_flow_async(crewai, router_flow_async, mock_tracer, llmobs_events): + await router_flow_async.kickoff_async() + spans = mock_tracer.pop_traces()[0] + _assert_router_flow_events(llmobs_events, spans) + _assert_router_flow_links(llmobs_events) From c86f22e253343490d24de6d69226f27b682eb703 Mon Sep 17 00:00:00 2001 From: Yun Kim Date: Mon, 4 Aug 2025 16:21:54 -0400 Subject: [PATCH 09/15] fmt --- ddtrace/llmobs/_integrations/crewai.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/ddtrace/llmobs/_integrations/crewai.py b/ddtrace/llmobs/_integrations/crewai.py index 4a8f3d54e94..0326f500cfe 100644 --- a/ddtrace/llmobs/_integrations/crewai.py +++ b/ddtrace/llmobs/_integrations/crewai.py @@ -342,16 +342,18 @@ def _llmobs_set_span_link_on_flow(self, flow_span, args, kwargs, flow_instance): span_links = span_dict.setdefault("span_links", []) if condition_type != "AND": triggered = True - span_links.append({ - "span_id": str(trigger_span_dict["span_id"]), - "trace_id": format_trace_id(flow_span.trace_id), - "attributes": {"from": "output", "to": "input"}, - }) + span_links.append( + { + "span_id": str(trigger_span_dict["span_id"]), + "trace_id": format_trace_id(flow_span.trace_id), + "attributes": {"from": "output", "to": "input"}, + } + ) continue if any( flow_methods_to_spans.get(_trigger_method, {}).get("span_id") is None for _trigger_method in listener_triggers - ): # skip if not all trigger methods have run (span ID must exist) for AND listener + ): # skip if not all trigger methods have run (span ID must exist) for AND listener continue triggered = True for method in listener_triggers: From 0a55e954ddf0e4d6b138924f6de306ae2f33e282 Mon Sep 17 00:00:00 2001 From: Yun Kim Date: Tue, 5 Aug 2025 12:45:35 -0400 Subject: [PATCH 10/15] Remove crewai from integrations all --- ddtrace/contrib/internal/crewai/patch.py | 2 +- ddtrace/llmobs/_integrations/__init__.py | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/ddtrace/contrib/internal/crewai/patch.py b/ddtrace/contrib/internal/crewai/patch.py index f228fc4c07a..40f58ab0fb2 100644 --- a/ddtrace/contrib/internal/crewai/patch.py +++ b/ddtrace/contrib/internal/crewai/patch.py @@ -8,7 +8,7 @@ from ddtrace.contrib.internal.trace_utils import with_traced_module from ddtrace.contrib.internal.trace_utils import wrap from ddtrace.internal.utils import get_argument_value -from ddtrace.llmobs._integrations import CrewAIIntegration +from ddtrace.llmobs._integrations.crewai import CrewAIIntegration from ddtrace.trace import Pin diff --git a/ddtrace/llmobs/_integrations/__init__.py b/ddtrace/llmobs/_integrations/__init__.py index 22b74a824e9..93ad8a9a62d 100644 --- a/ddtrace/llmobs/_integrations/__init__.py +++ b/ddtrace/llmobs/_integrations/__init__.py @@ -1,7 +1,6 @@ from .anthropic import AnthropicIntegration from .base import BaseLLMIntegration from .bedrock import BedrockIntegration -from .crewai import CrewAIIntegration from .gemini import GeminiIntegration from .google_genai import GoogleGenAIIntegration from .langchain import LangChainIntegration @@ -15,7 +14,6 @@ "AnthropicIntegration", "BaseLLMIntegration", "BedrockIntegration", - "CrewAIIntegration", "GeminiIntegration", "GoogleGenAIIntegration", "LangChainIntegration", From 157c9ae699f59dac87366447429f6f2ebbac9875 Mon Sep 17 00:00:00 2001 From: Yun Kim Date: Thu, 7 Aug 2025 18:47:53 -0400 Subject: [PATCH 11/15] Address comments --- ddtrace/contrib/internal/crewai/patch.py | 31 +++++++++++++----------- ddtrace/llmobs/_integrations/crewai.py | 31 +++++++++++++++--------- 2 files changed, 37 insertions(+), 25 deletions(-) diff --git a/ddtrace/contrib/internal/crewai/patch.py b/ddtrace/contrib/internal/crewai/patch.py index 40f58ab0fb2..e08e0159b9c 100644 --- a/ddtrace/contrib/internal/crewai/patch.py +++ b/ddtrace/contrib/internal/crewai/patch.py @@ -25,7 +25,7 @@ def _supported_versions() -> Dict[str, str]: @with_traced_module def traced_kickoff(crewai, pin, func, instance, args, kwargs): - integration = crewai._datadog_integration + integration: CrewAIIntegration = crewai._datadog_integration result = None instance_id = getattr(instance, "id", "") planning_enabled = getattr(instance, "planning", False) @@ -52,7 +52,7 @@ def traced_kickoff(crewai, pin, func, instance, args, kwargs): @with_traced_module def traced_task_execute(crewai, pin, func, instance, args, kwargs): - integration = crewai._datadog_integration + integration: CrewAIIntegration = crewai._datadog_integration result = None span = integration.trace( pin, @@ -79,7 +79,7 @@ def traced_task_execute(crewai, pin, func, instance, args, kwargs): @with_traced_module def traced_task_execute_async(crewai, pin, func, instance, args, kwargs): - integration = crewai._datadog_integration + integration: CrewAIIntegration = crewai._datadog_integration _ddtrace_ctx = integration._get_current_ctx(pin) setattr(instance, "_ddtrace_ctx", _ddtrace_ctx) return func(*args, **kwargs) @@ -87,7 +87,7 @@ def traced_task_execute_async(crewai, pin, func, instance, args, kwargs): @with_traced_module def traced_task_get_context(crewai, pin, func, instance, args, kwargs): - integration = crewai._datadog_integration + integration: CrewAIIntegration = crewai._datadog_integration span = pin.tracer.current_span() result = func(*args, **kwargs) integration._llmobs_set_span_link_on_task(span, args, kwargs) @@ -96,7 +96,7 @@ def traced_task_get_context(crewai, pin, func, instance, args, kwargs): @with_traced_module def traced_agent_execute(crewai, pin, func, instance, args, kwargs): - integration = crewai._datadog_integration + integration: CrewAIIntegration = crewai._datadog_integration result = None span = integration.trace( pin, "CrewAI Agent", span_name=getattr(instance, "role", ""), operation="agent", submit_to_llmobs=True @@ -115,7 +115,7 @@ def traced_agent_execute(crewai, pin, func, instance, args, kwargs): @with_traced_module def traced_tool_run(crewai, pin, func, instance, args, kwargs): - integration = crewai._datadog_integration + integration: CrewAIIntegration = crewai._datadog_integration result = None span = integration.trace( pin, "CrewAI Tool", span_name=getattr(instance, "name", ""), operation="tool", submit_to_llmobs=True @@ -134,7 +134,7 @@ def traced_tool_run(crewai, pin, func, instance, args, kwargs): @with_traced_module async def traced_flow_kickoff(crewai, pin, func, instance, args, kwargs): - integration = crewai._datadog_integration + integration: CrewAIIntegration = crewai._datadog_integration span_name = getattr(type(instance), "__name__", "CrewAI Flow") with integration.trace(pin, "CrewAI Flow", span_name=span_name, operation="flow", submit_to_llmobs=True) as span: result = await func(*args, **kwargs) @@ -144,7 +144,7 @@ async def traced_flow_kickoff(crewai, pin, func, instance, args, kwargs): @with_traced_module async def traced_flow_method(crewai, pin, func, instance, args, kwargs): - integration = crewai._datadog_integration + integration: CrewAIIntegration = crewai._datadog_integration span_name = get_argument_value(args, kwargs, 0, "method_name", optional=True) or "Flow Method" with integration.trace( pin, @@ -164,7 +164,7 @@ async def traced_flow_method(crewai, pin, func, instance, args, kwargs): @with_traced_module def patched_find_triggered_methods(crewai, pin, func, instance, args, kwargs): - integration = crewai._datadog_integration + integration: CrewAIIntegration = crewai._datadog_integration result = func(*args, **kwargs) current_span = pin.tracer.current_span() integration._llmobs_set_span_link_on_flow(current_span, args, kwargs, instance) @@ -178,18 +178,21 @@ def patch(): crewai._datadog_patch = True Pin().onto(crewai) - integration = CrewAIIntegration(integration_config=config.crewai) + integration: CrewAIIntegration = CrewAIIntegration(integration_config=config.crewai) crewai._datadog_integration = integration wrap(crewai, "Crew.kickoff", traced_kickoff(crewai)) - wrap(crewai, "Crew._get_context", traced_task_get_context(crewai)) - wrap(crewai, "Task._execute_core", traced_task_execute(crewai)) wrap(crewai, "Task.execute_async", traced_task_execute_async(crewai)) wrap(crewai, "Agent.execute_task", traced_agent_execute(crewai)) wrap(crewai.tools.structured_tool, "CrewStructuredTool.invoke", traced_tool_run(crewai)) wrap(crewai, "Flow.kickoff_async", traced_flow_kickoff(crewai)) - wrap(crewai, "Flow._execute_method", traced_flow_method(crewai)) - wrap(crewai, "Flow._find_triggered_methods", patched_find_triggered_methods(crewai)) + try: # Safely attempt to patch private methods + wrap(crewai, "Crew._get_context", traced_task_get_context(crewai)) + wrap(crewai, "Task._execute_core", traced_task_execute(crewai)) + wrap(crewai, "Flow._execute_method", traced_flow_method(crewai)) + wrap(crewai, "Flow._find_triggered_methods", patched_find_triggered_methods(crewai)) + except AttributeError: + pass def unpatch(): diff --git a/ddtrace/llmobs/_integrations/crewai.py b/ddtrace/llmobs/_integrations/crewai.py index 0326f500cfe..ba62a6eee37 100644 --- a/ddtrace/llmobs/_integrations/crewai.py +++ b/ddtrace/llmobs/_integrations/crewai.py @@ -20,6 +20,7 @@ from ddtrace.llmobs._constants import SPAN_LINKS from ddtrace.llmobs._integrations.base import BaseLLMIntegration from ddtrace.llmobs._utils import _get_nearest_llmobs_ancestor +from ddtrace.llmobs._utils import safe_json from ddtrace.trace import Pin from ddtrace.trace import Span @@ -76,9 +77,7 @@ def trace(self, pin: Pin, operation_id: str, submit_to_llmobs: bool = False, **k if span._parent is None: return span span_dict = self._flow_span_to_method_to_span_dict.get(span._parent, {}).setdefault(method_name, {}) - is_router_method_name = span.name in getattr(kwargs.get("flow_instance"), "_routers", []) - # Need to differentiate between the same routers being stored by both their method name and result - span_dict.update({"span_id": str(span.span_id), "is_router_method_name": is_router_method_name}) + span_dict.update({"span_id": str(span.span_id)}) return span def _get_current_ctx(self, pin): @@ -284,7 +283,7 @@ def _llmobs_set_tags_flow_method(self, span, args, kwargs, response): initial_flow_state = kwargs.pop("_dd.flow_state", {}) input_dict = { "args": [str(arg) for arg in args[2:]], - "kwargs": {k: str(v) for k, v in kwargs.items()}, + "kwargs": {k: safe_json(v) for k, v in kwargs.items()}, "flow_state": initial_flow_state, } span_links = ( @@ -300,7 +299,8 @@ def _llmobs_set_tags_flow_method(self, span, args, kwargs, response): ) if span.name in getattr(flow_instance, "_routers", []): - # Routers use their result (not method name) to link to the next method, so do the same for spans + # For router methods the downstream trigger is the router's result, not the method name. + # We store the span info keyed by that result so it can be linked to future listener spans. span_dict = self._flow_span_to_method_to_span_dict.get(span._parent, {}).setdefault(str(response), {}) span_dict.update({"span_id": str(span.span_id), "trace_id": format_trace_id(span.trace_id)}) @@ -314,6 +314,14 @@ def _llmobs_set_tags_flow_method(self, span, args, kwargs, response): ) return + def llmobs_set_span_links_on_flow(self, flow_span, args, kwargs, flow_instance): + if not self.llmobs_enabled: + return + try: + self._llmobs_set_span_link_on_flow(flow_span, args, kwargs, flow_instance) + except Exception: + pass + def _llmobs_set_span_link_on_flow(self, flow_span, args, kwargs, flow_instance): """ Set span links for the next queued listener method(s) in a CrewAI flow. @@ -326,11 +334,13 @@ def _llmobs_set_span_link_on_flow(self, flow_span, args, kwargs, flow_instance): add span links from trigger spans to listener span """ trigger_method = get_argument_value(args, kwargs, 0, "trigger_method", optional=True) - if not self.llmobs_enabled or not trigger_method: + if not trigger_method: return flow_methods_to_spans = self._flow_span_to_method_to_span_dict.get(flow_span, {}) trigger_span_dict = flow_methods_to_spans.get(trigger_method) - if not trigger_span_dict or trigger_span_dict.get("is_router_method_name", False): + if not trigger_span_dict or trigger_method in getattr(flow_instance, "_routers", []): + # For router methods the downstream trigger is the router's result, not the method name + # Skip if trigger_method represents a router method name instead of the router's results return listeners = getattr(flow_instance, "_listeners", {}) triggered = False @@ -358,8 +368,6 @@ def _llmobs_set_span_link_on_flow(self, flow_span, args, kwargs, flow_instance): triggered = True for method in listener_triggers: method_span_dict = flow_methods_to_spans.get(method, {}) - if not method_span_dict: - continue span_links.append( { "span_id": str(method_span_dict["span_id"]), @@ -368,10 +376,11 @@ def _llmobs_set_span_link_on_flow(self, flow_span, args, kwargs, flow_instance): } ) flow_span_span_links = flow_span._get_ctx_item(SPAN_LINKS) or [] - updated_links = [ + # Remove temporary output->output link since the AND has been triggered + span_links_minus_tmp_output_links = [ link for link in flow_span_span_links if link["span_id"] != str(method_span_dict["span_id"]) ] - flow_span._set_ctx_item(SPAN_LINKS, updated_links) + flow_span._set_ctx_item(SPAN_LINKS, span_links_minus_tmp_output_links) if triggered is False: flow_span_span_links = flow_span._get_ctx_item(SPAN_LINKS) or [] From c18b8d203748222361c7b1090604ad50e4e27d58 Mon Sep 17 00:00:00 2001 From: Yun Kim Date: Fri, 8 Aug 2025 10:49:33 -0400 Subject: [PATCH 12/15] specify error handling --- ddtrace/llmobs/_integrations/crewai.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ddtrace/llmobs/_integrations/crewai.py b/ddtrace/llmobs/_integrations/crewai.py index ba62a6eee37..1b2f6493bd4 100644 --- a/ddtrace/llmobs/_integrations/crewai.py +++ b/ddtrace/llmobs/_integrations/crewai.py @@ -319,7 +319,7 @@ def llmobs_set_span_links_on_flow(self, flow_span, args, kwargs, flow_instance): return try: self._llmobs_set_span_link_on_flow(flow_span, args, kwargs, flow_instance) - except Exception: + except (KeyError, AttributeError): pass def _llmobs_set_span_link_on_flow(self, flow_span, args, kwargs, flow_instance): From 92dd013cf5badda2da1eae796207bd02580bbf99 Mon Sep 17 00:00:00 2001 From: Yun Kim Date: Mon, 11 Aug 2025 17:38:38 -0400 Subject: [PATCH 13/15] Fix bugs, address comments, update testing versions --- .riot/requirements/181e571.txt | 166 ------------------ .riot/requirements/1ce4995.txt | 160 +++++++++++++++++ .../requirements/{16628a6.txt => 8b7e1b6.txt} | 144 ++++++++------- .../requirements/{158ac30.txt => e7249f1.txt} | 141 +++++++-------- ddtrace/contrib/internal/crewai/patch.py | 23 ++- ddtrace/llmobs/_integrations/crewai.py | 26 +-- riotfile.py | 2 +- tests/contrib/crewai/test_crewai_llmobs.py | 64 ++++--- 8 files changed, 372 insertions(+), 354 deletions(-) delete mode 100644 .riot/requirements/181e571.txt create mode 100644 .riot/requirements/1ce4995.txt rename .riot/requirements/{16628a6.txt => 8b7e1b6.txt} (51%) rename .riot/requirements/{158ac30.txt => e7249f1.txt} (52%) diff --git a/.riot/requirements/181e571.txt b/.riot/requirements/181e571.txt deleted file mode 100644 index ddf6bf380c2..00000000000 --- a/.riot/requirements/181e571.txt +++ /dev/null @@ -1,166 +0,0 @@ -# -# This file is autogenerated by pip-compile with Python 3.11 -# by the following command: -# -# pip-compile --allow-unsafe --no-annotate .riot/requirements/181e571.in -# -aiohappyeyeballs==2.6.1 -aiohttp==3.12.7 -aiosignal==1.3.2 -annotated-types==0.7.0 -anyio==4.9.0 -appdirs==1.4.4 -asgiref==3.8.1 -asttokens==3.0.0 -attrs==25.3.0 -auth0-python==4.9.0 -backoff==2.2.1 -bcrypt==4.3.0 -blinker==1.9.0 -build==1.2.2.post1 -cachetools==5.5.2 -certifi==2025.4.26 -cffi==1.17.1 -charset-normalizer==3.4.2 -chromadb==1.0.12 -click==8.2.1 -coloredlogs==15.0.1 -coverage[toml]==7.8.2 -crewai==0.121.1 -cryptography==45.0.3 -decorator==5.2.1 -deprecated==1.2.18 -distro==1.9.0 -docstring-parser==0.16 -durationpy==0.10 -et-xmlfile==2.0.0 -executing==2.2.0 -fastapi==0.115.9 -filelock==3.18.0 -flatbuffers==25.2.10 -frozenlist==1.6.0 -fsspec==2025.5.1 -google-auth==2.40.2 -googleapis-common-protos==1.70.0 -grpcio==1.72.1 -h11==0.16.0 -hf-xet==1.1.2 -httpcore==1.0.9 -httptools==0.6.4 -httpx==0.28.1 -huggingface-hub==0.32.4 -humanfriendly==10.0 -hypothesis==6.45.0 -idna==3.10 -importlib-metadata==8.6.1 -importlib-resources==6.5.2 -iniconfig==2.1.0 -instructor==1.8.3 -ipython==9.3.0 -ipython-pygments-lexers==1.1.1 -jedi==0.19.2 -jinja2==3.1.6 -jiter==0.8.2 -json-repair==0.46.0 -json5==0.12.0 -jsonpickle==4.1.1 -jsonref==1.1.0 -jsonschema==4.24.0 -jsonschema-specifications==2025.4.1 -kubernetes==32.0.1 -litellm==1.68.0 -markdown-it-py==3.0.0 -markupsafe==3.0.2 -matplotlib-inline==0.1.7 -mdurl==0.1.2 -mmh3==5.1.0 -mock==5.2.0 -mpmath==1.3.0 -multidict==6.4.4 -networkx==3.5 -numpy==2.2.6 -oauthlib==3.2.2 -onnxruntime==1.22.0 -openai==1.75.0 -openpyxl==3.1.5 -opentelemetry-api==1.33.1 -opentelemetry-exporter-otlp-proto-common==1.33.1 -opentelemetry-exporter-otlp-proto-grpc==1.33.1 -opentelemetry-exporter-otlp-proto-http==1.33.1 -opentelemetry-instrumentation==0.54b1 -opentelemetry-instrumentation-asgi==0.54b1 -opentelemetry-instrumentation-fastapi==0.54b1 -opentelemetry-proto==1.33.1 -opentelemetry-sdk==1.33.1 -opentelemetry-semantic-conventions==0.54b1 -opentelemetry-util-http==0.54b1 -opentracing==2.4.0 -orjson==3.10.18 -overrides==7.7.0 -packaging==25.0 -parso==0.8.4 -pdfminer-six==20250327 -pdfplumber==0.11.6 -pexpect==4.9.0 -pillow==11.2.1 -pluggy==1.6.0 -posthog==4.2.0 -prompt-toolkit==3.0.51 -propcache==0.3.1 -protobuf==5.29.5 -ptyprocess==0.7.0 -pure-eval==0.2.3 -pyasn1==0.6.1 -pyasn1-modules==0.4.2 -pycparser==2.22 -pydantic==2.11.5 -pydantic-core==2.33.2 -pygments==2.19.1 -pyjwt==2.10.1 -pypdfium2==4.30.1 -pypika==0.48.9 -pyproject-hooks==1.2.0 -pytest==8.4.0 -pytest-asyncio==1.0.0 -pytest-cov==6.1.1 -pytest-mock==3.14.1 -python-dateutil==2.9.0.post0 -python-dotenv==1.1.0 -pyvis==0.3.2 -pyyaml==6.0.2 -referencing==0.36.2 -regex==2024.11.6 -requests==2.32.3 -requests-oauthlib==2.0.0 -rich==13.9.4 -rpds-py==0.25.1 -rsa==4.9.1 -shellingham==1.5.4 -six==1.17.0 -sniffio==1.3.1 -sortedcontainers==2.4.0 -stack-data==0.6.3 -starlette==0.45.3 -sympy==1.14.0 -tenacity==9.1.2 -tiktoken==0.9.0 -tokenizers==0.21.1 -tomli==2.2.1 -tomli-w==1.2.0 -tqdm==4.67.1 -traitlets==5.14.3 -typer==0.16.0 -typing-extensions==4.14.0 -typing-inspection==0.4.1 -urllib3==2.4.0 -uv==0.7.9 -uvicorn[standard]==0.34.3 -uvloop==0.21.0 -vcrpy==7.0.0 -watchfiles==1.0.5 -wcwidth==0.2.13 -websocket-client==1.8.0 -websockets==15.0.1 -wrapt==1.17.2 -yarl==1.20.0 -zipp==3.22.0 diff --git a/.riot/requirements/1ce4995.txt b/.riot/requirements/1ce4995.txt new file mode 100644 index 00000000000..19a08990571 --- /dev/null +++ b/.riot/requirements/1ce4995.txt @@ -0,0 +1,160 @@ +# +# This file is autogenerated by pip-compile with Python 3.11 +# by the following command: +# +# pip-compile --allow-unsafe --no-annotate --resolver=backtracking .riot/requirements/1ce4995.in +# +aiohappyeyeballs==2.6.1 +aiohttp==3.12.15 +aiosignal==1.4.0 +annotated-types==0.7.0 +anyio==4.10.0 +appdirs==1.4.4 +asttokens==3.0.0 +attrs==25.3.0 +backoff==2.2.1 +bcrypt==4.3.0 +blinker==1.9.0 +build==1.3.0 +cachetools==5.5.2 +certifi==2025.8.3 +cffi==1.17.1 +charset-normalizer==3.4.3 +chromadb==1.0.16 +click==8.2.1 +coloredlogs==15.0.1 +coverage[toml]==7.10.3 +crewai==0.157.0 +cryptography==45.0.6 +decorator==5.2.1 +diskcache==5.6.3 +distro==1.9.0 +docstring-parser==0.17.0 +durationpy==0.10 +et-xmlfile==2.0.0 +executing==2.2.0 +filelock==3.18.0 +flatbuffers==25.2.10 +frozenlist==1.7.0 +fsspec==2025.7.0 +google-auth==2.40.3 +googleapis-common-protos==1.70.0 +grpcio==1.74.0 +h11==0.16.0 +hf-xet==1.1.7 +httpcore==1.0.9 +httptools==0.6.4 +httpx==0.28.1 +huggingface-hub==0.34.4 +humanfriendly==10.0 +hypothesis==6.45.0 +idna==3.10 +importlib-metadata==8.7.0 +importlib-resources==6.5.2 +iniconfig==2.1.0 +instructor==1.10.0 +ipython==9.4.0 +ipython-pygments-lexers==1.1.1 +jedi==0.19.2 +jinja2==3.1.6 +jiter==0.10.0 +json-repair==0.25.2 +json5==0.12.0 +jsonpickle==4.1.1 +jsonref==1.1.0 +jsonschema==4.25.0 +jsonschema-specifications==2025.4.1 +kubernetes==33.1.0 +litellm==1.74.9 +markdown-it-py==4.0.0 +markupsafe==3.0.2 +matplotlib-inline==0.1.7 +mdurl==0.1.2 +mmh3==5.2.0 +mock==5.2.0 +mpmath==1.3.0 +multidict==6.6.4 +networkx==3.5 +numpy==2.3.2 +oauthlib==3.3.1 +onnxruntime==1.22.0 +openai==1.99.8 +openpyxl==3.1.5 +opentelemetry-api==1.36.0 +opentelemetry-exporter-otlp-proto-common==1.36.0 +opentelemetry-exporter-otlp-proto-grpc==1.36.0 +opentelemetry-exporter-otlp-proto-http==1.36.0 +opentelemetry-proto==1.36.0 +opentelemetry-sdk==1.36.0 +opentelemetry-semantic-conventions==0.57b0 +opentracing==2.4.0 +orjson==3.11.1 +overrides==7.7.0 +packaging==25.0 +parso==0.8.4 +pdfminer-six==20250506 +pdfplumber==0.11.7 +pexpect==4.9.0 +pillow==11.3.0 +pluggy==1.6.0 +portalocker==2.7.0 +posthog==5.4.0 +prompt-toolkit==3.0.51 +propcache==0.3.2 +protobuf==6.31.1 +ptyprocess==0.7.0 +pure-eval==0.2.3 +pyasn1==0.6.1 +pyasn1-modules==0.4.2 +pybase64==1.4.2 +pycparser==2.22 +pydantic==2.11.7 +pydantic-core==2.33.2 +pygments==2.19.2 +pyjwt==2.10.1 +pypdfium2==4.30.0 +pypika==0.48.9 +pyproject-hooks==1.2.0 +pytest==8.4.1 +pytest-asyncio==1.1.0 +pytest-cov==6.2.1 +pytest-mock==3.14.1 +python-dateutil==2.9.0.post0 +python-dotenv==1.1.1 +pyvis==0.3.2 +pyyaml==6.0.2 +referencing==0.36.2 +regex==2025.7.34 +requests==2.32.4 +requests-oauthlib==2.0.0 +rich==14.1.0 +rpds-py==0.27.0 +rsa==4.9.1 +shellingham==1.5.4 +six==1.17.0 +sniffio==1.3.1 +sortedcontainers==2.4.0 +stack-data==0.6.3 +sympy==1.14.0 +tenacity==9.1.2 +tiktoken==0.11.0 +tokenizers==0.21.4 +tomli==2.2.1 +tomli-w==1.2.0 +tqdm==4.67.1 +traitlets==5.14.3 +typer==0.16.0 +typing-extensions==4.14.1 +typing-inspection==0.4.1 +urllib3==2.5.0 +uv==0.8.8 +uvicorn[standard]==0.35.0 +uvloop==0.21.0 +vcrpy==7.0.0 +watchfiles==1.1.0 +wcwidth==0.2.13 +websocket-client==1.8.0 +websockets==15.0.1 +wrapt==1.17.2 +yarl==1.20.1 +zipp==3.23.0 diff --git a/.riot/requirements/16628a6.txt b/.riot/requirements/8b7e1b6.txt similarity index 51% rename from .riot/requirements/16628a6.txt rename to .riot/requirements/8b7e1b6.txt index a570b0e1edf..58fb9cecab2 100644 --- a/.riot/requirements/16628a6.txt +++ b/.riot/requirements/8b7e1b6.txt @@ -2,165 +2,159 @@ # This file is autogenerated by pip-compile with Python 3.12 # by the following command: # -# pip-compile --allow-unsafe --no-annotate .riot/requirements/16628a6.in +# pip-compile --allow-unsafe --no-annotate .riot/requirements/8b7e1b6.in # aiohappyeyeballs==2.6.1 -aiohttp==3.12.7 -aiosignal==1.3.2 +aiohttp==3.12.15 +aiosignal==1.4.0 annotated-types==0.7.0 -anyio==4.9.0 +anyio==4.10.0 appdirs==1.4.4 -asgiref==3.8.1 asttokens==3.0.0 attrs==25.3.0 -auth0-python==4.9.0 backoff==2.2.1 bcrypt==4.3.0 blinker==1.9.0 -build==1.2.2.post1 +build==1.3.0 cachetools==5.5.2 -certifi==2025.4.26 +certifi==2025.8.3 cffi==1.17.1 -charset-normalizer==3.4.2 -chromadb==1.0.12 +charset-normalizer==3.4.3 +chromadb==1.0.16 click==8.2.1 coloredlogs==15.0.1 -coverage[toml]==7.8.2 -crewai==0.121.1 -cryptography==45.0.3 +coverage[toml]==7.10.3 +crewai==0.157.0 +cryptography==45.0.6 decorator==5.2.1 -deprecated==1.2.18 +diskcache==5.6.3 distro==1.9.0 -docstring-parser==0.16 +docstring-parser==0.17.0 durationpy==0.10 et-xmlfile==2.0.0 executing==2.2.0 -fastapi==0.115.9 filelock==3.18.0 flatbuffers==25.2.10 -frozenlist==1.6.0 -fsspec==2025.5.1 -google-auth==2.40.2 +frozenlist==1.7.0 +fsspec==2025.7.0 +google-auth==2.40.3 googleapis-common-protos==1.70.0 -grpcio==1.72.1 +grpcio==1.74.0 h11==0.16.0 -hf-xet==1.1.2 +hf-xet==1.1.7 httpcore==1.0.9 httptools==0.6.4 httpx==0.28.1 -huggingface-hub==0.32.4 +huggingface-hub==0.34.4 humanfriendly==10.0 hypothesis==6.45.0 idna==3.10 -importlib-metadata==8.6.1 +importlib-metadata==8.7.0 importlib-resources==6.5.2 iniconfig==2.1.0 -instructor==1.8.3 -ipython==9.3.0 +instructor==1.10.0 +ipython==9.4.0 ipython-pygments-lexers==1.1.1 jedi==0.19.2 jinja2==3.1.6 -jiter==0.8.2 -json-repair==0.46.0 +jiter==0.10.0 +json-repair==0.25.2 json5==0.12.0 jsonpickle==4.1.1 jsonref==1.1.0 -jsonschema==4.24.0 +jsonschema==4.25.0 jsonschema-specifications==2025.4.1 -kubernetes==32.0.1 -litellm==1.68.0 -markdown-it-py==3.0.0 +kubernetes==33.1.0 +litellm==1.74.9 +markdown-it-py==4.0.0 markupsafe==3.0.2 matplotlib-inline==0.1.7 mdurl==0.1.2 -mmh3==5.1.0 +mmh3==5.2.0 mock==5.2.0 mpmath==1.3.0 -multidict==6.4.4 +multidict==6.6.4 networkx==3.5 -numpy==2.2.6 -oauthlib==3.2.2 +numpy==2.3.2 +oauthlib==3.3.1 onnxruntime==1.22.0 -openai==1.75.0 +openai==1.99.8 openpyxl==3.1.5 -opentelemetry-api==1.33.1 -opentelemetry-exporter-otlp-proto-common==1.33.1 -opentelemetry-exporter-otlp-proto-grpc==1.33.1 -opentelemetry-exporter-otlp-proto-http==1.33.1 -opentelemetry-instrumentation==0.54b1 -opentelemetry-instrumentation-asgi==0.54b1 -opentelemetry-instrumentation-fastapi==0.54b1 -opentelemetry-proto==1.33.1 -opentelemetry-sdk==1.33.1 -opentelemetry-semantic-conventions==0.54b1 -opentelemetry-util-http==0.54b1 +opentelemetry-api==1.36.0 +opentelemetry-exporter-otlp-proto-common==1.36.0 +opentelemetry-exporter-otlp-proto-grpc==1.36.0 +opentelemetry-exporter-otlp-proto-http==1.36.0 +opentelemetry-proto==1.36.0 +opentelemetry-sdk==1.36.0 +opentelemetry-semantic-conventions==0.57b0 opentracing==2.4.0 -orjson==3.10.18 +orjson==3.11.1 overrides==7.7.0 packaging==25.0 parso==0.8.4 -pdfminer-six==20250327 -pdfplumber==0.11.6 +pdfminer-six==20250506 +pdfplumber==0.11.7 pexpect==4.9.0 -pillow==11.2.1 +pillow==11.3.0 pluggy==1.6.0 -posthog==4.2.0 +portalocker==2.7.0 +posthog==5.4.0 prompt-toolkit==3.0.51 -propcache==0.3.1 -protobuf==5.29.5 +propcache==0.3.2 +protobuf==6.31.1 ptyprocess==0.7.0 pure-eval==0.2.3 pyasn1==0.6.1 pyasn1-modules==0.4.2 +pybase64==1.4.2 pycparser==2.22 -pydantic==2.11.5 +pydantic==2.11.7 pydantic-core==2.33.2 -pygments==2.19.1 +pygments==2.19.2 pyjwt==2.10.1 -pypdfium2==4.30.1 +pypdfium2==4.30.0 pypika==0.48.9 pyproject-hooks==1.2.0 -pytest==8.4.0 -pytest-asyncio==1.0.0 -pytest-cov==6.1.1 +pytest==8.4.1 +pytest-asyncio==1.1.0 +pytest-cov==6.2.1 pytest-mock==3.14.1 python-dateutil==2.9.0.post0 -python-dotenv==1.1.0 +python-dotenv==1.1.1 pyvis==0.3.2 pyyaml==6.0.2 referencing==0.36.2 -regex==2024.11.6 -requests==2.32.3 +regex==2025.7.34 +requests==2.32.4 requests-oauthlib==2.0.0 -rich==13.9.4 -rpds-py==0.25.1 +rich==14.1.0 +rpds-py==0.27.0 rsa==4.9.1 shellingham==1.5.4 six==1.17.0 sniffio==1.3.1 sortedcontainers==2.4.0 stack-data==0.6.3 -starlette==0.45.3 sympy==1.14.0 tenacity==9.1.2 -tiktoken==0.9.0 -tokenizers==0.21.1 +tiktoken==0.11.0 +tokenizers==0.21.4 tomli==2.2.1 tomli-w==1.2.0 tqdm==4.67.1 traitlets==5.14.3 typer==0.16.0 -typing-extensions==4.14.0 +typing-extensions==4.14.1 typing-inspection==0.4.1 -urllib3==2.4.0 -uv==0.7.9 -uvicorn[standard]==0.34.3 +urllib3==2.5.0 +uv==0.8.8 +uvicorn[standard]==0.35.0 uvloop==0.21.0 vcrpy==7.0.0 -watchfiles==1.0.5 +watchfiles==1.1.0 wcwidth==0.2.13 websocket-client==1.8.0 websockets==15.0.1 wrapt==1.17.2 -yarl==1.20.0 -zipp==3.22.0 +yarl==1.20.1 +zipp==3.23.0 diff --git a/.riot/requirements/158ac30.txt b/.riot/requirements/e7249f1.txt similarity index 52% rename from .riot/requirements/158ac30.txt rename to .riot/requirements/e7249f1.txt index 39b978ecf98..88fc6f9b55e 100644 --- a/.riot/requirements/158ac30.txt +++ b/.riot/requirements/e7249f1.txt @@ -2,166 +2,161 @@ # This file is autogenerated by pip-compile with Python 3.10 # by the following command: # -# pip-compile --allow-unsafe --no-annotate .riot/requirements/158ac30.in +# pip-compile --allow-unsafe --no-annotate .riot/requirements/e7249f1.in # aiohappyeyeballs==2.6.1 -aiohttp==3.12.7 -aiosignal==1.3.2 +aiohttp==3.12.15 +aiosignal==1.4.0 annotated-types==0.7.0 -anyio==4.9.0 +anyio==4.10.0 appdirs==1.4.4 -asgiref==3.8.1 asttokens==3.0.0 async-timeout==5.0.1 attrs==25.3.0 -auth0-python==4.9.0 backoff==2.2.1 +backports-asyncio-runner==1.2.0 bcrypt==4.3.0 blinker==1.9.0 -build==1.2.2.post1 +build==1.3.0 cachetools==5.5.2 -certifi==2025.4.26 +certifi==2025.8.3 cffi==1.17.1 -charset-normalizer==3.4.2 -chromadb==1.0.12 +charset-normalizer==3.4.3 +chromadb==1.0.16 click==8.2.1 coloredlogs==15.0.1 -coverage[toml]==7.8.2 -crewai==0.121.1 -cryptography==45.0.3 +coverage[toml]==7.10.3 +crewai==0.157.0 +cryptography==45.0.6 decorator==5.2.1 -deprecated==1.2.18 +diskcache==5.6.3 distro==1.9.0 -docstring-parser==0.16 +docstring-parser==0.17.0 durationpy==0.10 et-xmlfile==2.0.0 exceptiongroup==1.3.0 executing==2.2.0 -fastapi==0.115.9 filelock==3.18.0 flatbuffers==25.2.10 -frozenlist==1.6.0 -fsspec==2025.5.1 -google-auth==2.40.2 +frozenlist==1.7.0 +fsspec==2025.7.0 +google-auth==2.40.3 googleapis-common-protos==1.70.0 -grpcio==1.72.1 +grpcio==1.74.0 h11==0.16.0 -hf-xet==1.1.2 +hf-xet==1.1.7 httpcore==1.0.9 httptools==0.6.4 httpx==0.28.1 -huggingface-hub==0.32.4 +huggingface-hub==0.34.4 humanfriendly==10.0 hypothesis==6.45.0 idna==3.10 -importlib-metadata==8.6.1 +importlib-metadata==8.7.0 importlib-resources==6.5.2 iniconfig==2.1.0 -instructor==1.8.3 +instructor==1.10.0 ipython==8.37.0 jedi==0.19.2 jinja2==3.1.6 -jiter==0.8.2 -json-repair==0.46.0 +jiter==0.10.0 +json-repair==0.25.2 json5==0.12.0 jsonpickle==4.1.1 jsonref==1.1.0 -jsonschema==4.24.0 +jsonschema==4.25.0 jsonschema-specifications==2025.4.1 -kubernetes==32.0.1 -litellm==1.68.0 -markdown-it-py==3.0.0 +kubernetes==33.1.0 +litellm==1.74.9 +markdown-it-py==4.0.0 markupsafe==3.0.2 matplotlib-inline==0.1.7 mdurl==0.1.2 -mmh3==5.1.0 +mmh3==5.2.0 mock==5.2.0 mpmath==1.3.0 -multidict==6.4.4 +multidict==6.6.4 networkx==3.4.2 numpy==2.2.6 -oauthlib==3.2.2 +oauthlib==3.3.1 onnxruntime==1.22.0 -openai==1.75.0 +openai==1.99.8 openpyxl==3.1.5 -opentelemetry-api==1.33.1 -opentelemetry-exporter-otlp-proto-common==1.33.1 -opentelemetry-exporter-otlp-proto-grpc==1.33.1 -opentelemetry-exporter-otlp-proto-http==1.33.1 -opentelemetry-instrumentation==0.54b1 -opentelemetry-instrumentation-asgi==0.54b1 -opentelemetry-instrumentation-fastapi==0.54b1 -opentelemetry-proto==1.33.1 -opentelemetry-sdk==1.33.1 -opentelemetry-semantic-conventions==0.54b1 -opentelemetry-util-http==0.54b1 +opentelemetry-api==1.36.0 +opentelemetry-exporter-otlp-proto-common==1.36.0 +opentelemetry-exporter-otlp-proto-grpc==1.36.0 +opentelemetry-exporter-otlp-proto-http==1.36.0 +opentelemetry-proto==1.36.0 +opentelemetry-sdk==1.36.0 +opentelemetry-semantic-conventions==0.57b0 opentracing==2.4.0 -orjson==3.10.18 +orjson==3.11.1 overrides==7.7.0 packaging==25.0 parso==0.8.4 -pdfminer-six==20250327 -pdfplumber==0.11.6 +pdfminer-six==20250506 +pdfplumber==0.11.7 pexpect==4.9.0 -pillow==11.2.1 +pillow==11.3.0 pluggy==1.6.0 -posthog==4.2.0 +portalocker==2.7.0 +posthog==5.4.0 prompt-toolkit==3.0.51 -propcache==0.3.1 -protobuf==5.29.5 +propcache==0.3.2 +protobuf==6.31.1 ptyprocess==0.7.0 pure-eval==0.2.3 pyasn1==0.6.1 pyasn1-modules==0.4.2 +pybase64==1.4.2 pycparser==2.22 -pydantic==2.11.5 +pydantic==2.11.7 pydantic-core==2.33.2 -pygments==2.19.1 +pygments==2.19.2 pyjwt==2.10.1 -pypdfium2==4.30.1 +pypdfium2==4.30.0 pypika==0.48.9 pyproject-hooks==1.2.0 -pytest==8.4.0 -pytest-asyncio==1.0.0 -pytest-cov==6.1.1 +pytest==8.4.1 +pytest-asyncio==1.1.0 +pytest-cov==6.2.1 pytest-mock==3.14.1 python-dateutil==2.9.0.post0 -python-dotenv==1.1.0 +python-dotenv==1.1.1 pyvis==0.3.2 pyyaml==6.0.2 referencing==0.36.2 -regex==2024.11.6 -requests==2.32.3 +regex==2025.7.34 +requests==2.32.4 requests-oauthlib==2.0.0 -rich==13.9.4 -rpds-py==0.25.1 +rich==14.1.0 +rpds-py==0.27.0 rsa==4.9.1 shellingham==1.5.4 six==1.17.0 sniffio==1.3.1 sortedcontainers==2.4.0 stack-data==0.6.3 -starlette==0.45.3 sympy==1.14.0 tenacity==9.1.2 -tiktoken==0.9.0 -tokenizers==0.21.1 +tiktoken==0.11.0 +tokenizers==0.21.4 tomli==2.2.1 tomli-w==1.2.0 tqdm==4.67.1 traitlets==5.14.3 typer==0.16.0 -typing-extensions==4.14.0 +typing-extensions==4.14.1 typing-inspection==0.4.1 -urllib3==2.4.0 -uv==0.7.9 -uvicorn[standard]==0.34.3 +urllib3==2.5.0 +uv==0.8.8 +uvicorn[standard]==0.35.0 uvloop==0.21.0 vcrpy==7.0.0 -watchfiles==1.0.5 +watchfiles==1.1.0 wcwidth==0.2.13 websocket-client==1.8.0 websockets==15.0.1 wrapt==1.17.2 -yarl==1.20.0 -zipp==3.22.0 +yarl==1.20.1 +zipp==3.23.0 diff --git a/ddtrace/contrib/internal/crewai/patch.py b/ddtrace/contrib/internal/crewai/patch.py index e08e0159b9c..dff3535ad57 100644 --- a/ddtrace/contrib/internal/crewai/patch.py +++ b/ddtrace/contrib/internal/crewai/patch.py @@ -7,6 +7,7 @@ from ddtrace.contrib.internal.trace_utils import unwrap from ddtrace.contrib.internal.trace_utils import with_traced_module from ddtrace.contrib.internal.trace_utils import wrap +from ddtrace.internal.logger import get_logger from ddtrace.internal.utils import get_argument_value from ddtrace.llmobs._integrations.crewai import CrewAIIntegration from ddtrace.trace import Pin @@ -16,6 +17,9 @@ def get_version() -> str: return getattr(crewai, "__version__", "") +logger = get_logger(__name__) + + config._add("crewai", {}) @@ -154,7 +158,7 @@ async def traced_flow_method(crewai, pin, func, instance, args, kwargs): submit_to_llmobs=True, flow_instance=instance, ) as span: - initial_flow_state = {**getattr(instance, "state", {})} + initial_flow_state = getattr(instance, "state", {}) result = await func(*args, **kwargs) kwargs["_dd.instance"] = instance kwargs["_dd.flow_state"] = initial_flow_state @@ -167,7 +171,7 @@ def patched_find_triggered_methods(crewai, pin, func, instance, args, kwargs): integration: CrewAIIntegration = crewai._datadog_integration result = func(*args, **kwargs) current_span = pin.tracer.current_span() - integration._llmobs_set_span_link_on_flow(current_span, args, kwargs, instance) + integration.llmobs_set_span_links_on_flow(current_span, args, kwargs, instance) return result @@ -186,13 +190,13 @@ def patch(): wrap(crewai, "Agent.execute_task", traced_agent_execute(crewai)) wrap(crewai.tools.structured_tool, "CrewStructuredTool.invoke", traced_tool_run(crewai)) wrap(crewai, "Flow.kickoff_async", traced_flow_kickoff(crewai)) - try: # Safely attempt to patch private methods + try: wrap(crewai, "Crew._get_context", traced_task_get_context(crewai)) wrap(crewai, "Task._execute_core", traced_task_execute(crewai)) wrap(crewai, "Flow._execute_method", traced_flow_method(crewai)) wrap(crewai, "Flow._find_triggered_methods", patched_find_triggered_methods(crewai)) except AttributeError: - pass + logger.warning("Failed to patch internal CrewAI methods.") def unpatch(): @@ -202,13 +206,16 @@ def unpatch(): crewai._datadog_patch = False unwrap(crewai.Crew, "kickoff") - unwrap(crewai.Crew, "_get_context") - unwrap(crewai.Task, "_execute_core") unwrap(crewai.Task, "execute_async") unwrap(crewai.Agent, "execute_task") unwrap(crewai.tools.structured_tool.CrewStructuredTool, "invoke") unwrap(crewai.Flow, "kickoff_async") - unwrap(crewai.Flow, "_execute_method") - unwrap(crewai.Flow, "_find_triggered_methods") + try: + unwrap(crewai.Crew, "_get_context") + unwrap(crewai.Task, "_execute_core") + unwrap(crewai.Flow, "_execute_method") + unwrap(crewai.Flow, "_find_triggered_methods") + except AttributeError: + pass delattr(crewai, "_datadog_integration") diff --git a/ddtrace/llmobs/_integrations/crewai.py b/ddtrace/llmobs/_integrations/crewai.py index 1b2f6493bd4..c36683dd476 100644 --- a/ddtrace/llmobs/_integrations/crewai.py +++ b/ddtrace/llmobs/_integrations/crewai.py @@ -280,16 +280,22 @@ def _llmobs_set_tags_flow(self, span, args, kwargs, response): def _llmobs_set_tags_flow_method(self, span, args, kwargs, response): flow_instance = kwargs.pop("_dd.instance", None) - initial_flow_state = kwargs.pop("_dd.flow_state", {}) + state_value = kwargs.pop("_dd.flow_state", {}) + initial_flow_state = {} + if isinstance(state_value, dict): + initial_flow_state = state_value + elif hasattr(state_value, "model_dump"): + initial_flow_state = state_value.model_dump() + input_dict = { - "args": [str(arg) for arg in args[2:]], + "args": [safe_json(arg) for arg in args[2:]], "kwargs": {k: safe_json(v) for k, v in kwargs.items()}, "flow_state": initial_flow_state, } span_links = ( self._flow_span_to_method_to_span_dict.get(span._parent, {}).get(span.name, {}).get("span_links", []) ) - if span.name in getattr(flow_instance, "_start_methods", []): + if span.name in getattr(flow_instance, "_start_methods", []) and span._parent is not None: span_links.append( { "span_id": str(span._parent.span_id), @@ -327,11 +333,12 @@ def _llmobs_set_span_link_on_flow(self, flow_span, args, kwargs, flow_instance): Set span links for the next queued listener method(s) in a CrewAI flow. Notes: - - Router methods passed by name are skipped (span links are based on router results) - - AND conditions: - - temporary output->output span links added by default for all trigger methods - - once all trigger methods have run for the listener, remove temporary output->output links and - add span links from trigger spans to listener span + - trigger_method is either a method name or router result, which trigger normal/router listeners respectively. + We skip if trigger_method is a router method name because we use the router result to link triggered listeners + - AND conditions: + - temporary output->output span links added by default for all trigger methods + - once all trigger methods have run for the listener, remove temporary output->output links and + add span links from trigger spans to listener span """ trigger_method = get_argument_value(args, kwargs, 0, "trigger_method", optional=True) if not trigger_method: @@ -339,7 +346,7 @@ def _llmobs_set_span_link_on_flow(self, flow_span, args, kwargs, flow_instance): flow_methods_to_spans = self._flow_span_to_method_to_span_dict.get(flow_span, {}) trigger_span_dict = flow_methods_to_spans.get(trigger_method) if not trigger_span_dict or trigger_method in getattr(flow_instance, "_routers", []): - # For router methods the downstream trigger is the router's result, not the method name + # For router methods the downstream trigger listens for the router's result, not the router method name # Skip if trigger_method represents a router method name instead of the router's results return listeners = getattr(flow_instance, "_listeners", {}) @@ -348,7 +355,6 @@ def _llmobs_set_span_link_on_flow(self, flow_span, args, kwargs, flow_instance): if trigger_method not in listener_triggers: continue span_dict = flow_methods_to_spans.setdefault(listener_name, {}) - span_dict["trace_id"] = format_trace_id(flow_span.trace_id) span_links = span_dict.setdefault("span_links", []) if condition_type != "AND": triggered = True diff --git a/riotfile.py b/riotfile.py index 9e966ad4d73..4ec3dcdf6b2 100644 --- a/riotfile.py +++ b/riotfile.py @@ -2967,7 +2967,7 @@ def select_pys(min_version: str = MIN_PYTHON_VERSION, max_version: str = MAX_PYT pkgs={ "pytest-asyncio": latest, "openai": latest, - "crewai": ["~=0.102.0", "~=0.121.0"], + "crewai": ["~=0.102.0", latest], "vcrpy": "==7.0.0", }, ), diff --git a/tests/contrib/crewai/test_crewai_llmobs.py b/tests/contrib/crewai/test_crewai_llmobs.py index 8455218698f..40adc0e9474 100644 --- a/tests/contrib/crewai/test_crewai_llmobs.py +++ b/tests/contrib/crewai/test_crewai_llmobs.py @@ -1,9 +1,17 @@ +import json + +import crewai import mock +from ddtrace.internal.utils.version import parse_version +from tests.contrib.crewai.utils import fun_fact_text from tests.llmobs._utils import _assert_span_link from tests.llmobs._utils import _expected_llmobs_non_llm_span_event +CREWAI_VERSION = parse_version(getattr(crewai, "__version__", "0.0.0")) + + AGENT_TO_EXPECTED_AGENT_MANIFEST = { "Senior Research Scientist": { "framework": "CrewAI", @@ -65,7 +73,7 @@ }, } -expected_span_args = { +EXPECTED_SPAN_ARGS = { "input_value": mock.ANY, "output_value": mock.ANY, "metadata": mock.ANY, @@ -88,7 +96,7 @@ def _assert_basic_crew_events(llmobs_events, spans): llmobs_events.sort(key=lambda span: span["start_ns"]) assert len(spans) == len(llmobs_events) == 5 for llmobs_span, span, kind in zip(llmobs_events, spans, ("workflow", "task", "agent", "task", "agent")): - extra_args = expected_agent_span_args(llmobs_span["name"]) if kind == "agent" else expected_span_args + extra_args = expected_agent_span_args(llmobs_span["name"]) if kind == "agent" else EXPECTED_SPAN_ARGS assert llmobs_span == _expected_llmobs_non_llm_span_event(span, span_kind=kind, **extra_args) @@ -109,8 +117,8 @@ def _assert_basic_crew_links(llmobs_events): def _assert_tool_crew_events(llmobs_events, spans): llmobs_events.sort(key=lambda span: span["start_ns"]) assert len(spans) == len(llmobs_events) == 4 - assert llmobs_events[0] == _expected_llmobs_non_llm_span_event(spans[0], span_kind="workflow", **expected_span_args) - assert llmobs_events[1] == _expected_llmobs_non_llm_span_event(spans[1], span_kind="task", **expected_span_args) + assert llmobs_events[0] == _expected_llmobs_non_llm_span_event(spans[0], span_kind="workflow", **EXPECTED_SPAN_ARGS) + assert llmobs_events[1] == _expected_llmobs_non_llm_span_event(spans[1], span_kind="task", **EXPECTED_SPAN_ARGS) assert llmobs_events[2] == _expected_llmobs_non_llm_span_event( spans[2], span_kind="agent", **expected_agent_span_args(llmobs_events[2]["name"]) ) @@ -138,8 +146,8 @@ def _assert_tool_crew_links(llmobs_events): def _assert_async_crew_events(llmobs_events, spans): llmobs_events.sort(key=lambda span: span["start_ns"]) assert len(spans) == len(llmobs_events) == 6 - assert llmobs_events[0] == _expected_llmobs_non_llm_span_event(spans[0], span_kind="workflow", **expected_span_args) - assert llmobs_events[1] == _expected_llmobs_non_llm_span_event(spans[1], span_kind="task", **expected_span_args) + assert llmobs_events[0] == _expected_llmobs_non_llm_span_event(spans[0], span_kind="workflow", **EXPECTED_SPAN_ARGS) + assert llmobs_events[1] == _expected_llmobs_non_llm_span_event(spans[1], span_kind="task", **EXPECTED_SPAN_ARGS) assert llmobs_events[2] == _expected_llmobs_non_llm_span_event( spans[2], span_kind="agent", **expected_agent_span_args(llmobs_events[2]["name"]) ) @@ -151,7 +159,7 @@ def _assert_async_crew_events(llmobs_events, spans): metadata=mock.ANY, tags={"service": "tests.contrib.crewai", "ml_app": ""}, ) - assert llmobs_events[4] == _expected_llmobs_non_llm_span_event(spans[4], span_kind="task", **expected_span_args) + assert llmobs_events[4] == _expected_llmobs_non_llm_span_event(spans[4], span_kind="task", **EXPECTED_SPAN_ARGS) assert llmobs_events[5] == _expected_llmobs_non_llm_span_event( spans[5], span_kind="agent", **expected_agent_span_args(llmobs_events[5]["name"]) ) @@ -200,7 +208,7 @@ def _assert_hierarchical_crew_events(llmobs_events, spans): tags={"service": "tests.contrib.crewai", "ml_app": ""}, ) continue - assert llmobs_span == _expected_llmobs_non_llm_span_event(span, span_kind=kind, **expected_span_args) + assert llmobs_span == _expected_llmobs_non_llm_span_event(span, span_kind=kind, **EXPECTED_SPAN_ARGS) def _assert_hierarchical_crew_links(llmobs_events): @@ -227,9 +235,23 @@ def _assert_hierarchical_crew_links(llmobs_events): def _assert_simple_flow_events(llmobs_events, spans): llmobs_events.sort(key=lambda span: span["start_ns"]) assert len(spans) == len(llmobs_events) == 3 - assert llmobs_events[0] == _expected_llmobs_non_llm_span_event(spans[0], span_kind="workflow", **expected_span_args) - assert llmobs_events[1] == _expected_llmobs_non_llm_span_event(spans[1], span_kind="task", **expected_span_args) - assert llmobs_events[2] == _expected_llmobs_non_llm_span_event(spans[2], span_kind="task", **expected_span_args) + assert llmobs_events[0] == _expected_llmobs_non_llm_span_event(spans[0], span_kind="workflow", **EXPECTED_SPAN_ARGS) + assert llmobs_events[1] == _expected_llmobs_non_llm_span_event(spans[1], span_kind="task", **EXPECTED_SPAN_ARGS) + assert llmobs_events[1]["meta"]["output"]["value"] == "New York City" + assert llmobs_events[2] == _expected_llmobs_non_llm_span_event(spans[2], span_kind="task", **EXPECTED_SPAN_ARGS) + if CREWAI_VERSION >= (0, 119, 0): # Tracking I/O and state management only available CrewAI >=0.119.0 + input_val = json.loads(llmobs_events[0]["meta"]["input"]["value"]) + assert input_val == {"continent": "North America"} + assert llmobs_events[0]["meta"]["output"]["value"] == fun_fact_text + input_val = json.loads(llmobs_events[1]["meta"]["input"]["value"]) + assert input_val["args"] == [] + assert input_val["kwargs"] == {} + assert input_val["flow_state"] == {"id": mock.ANY, "continent": "North America"} + input_val = json.loads(llmobs_events[2]["meta"]["input"]["value"]) + assert input_val["args"] == ["New York City"] + assert input_val["kwargs"] == {} + assert input_val["flow_state"] == {"id": mock.ANY, "continent": "North America"} + assert llmobs_events[2]["meta"]["output"]["value"] == fun_fact_text def _assert_simple_flow_links(llmobs_events): @@ -242,12 +264,12 @@ def _assert_simple_flow_links(llmobs_events): def _assert_complex_flow_events(llmobs_events, spans): llmobs_events.sort(key=lambda span: span["start_ns"]) assert len(spans) == len(llmobs_events) == 6 - assert llmobs_events[0] == _expected_llmobs_non_llm_span_event(spans[0], span_kind="workflow", **expected_span_args) - assert llmobs_events[1] == _expected_llmobs_non_llm_span_event(spans[1], span_kind="task", **expected_span_args) - assert llmobs_events[2] == _expected_llmobs_non_llm_span_event(spans[2], span_kind="task", **expected_span_args) - assert llmobs_events[3] == _expected_llmobs_non_llm_span_event(spans[3], span_kind="task", **expected_span_args) - assert llmobs_events[4] == _expected_llmobs_non_llm_span_event(spans[4], span_kind="task", **expected_span_args) - assert llmobs_events[5] == _expected_llmobs_non_llm_span_event(spans[5], span_kind="task", **expected_span_args) + assert llmobs_events[0] == _expected_llmobs_non_llm_span_event(spans[0], span_kind="workflow", **EXPECTED_SPAN_ARGS) + assert llmobs_events[1] == _expected_llmobs_non_llm_span_event(spans[1], span_kind="task", **EXPECTED_SPAN_ARGS) + assert llmobs_events[2] == _expected_llmobs_non_llm_span_event(spans[2], span_kind="task", **EXPECTED_SPAN_ARGS) + assert llmobs_events[3] == _expected_llmobs_non_llm_span_event(spans[3], span_kind="task", **EXPECTED_SPAN_ARGS) + assert llmobs_events[4] == _expected_llmobs_non_llm_span_event(spans[4], span_kind="task", **EXPECTED_SPAN_ARGS) + assert llmobs_events[5] == _expected_llmobs_non_llm_span_event(spans[5], span_kind="task", **EXPECTED_SPAN_ARGS) def _assert_complex_flow_links(llmobs_events): @@ -268,10 +290,10 @@ def _assert_complex_flow_links(llmobs_events): def _assert_router_flow_events(llmobs_events, spans): llmobs_events.sort(key=lambda span: span["start_ns"]) assert len(spans) == len(llmobs_events) == 4 - assert llmobs_events[0] == _expected_llmobs_non_llm_span_event(spans[0], span_kind="workflow", **expected_span_args) - assert llmobs_events[1] == _expected_llmobs_non_llm_span_event(spans[1], span_kind="task", **expected_span_args) - assert llmobs_events[2] == _expected_llmobs_non_llm_span_event(spans[2], span_kind="task", **expected_span_args) - assert llmobs_events[3] == _expected_llmobs_non_llm_span_event(spans[3], span_kind="task", **expected_span_args) + assert llmobs_events[0] == _expected_llmobs_non_llm_span_event(spans[0], span_kind="workflow", **EXPECTED_SPAN_ARGS) + assert llmobs_events[1] == _expected_llmobs_non_llm_span_event(spans[1], span_kind="task", **EXPECTED_SPAN_ARGS) + assert llmobs_events[2] == _expected_llmobs_non_llm_span_event(spans[2], span_kind="task", **EXPECTED_SPAN_ARGS) + assert llmobs_events[3] == _expected_llmobs_non_llm_span_event(spans[3], span_kind="task", **EXPECTED_SPAN_ARGS) def _assert_router_flow_links(llmobs_events): From 811e6e682f4d0848ff91aa26d82686d511ebc66a Mon Sep 17 00:00:00 2001 From: Yun Kim Date: Mon, 11 Aug 2025 17:41:04 -0400 Subject: [PATCH 14/15] Update tested versions --- ddtrace/contrib/integration_registry/registry.yaml | 2 +- supported_versions_output.json | 2 +- supported_versions_table.csv | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/ddtrace/contrib/integration_registry/registry.yaml b/ddtrace/contrib/integration_registry/registry.yaml index 06f3c62a5d7..e62456aa477 100644 --- a/ddtrace/contrib/integration_registry/registry.yaml +++ b/ddtrace/contrib/integration_registry/registry.yaml @@ -243,7 +243,7 @@ integrations: tested_versions_by_dependency: crewai: min: 0.102.0 - max: 0.121.1 + max: 0.157.0 - integration_name: ddtrace_api is_external_package: false diff --git a/supported_versions_output.json b/supported_versions_output.json index 02f5611cfa0..df644f64c45 100644 --- a/supported_versions_output.json +++ b/supported_versions_output.json @@ -171,7 +171,7 @@ "dependency": "crewai", "integration": "crewai", "minimum_tracer_supported": "0.102.0", - "max_tracer_supported": "0.121.1", + "max_tracer_supported": "0.157.0", "pinned": "true", "auto-instrumented": true }, diff --git a/supported_versions_table.csv b/supported_versions_table.csv index edf6d76a144..c0d9588d470 100644 --- a/supported_versions_table.csv +++ b/supported_versions_table.csv @@ -22,7 +22,7 @@ cassandra-driver,cassandra,3.24.0,3.28.0,True celery,celery,5.3.6,5.4.0,True cherrypy,cherrypy,17.0.0,18.10.0,False python-consul,consul,1.1.0,1.1.0,True -crewai,crewai *,0.102.0,0.121.1,True +crewai,crewai *,0.102.0,0.157.0,True django,django,2.2.28,5.2,True dogpile-cache,dogpile_cache,0.6.8,1.3.3,True dogpile.cache,dogpile_cache,0.6.8,1.3.3,True From 0ddc3d95b12e0e3191c592ae1dc3747184dc4278 Mon Sep 17 00:00:00 2001 From: Yun Kim Date: Tue, 12 Aug 2025 10:15:10 -0400 Subject: [PATCH 15/15] ensure initial flow state --- ddtrace/contrib/internal/crewai/patch.py | 9 +++++++-- ddtrace/llmobs/_integrations/crewai.py | 8 +------- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/ddtrace/contrib/internal/crewai/patch.py b/ddtrace/contrib/internal/crewai/patch.py index dff3535ad57..8369c1148a5 100644 --- a/ddtrace/contrib/internal/crewai/patch.py +++ b/ddtrace/contrib/internal/crewai/patch.py @@ -158,10 +158,15 @@ async def traced_flow_method(crewai, pin, func, instance, args, kwargs): submit_to_llmobs=True, flow_instance=instance, ) as span: - initial_flow_state = getattr(instance, "state", {}) + flow_state = getattr(instance, "state", {}) + initial_flow_state = {} + if isinstance(flow_state, dict): + initial_flow_state = {**flow_state} + elif hasattr(flow_state, "model_dump"): + initial_flow_state = flow_state.model_dump() result = await func(*args, **kwargs) kwargs["_dd.instance"] = instance - kwargs["_dd.flow_state"] = initial_flow_state + kwargs["_dd.initial_flow_state"] = initial_flow_state integration.llmobs_set_tags(span, args=args, kwargs=kwargs, response=result, operation="flow_method") return result diff --git a/ddtrace/llmobs/_integrations/crewai.py b/ddtrace/llmobs/_integrations/crewai.py index c36683dd476..bbf39030109 100644 --- a/ddtrace/llmobs/_integrations/crewai.py +++ b/ddtrace/llmobs/_integrations/crewai.py @@ -280,13 +280,7 @@ def _llmobs_set_tags_flow(self, span, args, kwargs, response): def _llmobs_set_tags_flow_method(self, span, args, kwargs, response): flow_instance = kwargs.pop("_dd.instance", None) - state_value = kwargs.pop("_dd.flow_state", {}) - initial_flow_state = {} - if isinstance(state_value, dict): - initial_flow_state = state_value - elif hasattr(state_value, "model_dump"): - initial_flow_state = state_value.model_dump() - + initial_flow_state = kwargs.pop("_dd.initial_flow_state", {}) input_dict = { "args": [safe_json(arg) for arg in args[2:]], "kwargs": {k: safe_json(v) for k, v in kwargs.items()},