Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(haystack): add input and output #1202

Merged
merged 5 commits into from
Jun 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,75 @@
import dataclasses
import functools
import json
import logging
import os
import traceback

from opentelemetry import context as context_api
from opentelemetry.instrumentation.haystack.config import Config
from opentelemetry.semconv.ai import SpanAttributes


class EnhancedJSONEncoder(json.JSONEncoder):
    """JSON encoder that additionally serializes dataclass instances
    (via ``dataclasses.asdict``) and any object exposing a ``to_json`` method.
    """

    def default(self, o):
        # is_dataclass() is True for both instances AND dataclass *types*;
        # asdict() raises TypeError on the type itself, so only convert
        # genuine instances here.
        if dataclasses.is_dataclass(o) and not isinstance(o, type):
            return dataclasses.asdict(o)
        if hasattr(o, "to_json"):
            return o.to_json()
        return super().default(o)


def should_send_prompts():
    """Return a truthy value when prompt/response content may be recorded.

    Content capture is on unless the ``TRACELOOP_TRACE_CONTENT`` environment
    variable is set to something other than ``"true"``; it can also be
    force-enabled per-context via ``override_enable_content_tracing``.
    """
    env_flag = (os.getenv("TRACELOOP_TRACE_CONTENT") or "true").lower()
    return env_flag == "true" or context_api.get_value(
        "override_enable_content_tracing"
    )


def dont_throw(func):
    """
    A decorator that wraps the passed in function and logs exceptions instead of throwing them.

    @param func: The function to wrap
    @return: The wrapper function
    """
    # Obtain a logger specific to the function's module
    logger = logging.getLogger(func.__module__)

    # functools.wraps preserves __name__/__doc__/etc. of the wrapped function
    # so decorated functions remain introspectable (and debuggable).
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            # Swallow the exception by design: tracing must never break the
            # instrumented application. Returns None on failure.
            logger.debug(
                "OpenLLMetry failed to trace in %s, error: %s", func.__name__, str(e)
            )
            if Config.exception_logger:
                Config.exception_logger(e)

    return wrapper


@dont_throw
def process_request(span, args, kwargs):
    """Record the wrapped call's input on *span* as a JSON entity attribute.

    Non-empty dict positional arguments are folded into the keyword-argument
    map; all other positional arguments are kept as a list. Does nothing when
    content tracing is disabled.
    """
    if not should_send_prompts():
        return
    merged_kwargs = dict(kwargs)
    positional = []
    for candidate in args:
        if isinstance(candidate, dict):
            # Merge non-empty dicts into kwargs; empty dicts are dropped.
            if candidate:
                merged_kwargs.update(candidate)
        else:
            positional.append(candidate)
    span.set_attribute(
        SpanAttributes.TRACELOOP_ENTITY_INPUT,
        json.dumps(
            {"args": positional, "kwargs": merged_kwargs},
            cls=EnhancedJSONEncoder,
        ),
    )


@dont_throw
def process_response(span, response):
    """Record the wrapped call's return value on *span* as a JSON entity
    attribute. Does nothing when content tracing is disabled.
    """
    if not should_send_prompts():
        return
    serialized = json.dumps(response, cls=EnhancedJSONEncoder)
    span.set_attribute(SpanAttributes.TRACELOOP_ENTITY_OUTPUT, serialized)


def set_span_attribute(span, name, value):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@
from opentelemetry.instrumentation.utils import (
_SUPPRESS_INSTRUMENTATION_KEY,
)
from opentelemetry.instrumentation.haystack.utils import with_tracer_wrapper
from opentelemetry.instrumentation.haystack.utils import (
with_tracer_wrapper,
process_request,
process_response,
)
from opentelemetry.semconv.ai import SpanAttributes, TraceloopSpanKindValues

logger = logging.getLogger(__name__)
Expand All @@ -22,7 +26,8 @@ def wrap(tracer, to_wrap, wrapped, instance, args, kwargs):
TraceloopSpanKindValues.WORKFLOW.value,
)
span.set_attribute(SpanAttributes.TRACELOOP_ENTITY_NAME, name)

process_request(span, args, kwargs)
response = wrapped(*args, **kwargs)
process_response(span, response)

return response
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ def exporter():
@pytest.fixture(autouse=True)
def environment():
    # Autouse fixture: give every test a deterministic environment — a dummy
    # OpenAI key and explicit opt-in to content tracing (presumably read via
    # TRACELOOP_TRACE_CONTENT by the instrumentation — confirm against utils).
    # NOTE(review): values are set on os.environ without restoration, so they
    # leak across tests in the same process.
    os.environ["OPENAI_API_KEY"] = "test_api_key"
    os.environ["TRACELOOP_TRACE_CONTENT"] = "true"

@pytest.fixture(autouse=True)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from haystack.components.builders import DynamicChatPromptBuilder
from haystack.dataclasses import ChatMessage
from haystack.utils import Secret
from opentelemetry.semconv.ai import SpanAttributes


@pytest.mark.vcr
Expand Down Expand Up @@ -33,4 +34,7 @@ def test_haystack(exporter):
assert {
"haystack.openai.chat",
"haystack_pipeline.workflow",
}.issubset([span.name for span in spans])
} == {span.name for span in spans}
span_workflow = next(span for span in spans if span.name == "haystack_pipeline.workflow")
assert SpanAttributes.TRACELOOP_ENTITY_INPUT in span_workflow.attributes
assert SpanAttributes.TRACELOOP_ENTITY_OUTPUT in span_workflow.attributes