fix: Overriding the current TracerProvider when enabling tracing
PiperOrigin-RevId: 654822106
vertex-sdk-bot authored and copybara-github committed Jul 22, 2024
1 parent efb8413 commit 1476c10
Showing 2 changed files with 85 additions and 8 deletions.
91 changes: 84 additions & 7 deletions vertexai/preview/reasoning_engines/templates/langchain.py
@@ -49,6 +49,17 @@
except ImportError:
    _ToolLike = Any

try:
    from opentelemetry.sdk import trace

    TracerProvider = trace.TracerProvider
    SpanProcessor = trace.SpanProcessor
    SynchronousMultiSpanProcessor = trace.SynchronousMultiSpanProcessor
except ImportError:
    TracerProvider = Any
    SpanProcessor = Any
    SynchronousMultiSpanProcessor = Any


def _default_runnable_kwargs(has_history: bool) -> Mapping[str, Any]:
    # https://github.com/langchain-ai/langchain/blob/5784dfed001730530637793bea1795d9d5a7c244/libs/core/langchain_core/runnables/history.py#L237-L241
@@ -214,6 +225,34 @@ def _validate_tools(tools: Sequence["_ToolLike"]):
        _validate_callable_parameters_are_annotated(tool)


def _override_active_span_processor(
    tracer_provider: "TracerProvider",
    active_span_processor: "SynchronousMultiSpanProcessor",
):
    """Overrides the active span processor.

    When working with multiple LangchainAgents in the same environment,
    it's crucial to manage trace exports carefully.
    Each agent needs its own span processor tied to a unique project ID.
    While we add a new span processor for each agent, this can lead to
    unexpected behavior.
    For instance, with two agents linked to different projects, traces from the
    second agent might be sent to both projects.
    To prevent this and guarantee traces go to the correct project, we overwrite
    the active span processor whenever a new LangchainAgent is created.

    Args:
        tracer_provider (TracerProvider):
            The tracer provider to use for the project.
        active_span_processor (SynchronousMultiSpanProcessor):
            The span processor that replaces the tracer provider's
            currently active span processor.
    """
    if tracer_provider._active_span_processor:
        tracer_provider._active_span_processor.shutdown()
    tracer_provider._active_span_processor = active_span_processor
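A minimal usage sketch for the helper above (not part of the diff): it shows the intended call pattern when a second agent's exporter replaces the first agent's. The ConsoleSpanExporter stand-ins and the two-project framing are illustrative assumptions, not the SDK's actual exporters.

from opentelemetry.sdk import trace as opentelemetry_sdk_trace
from opentelemetry.sdk.trace import export as opentelemetry_sdk_trace_export

# Hypothetical provider already exporting traces for the first agent/project.
tracer_provider = opentelemetry_sdk_trace.TracerProvider()
tracer_provider.add_span_processor(
    opentelemetry_sdk_trace_export.SimpleSpanProcessor(
        opentelemetry_sdk_trace_export.ConsoleSpanExporter()  # stand-in for project A's exporter
    )
)

# Before attaching the second agent's processor, shut down and drop the old
# ones so project B's traces are not also exported to project A.
_override_active_span_processor(
    tracer_provider,
    opentelemetry_sdk_trace.SynchronousMultiSpanProcessor(),
)
tracer_provider.add_span_processor(
    opentelemetry_sdk_trace_export.SimpleSpanProcessor(
        opentelemetry_sdk_trace_export.ConsoleSpanExporter()  # stand-in for project B's exporter
    )
)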


class LangchainAgent:
"""A Langchain Agent.
@@ -419,17 +458,55 @@ def set_up(self):
                    credentials=credentials.with_quota_project(self._project),
                ),
            )
            span_processor = opentelemetry_sdk_trace.export.SimpleSpanProcessor(
                span_exporter=span_exporter,
            span_processor: SpanProcessor = (
                opentelemetry_sdk_trace.export.SimpleSpanProcessor(
                    span_exporter=span_exporter,
                )
            )
            tracer_provider: TracerProvider = (
                opentelemetry.trace.get_tracer_provider()
            )
            tracer_provider = opentelemetry.trace.get_tracer_provider()
            if tracer_provider and _utils._is_noop_tracer_provider(tracer_provider):
                # Avoids AttributeError: 'ProxyTracerProvider' object has no
                # attribute 'add_span_processor'
            # Get the appropriate tracer provider:
            # 1. If _TRACER_PROVIDER is already set, use that.
            # 2. Otherwise, if the OTEL_PYTHON_TRACER_PROVIDER environment
            # variable is set, use that.
            # 3. As a final fallback, use _PROXY_TRACER_PROVIDER.
            # If none of the above is set, we log a warning, and
            # create a tracer provider.
            if not tracer_provider:
                from google.cloud.aiplatform import base

                _LOGGER = base.Logger(__name__)
                _LOGGER.warning(
                    "No tracer provider. By default, "
                    "we should get one of the following providers: "
                    "OTEL_PYTHON_TRACER_PROVIDER, _TRACER_PROVIDER, "
                    "or _PROXY_TRACER_PROVIDER."
                )
                tracer_provider = opentelemetry_sdk_trace.TracerProvider()
                opentelemetry.trace.set_tracer_provider(tracer_provider)
            opentelemetry.trace.set_tracer_provider(tracer_provider)
            # Avoids AttributeError:
            # 'ProxyTracerProvider' and 'NoOpTracerProvider' objects have no
            # attribute 'add_span_processor'.
            if _utils.is_noop_or_proxy_tracer_provider(tracer_provider):
                tracer_provider = opentelemetry_sdk_trace.TracerProvider()
                opentelemetry.trace.set_tracer_provider(tracer_provider)
            # Avoids OpenTelemetry client already exists error.
            _override_active_span_processor(
                tracer_provider,
                opentelemetry_sdk_trace.SynchronousMultiSpanProcessor(),
            )
            tracer_provider.add_span_processor(span_processor)
            # Keep the instrumentation up-to-date.
            # When creating multiple LangchainAgents,
            # we need to keep the instrumentation up-to-date.
            # We deliberately override the instrument each time,
            # so that if different agents end up using different
            # instrumentations, we guarantee that the user is always
            # working with the most recent agent's instrumentation.
            self._instrumentor = openinference_langchain.LangChainInstrumentor()
            if self._instrumentor.is_instrumented_by_opentelemetry:
                self._instrumentor.uninstrument()
            self._instrumentor.instrument()
        else:
            from google.cloud.aiplatform import base
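For orientation, a hedged sketch of the two-project scenario the docstring above describes. It assumes the preview LangchainAgent accepts an enable_tracing flag and picks up the project set by vertexai.init(), and that set_up() can be invoked locally; the project IDs, location, and model name are placeholders.

import vertexai
from vertexai.preview import reasoning_engines

# First agent: its span processor exports traces to project-a.
vertexai.init(project="project-a", location="us-central1")
agent_a = reasoning_engines.LangchainAgent(
    model="gemini-1.5-pro",
    enable_tracing=True,
)
agent_a.set_up()

# Second agent: set_up() overrides the active span processor, so traces from
# this agent go only to project-b instead of to both projects.
vertexai.init(project="project-b", location="us-central1")
agent_b = reasoning_engines.LangchainAgent(
    model="gemini-1.5-pro",
    enable_tracing=True,
)
agent_b.set_up()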
2 changes: 1 addition & 1 deletion vertexai/reasoning_engines/_utils.py
@@ -198,7 +198,7 @@ def generate_schema(
    return schema


def _is_noop_tracer_provider(tracer_provider) -> bool:
def is_noop_or_proxy_tracer_provider(tracer_provider) -> bool:
    """Returns True if the tracer_provider is Proxy or NoOp."""
    opentelemetry = _import_opentelemetry_or_warn()
    ProxyTracerProvider = opentelemetry.trace.ProxyTracerProvider
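The hunk is truncated above; the renamed helper presumably completes along these lines (a reconstruction based on the visible lines and the OpenTelemetry API, not the verbatim file contents):

def is_noop_or_proxy_tracer_provider(tracer_provider) -> bool:
    """Returns True if the tracer_provider is Proxy or NoOp."""
    opentelemetry = _import_opentelemetry_or_warn()
    ProxyTracerProvider = opentelemetry.trace.ProxyTracerProvider
    NoOpTracerProvider = opentelemetry.trace.NoOpTracerProvider
    # Both wrapper providers lack add_span_processor, so callers replace them
    # with a real SDK TracerProvider before attaching span processors.
    return isinstance(tracer_provider, (NoOpTracerProvider, ProxyTracerProvider))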
