diff --git a/packages/opentelemetry-instrumentation-haystack/opentelemetry/instrumentation/haystack/utils.py b/packages/opentelemetry-instrumentation-haystack/opentelemetry/instrumentation/haystack/utils.py
index 06181591d..fc38c80a5 100644
--- a/packages/opentelemetry-instrumentation-haystack/opentelemetry/instrumentation/haystack/utils.py
+++ b/packages/opentelemetry-instrumentation-haystack/opentelemetry/instrumentation/haystack/utils.py
@@ -1,6 +1,108 @@
+import dataclasses
+import functools
+import json
 import logging
+import os
 import traceback
+
+from opentelemetry import context as context_api
 from opentelemetry.instrumentation.haystack.config import Config
+from opentelemetry.semconv.ai import SpanAttributes
+
+
+class EnhancedJSONEncoder(json.JSONEncoder):
+    """JSON encoder that also serializes dataclasses and objects exposing `to_json`."""
+
+    def default(self, o):
+        """Return a JSON-serializable stand-in for `o`."""
+        if dataclasses.is_dataclass(o):
+            return dataclasses.asdict(o)
+        if hasattr(o, "to_json"):
+            return o.to_json()
+        return super().default(o)
+
+
+def should_send_prompts():
+    """
+    Tell whether prompt and response content may be recorded on spans.
+
+    @return: True when TRACELOOP_TRACE_CONTENT is unset, empty or "true" (any case),
+        or when the "override_enable_content_tracing" context value is truthy
+    """
+    if (os.getenv("TRACELOOP_TRACE_CONTENT") or "true").lower() == "true":
+        return True
+    # Normalize the context override so callers always get a bool
+    return bool(context_api.get_value("override_enable_content_tracing"))
+
+
+def dont_throw(func):
+    """
+    A decorator that wraps the passed-in function and logs exceptions instead of throwing them.
+
+    If the wrapped function raises, the wrapper returns None.
+
+    @param func: The function to wrap
+    @return: The wrapper function
+    """
+    # Obtain a logger specific to the function's module
+    logger = logging.getLogger(func.__module__)
+
+    # Keep func's name and docstring on the wrapper
+    @functools.wraps(func)
+    def wrapper(*args, **kwargs):
+        try:
+            return func(*args, **kwargs)
+        except Exception as e:
+            logger.debug(
+                "OpenLLMetry failed to trace in %s, error: %s", func.__name__, str(e)
+            )
+            if Config.exception_logger:
+                Config.exception_logger(e)
+
+    return wrapper
+
+
+@dont_throw
+def process_request(span, args, kwargs):
+    """
+    Record the wrapped call's inputs on the span as a JSON string.
+
+    Dict positional arguments are merged into the recorded kwargs; the remaining
+    positional arguments are recorded under "args". Nothing is recorded when
+    should_send_prompts() is false.
+
+    @param span: The span that receives the TRACELOOP_ENTITY_INPUT attribute
+    @param args: Positional arguments of the wrapped call
+    @param kwargs: Keyword arguments of the wrapped call
+    """
+    if should_send_prompts():
+        kwargs_to_serialize = kwargs.copy()
+        for arg in args:
+            if isinstance(arg, dict):
+                kwargs_to_serialize.update(arg)
+        args_to_serialize = [arg for arg in args if not isinstance(arg, dict)]
+        input_entity = {"args": args_to_serialize, "kwargs": kwargs_to_serialize}
+        span.set_attribute(
+            SpanAttributes.TRACELOOP_ENTITY_INPUT,
+            json.dumps(input_entity, cls=EnhancedJSONEncoder),
+        )
+
+
+@dont_throw
+def process_response(span, response):
+    """
+    Record the wrapped call's result on the span as a JSON string.
+
+    Nothing is recorded when should_send_prompts() is false.
+
+    @param span: The span that receives the TRACELOOP_ENTITY_OUTPUT attribute
+    @param response: The value returned by the wrapped call
+    """
+    if should_send_prompts():
+        span.set_attribute(
+            SpanAttributes.TRACELOOP_ENTITY_OUTPUT,
+            json.dumps(response, cls=EnhancedJSONEncoder),
+        )
 
 
 def set_span_attribute(span, name, value):
diff --git a/packages/opentelemetry-instrumentation-haystack/opentelemetry/instrumentation/haystack/wrap_pipeline.py b/packages/opentelemetry-instrumentation-haystack/opentelemetry/instrumentation/haystack/wrap_pipeline.py
index d4c3cba64..635783a79 100644
--- a/packages/opentelemetry-instrumentation-haystack/opentelemetry/instrumentation/haystack/wrap_pipeline.py
+++ b/packages/opentelemetry-instrumentation-haystack/opentelemetry/instrumentation/haystack/wrap_pipeline.py
@@ -4,7 +4,11 @@
 from opentelemetry.instrumentation.utils import (
     _SUPPRESS_INSTRUMENTATION_KEY,
 )
-from opentelemetry.instrumentation.haystack.utils import with_tracer_wrapper
+from opentelemetry.instrumentation.haystack.utils import (
+    with_tracer_wrapper,
+    process_request,
+    process_response,
+)
 from opentelemetry.semconv.ai import SpanAttributes, TraceloopSpanKindValues
 
 logger = logging.getLogger(__name__)
@@ -22,7 +26,8 @@ def wrap(tracer, to_wrap, wrapped, instance, args, kwargs):
             TraceloopSpanKindValues.WORKFLOW.value,
         )
         span.set_attribute(SpanAttributes.TRACELOOP_ENTITY_NAME, name)
-
+        process_request(span, args, kwargs)
         response = wrapped(*args, **kwargs)
+        process_response(span, response)
 
     return response
diff --git a/packages/opentelemetry-instrumentation-haystack/tests/conftest.py b/packages/opentelemetry-instrumentation-haystack/tests/conftest.py
index 6e6a152bb..e550a8088 100644
--- a/packages/opentelemetry-instrumentation-haystack/tests/conftest.py
+++ b/packages/opentelemetry-instrumentation-haystack/tests/conftest.py
@@ -28,6 +28,7 @@ def exporter():
 @pytest.fixture(autouse=True)
 def environment():
     os.environ["OPENAI_API_KEY"] = "test_api_key"
+    os.environ["TRACELOOP_TRACE_CONTENT"] = "true"
 
 
 @pytest.fixture(autouse=True)
diff --git a/packages/opentelemetry-instrumentation-haystack/tests/test_simple_pipeline.py b/packages/opentelemetry-instrumentation-haystack/tests/test_simple_pipeline.py
index dca98699f..c1c546b2e 100644
--- a/packages/opentelemetry-instrumentation-haystack/tests/test_simple_pipeline.py
+++ b/packages/opentelemetry-instrumentation-haystack/tests/test_simple_pipeline.py
@@ -5,6 +5,7 @@
 from haystack.components.builders import DynamicChatPromptBuilder
 from haystack.dataclasses import ChatMessage
 from haystack.utils import Secret
+from opentelemetry.semconv.ai import SpanAttributes
 
 
 @pytest.mark.vcr
@@ -33,4 +34,7 @@ def test_haystack(exporter):
     assert {
         "haystack.openai.chat",
         "haystack_pipeline.workflow",
-    }.issubset([span.name for span in spans])
+    } == {span.name for span in spans}
+    span_workflow = next(span for span in spans if span.name == "haystack_pipeline.workflow")
+    assert SpanAttributes.TRACELOOP_ENTITY_INPUT in span_workflow.attributes
+    assert SpanAttributes.TRACELOOP_ENTITY_OUTPUT in span_workflow.attributes