From 5b778052bcbc3cd8a6f8e4cb4e7b24818c4bde3b Mon Sep 17 00:00:00 2001 From: Tibor Reiss Date: Sun, 2 Jun 2024 17:02:22 +0200 Subject: [PATCH 1/5] Add input and output --- .../instrumentation/haystack/utils.py | 61 +++++++++++++++++++ .../instrumentation/haystack/wrap_pipeline.py | 16 ++++- .../tests/test_simple_pipeline.py | 2 +- 3 files changed, 77 insertions(+), 2 deletions(-) diff --git a/packages/opentelemetry-instrumentation-haystack/opentelemetry/instrumentation/haystack/utils.py b/packages/opentelemetry-instrumentation-haystack/opentelemetry/instrumentation/haystack/utils.py index 06181591d..111a22618 100644 --- a/packages/opentelemetry-instrumentation-haystack/opentelemetry/instrumentation/haystack/utils.py +++ b/packages/opentelemetry-instrumentation-haystack/opentelemetry/instrumentation/haystack/utils.py @@ -1,8 +1,69 @@ +import dataclasses +import json import logging +import os import traceback + +from opentelemetry import context as context_api from opentelemetry.instrumentation.haystack.config import Config +class EnhancedJSONEncoder(json.JSONEncoder): + def default(self, o): + if dataclasses.is_dataclass(o): + return dataclasses.asdict(o) + if hasattr(o, "to_json"): + return o.to_json() + return super().default(o) + + +def should_send_prompts(): + return ( + os.getenv("TRACELOOP_TRACE_CONTENT") or "true" + ).lower() == "true" or context_api.get_value("override_enable_content_tracing") + + +def dont_throw(func): + """ + A decorator that wraps the passed in function and logs exceptions instead of throwing them. + + @param func: The function to wrap + @return: The wrapper function + """ + # Obtain a logger specific to the function's module + logger = logging.getLogger(func.__module__) + + def wrapper(*args, **kwargs): + try: + return func(*args, **kwargs) + except Exception as e: + logger.debug( + "OpenLLMetry failed to trace in %s, error: %s", func.__name__, str(e) + ) + if Config.exception_logger: + Config.exception_logger(e) + + return wrapper + + +@dont_throw +def process_request(args, kwargs): + kwargs_to_serialize = kwargs.copy() + for arg in args: + if arg and isinstance(arg, dict): + for key, value in arg.items(): + kwargs_to_serialize[key] = value + args_to_serialize = [arg for arg in args if not isinstance(arg, dict)] + input_entity = {"args": args_to_serialize, "kwargs": kwargs_to_serialize} + return json.dumps(input_entity, cls=EnhancedJSONEncoder) + + +@dont_throw +def process_response(response): + if should_send_prompts(): + return json.dumps(response, cls=EnhancedJSONEncoder) + + def set_span_attribute(span, name, value): if value is not None: if value != "": diff --git a/packages/opentelemetry-instrumentation-haystack/opentelemetry/instrumentation/haystack/wrap_pipeline.py b/packages/opentelemetry-instrumentation-haystack/opentelemetry/instrumentation/haystack/wrap_pipeline.py index d4c3cba64..fe5d035fe 100644 --- a/packages/opentelemetry-instrumentation-haystack/opentelemetry/instrumentation/haystack/wrap_pipeline.py +++ b/packages/opentelemetry-instrumentation-haystack/opentelemetry/instrumentation/haystack/wrap_pipeline.py @@ -1,10 +1,15 @@ +import json import logging from opentelemetry import context as context_api from opentelemetry.context import attach, set_value from opentelemetry.instrumentation.utils import ( _SUPPRESS_INSTRUMENTATION_KEY, ) -from opentelemetry.instrumentation.haystack.utils import with_tracer_wrapper +from opentelemetry.instrumentation.haystack.utils import ( + with_tracer_wrapper, + process_request, + process_response, +) from opentelemetry.semconv.ai import SpanAttributes, TraceloopSpanKindValues logger = logging.getLogger(__name__) @@ -22,7 +27,16 @@ def wrap(tracer, to_wrap, wrapped, instance, args, kwargs): TraceloopSpanKindValues.WORKFLOW.value, ) span.set_attribute(SpanAttributes.TRACELOOP_ENTITY_NAME, name) + span.set_attribute( + SpanAttributes.TRACELOOP_ENTITY_INPUT, + process_request(args, kwargs), + ) response = wrapped(*args, **kwargs) + span.set_attribute( + SpanAttributes.TRACELOOP_ENTITY_OUTPUT, + process_response(response), + ) + return response diff --git a/packages/opentelemetry-instrumentation-haystack/tests/test_simple_pipeline.py b/packages/opentelemetry-instrumentation-haystack/tests/test_simple_pipeline.py index dca98699f..5971db015 100644 --- a/packages/opentelemetry-instrumentation-haystack/tests/test_simple_pipeline.py +++ b/packages/opentelemetry-instrumentation-haystack/tests/test_simple_pipeline.py @@ -33,4 +33,4 @@ def test_haystack(exporter): assert { "haystack.openai.chat", "haystack_pipeline.workflow", - }.issubset([span.name for span in spans]) + } == {span.name for span in spans} From 443741284f6409488627f20843432fe4c2568fdf Mon Sep 17 00:00:00 2001 From: Tibor Reiss Date: Thu, 6 Jun 2024 20:34:02 +0200 Subject: [PATCH 2/5] Fix linting --- .../opentelemetry/instrumentation/haystack/wrap_pipeline.py | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/opentelemetry-instrumentation-haystack/opentelemetry/instrumentation/haystack/wrap_pipeline.py b/packages/opentelemetry-instrumentation-haystack/opentelemetry/instrumentation/haystack/wrap_pipeline.py index fe5d035fe..b3ea77a16 100644 --- a/packages/opentelemetry-instrumentation-haystack/opentelemetry/instrumentation/haystack/wrap_pipeline.py +++ b/packages/opentelemetry-instrumentation-haystack/opentelemetry/instrumentation/haystack/wrap_pipeline.py @@ -1,4 +1,3 @@ -import json import logging from opentelemetry import context as context_api from opentelemetry.context import attach, set_value From 759768ed33398d5cb5af9862b0d50d50d26c7469 Mon Sep 17 00:00:00 2001 From: Tibor Reiss Date: Thu, 6 Jun 2024 20:51:05 +0200 Subject: [PATCH 3/5] Code review --- .../instrumentation/haystack/utils.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/packages/opentelemetry-instrumentation-haystack/opentelemetry/instrumentation/haystack/utils.py b/packages/opentelemetry-instrumentation-haystack/opentelemetry/instrumentation/haystack/utils.py index 111a22618..7a722048e 100644 --- a/packages/opentelemetry-instrumentation-haystack/opentelemetry/instrumentation/haystack/utils.py +++ b/packages/opentelemetry-instrumentation-haystack/opentelemetry/instrumentation/haystack/utils.py @@ -48,14 +48,15 @@ def wrapper(*args, **kwargs): @dont_throw def process_request(args, kwargs): - kwargs_to_serialize = kwargs.copy() - for arg in args: - if arg and isinstance(arg, dict): - for key, value in arg.items(): - kwargs_to_serialize[key] = value - args_to_serialize = [arg for arg in args if not isinstance(arg, dict)] - input_entity = {"args": args_to_serialize, "kwargs": kwargs_to_serialize} - return json.dumps(input_entity, cls=EnhancedJSONEncoder) + if should_send_prompts(): + kwargs_to_serialize = kwargs.copy() + for arg in args: + if arg and isinstance(arg, dict): + for key, value in arg.items(): + kwargs_to_serialize[key] = value + args_to_serialize = [arg for arg in args if not isinstance(arg, dict)] + input_entity = {"args": args_to_serialize, "kwargs": kwargs_to_serialize} + return json.dumps(input_entity, cls=EnhancedJSONEncoder) @dont_throw From d01b9b5e085a42b9b3f93a6b4672130c1edf18ac Mon Sep 17 00:00:00 2001 From: Tibor Reiss Date: Fri, 7 Jun 2024 18:59:42 +0200 Subject: [PATCH 4/5] Move logic into util functions --- .../instrumentation/haystack/utils.py | 15 +++++++++++---- .../instrumentation/haystack/wrap_pipeline.py | 12 ++---------- 2 files changed, 13 insertions(+), 14 deletions(-) diff --git a/packages/opentelemetry-instrumentation-haystack/opentelemetry/instrumentation/haystack/utils.py b/packages/opentelemetry-instrumentation-haystack/opentelemetry/instrumentation/haystack/utils.py index 7a722048e..fc38c80a5 100644 --- a/packages/opentelemetry-instrumentation-haystack/opentelemetry/instrumentation/haystack/utils.py +++ b/packages/opentelemetry-instrumentation-haystack/opentelemetry/instrumentation/haystack/utils.py @@ -6,6 +6,7 @@ from opentelemetry import context as context_api from opentelemetry.instrumentation.haystack.config import Config +from opentelemetry.semconv.ai import SpanAttributes class EnhancedJSONEncoder(json.JSONEncoder): @@ -47,7 +48,7 @@ def wrapper(*args, **kwargs): @dont_throw -def process_request(args, kwargs): +def process_request(span, args, kwargs): if should_send_prompts(): kwargs_to_serialize = kwargs.copy() for arg in args: @@ -56,13 +57,19 @@ def process_request(args, kwargs): kwargs_to_serialize[key] = value args_to_serialize = [arg for arg in args if not isinstance(arg, dict)] input_entity = {"args": args_to_serialize, "kwargs": kwargs_to_serialize} - return json.dumps(input_entity, cls=EnhancedJSONEncoder) + span.set_attribute( + SpanAttributes.TRACELOOP_ENTITY_INPUT, + json.dumps(input_entity, cls=EnhancedJSONEncoder), + ) @dont_throw -def process_response(response): +def process_response(span, response): if should_send_prompts(): - return json.dumps(response, cls=EnhancedJSONEncoder) + span.set_attribute( + SpanAttributes.TRACELOOP_ENTITY_OUTPUT, + json.dumps(response, cls=EnhancedJSONEncoder), + ) def set_span_attribute(span, name, value): diff --git a/packages/opentelemetry-instrumentation-haystack/opentelemetry/instrumentation/haystack/wrap_pipeline.py b/packages/opentelemetry-instrumentation-haystack/opentelemetry/instrumentation/haystack/wrap_pipeline.py index b3ea77a16..635783a79 100644 --- a/packages/opentelemetry-instrumentation-haystack/opentelemetry/instrumentation/haystack/wrap_pipeline.py +++ b/packages/opentelemetry-instrumentation-haystack/opentelemetry/instrumentation/haystack/wrap_pipeline.py @@ -26,16 +26,8 @@ def wrap(tracer, to_wrap, wrapped, instance, args, kwargs): TraceloopSpanKindValues.WORKFLOW.value, ) span.set_attribute(SpanAttributes.TRACELOOP_ENTITY_NAME, name) - span.set_attribute( - SpanAttributes.TRACELOOP_ENTITY_INPUT, - process_request(args, kwargs), - ) - + process_request(span, args, kwargs) response = wrapped(*args, **kwargs) - - span.set_attribute( - SpanAttributes.TRACELOOP_ENTITY_OUTPUT, - process_response(response), - ) + process_response(span, response) return response From b5f78255fee4bca5c5aa3b339c9181bc6603492d Mon Sep 17 00:00:00 2001 From: Tibor Reiss Date: Fri, 7 Jun 2024 19:43:24 +0200 Subject: [PATCH 5/5] Update tests --- .../opentelemetry-instrumentation-haystack/tests/conftest.py | 1 + .../tests/test_simple_pipeline.py | 4 ++++ 2 files changed, 5 insertions(+) diff --git a/packages/opentelemetry-instrumentation-haystack/tests/conftest.py b/packages/opentelemetry-instrumentation-haystack/tests/conftest.py index 6e6a152bb..e550a8088 100644 --- a/packages/opentelemetry-instrumentation-haystack/tests/conftest.py +++ b/packages/opentelemetry-instrumentation-haystack/tests/conftest.py @@ -28,6 +28,7 @@ def exporter(): @pytest.fixture(autouse=True) def environment(): os.environ["OPENAI_API_KEY"] = "test_api_key" + os.environ["TRACELOOP_TRACE_CONTENT"] = "true" @pytest.fixture(autouse=True) diff --git a/packages/opentelemetry-instrumentation-haystack/tests/test_simple_pipeline.py b/packages/opentelemetry-instrumentation-haystack/tests/test_simple_pipeline.py index 5971db015..c1c546b2e 100644 --- a/packages/opentelemetry-instrumentation-haystack/tests/test_simple_pipeline.py +++ b/packages/opentelemetry-instrumentation-haystack/tests/test_simple_pipeline.py @@ -5,6 +5,7 @@ from haystack.components.builders import DynamicChatPromptBuilder from haystack.dataclasses import ChatMessage from haystack.utils import Secret +from opentelemetry.semconv.ai import SpanAttributes @pytest.mark.vcr @@ -34,3 +35,6 @@ def test_haystack(exporter): "haystack.openai.chat", "haystack_pipeline.workflow", } == {span.name for span in spans} + span_workflow = next(span for span in spans if span.name == "haystack_pipeline.workflow") + assert SpanAttributes.TRACELOOP_ENTITY_INPUT in span_workflow.attributes + assert SpanAttributes.TRACELOOP_ENTITY_OUTPUT in span_workflow.attributes