From 9f1792f98d86a13e41b83f2f80a2b591e3f2cf3c Mon Sep 17 00:00:00 2001 From: Corvin Lasogga Date: Sun, 14 Aug 2022 12:55:01 +0200 Subject: [PATCH] Complete redesign of OpenTelemetryClientInterceptor including support of metrics, tracing of messages and changed API to officical grpc.ClientInterceptor API --- .../instrumentation/grpc/__init__.py | 46 ++- .../instrumentation/grpc/_client.py | 368 +++++++++++++----- .../instrumentation/grpc/_utilities.py | 146 ++++++- .../instrumentation/grpc/grpcext/__init__.py | 125 ------ .../grpc/grpcext/_interceptor.py | 350 ----------------- 5 files changed, 445 insertions(+), 590 deletions(-) delete mode 100644 instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/grpcext/__init__.py delete mode 100644 instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/grpcext/_interceptor.py diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py index f67f938d53..41d97fb461 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py @@ -27,6 +27,11 @@ from opentelemetry import trace from opentelemetry.instrumentation.grpc import GrpcInstrumentorClient + from opentelemetry.sdk.metrics import MeterProvider + from opentelemetry.sdk.metrics.export import ( + ConsoleMetricExporter, + PeriodicExportingMetricReader, + ) from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import ( ConsoleSpanExporter, @@ -38,6 +43,12 @@ except ImportError: from gen import helloworld_pb2, helloworld_pb2_grpc + exporter = ConsoleMetricExporter() + reader = PeriodicExportingMetricReader(exporter) + metrics.set_meter_provider( + MeterProvider(metric_readers=[reader]) + ) + trace.set_tracer_provider(TracerProvider()) trace.get_tracer_provider().add_span_processor( SimpleSpanProcessor(ConsoleSpanExporter()) @@ -58,6 +69,18 @@ def run(): logging.basicConfig() run() +You can also add the instrumentor manually, rather than using +:py:class:`~opentelemetry.instrumentation.grpc.GrpcInstrumentorClient`: + +.. code-block:: python + + from opentelemetry.instrumentation.grpc import client_interceptor + + channel = grpc.intercept_channel( + grpc.insecure_channel(...) / grpc.secure_channel(...), + client_interceptor() + ) + Usage Server ------------ .. code-block:: python @@ -135,7 +158,6 @@ def serve(): from wrapt import wrap_function_wrapper as _wrap from opentelemetry import metrics, trace -from opentelemetry.instrumentation.grpc.grpcext import intercept_channel from opentelemetry.instrumentation.grpc.package import _instruments from opentelemetry.instrumentation.grpc.version import __version__ from opentelemetry.instrumentation.instrumentor import BaseInstrumentor @@ -235,39 +257,47 @@ def _uninstrument(self, **kwargs): def wrapper_fn(self, original_func, instance, args, kwargs): channel = original_func(*args, **kwargs) + meter_provider = kwargs.get("meter_provider") tracer_provider = kwargs.get("tracer_provider") - return intercept_channel( + return grpc.intercept_channel( channel, - client_interceptor(tracer_provider=tracer_provider), + client_interceptor( + meter_provider=meter_provider, + tracer_provider=tracer_provider + ) ) -def client_interceptor(tracer_provider=None): +def client_interceptor(meter_provider=None, tracer_provider=None): """Create a gRPC client channel interceptor. Args: - tracer: The tracer to use to create client-side spans. + meter_provider: The meter provider which allows access to the meter. + tracer_provider: The tracer provider which allows access to the tracer. Returns: An invocation-side interceptor object. """ + from . import _client + meter = metrics.get_meter(__name__, __version__, meter_provider) tracer = trace.get_tracer(__name__, __version__, tracer_provider) - return _client.OpenTelemetryClientInterceptor(tracer) + return _client.OpenTelemetryClientInterceptor(meter, tracer) def server_interceptor(meter_provider=None, tracer_provider=None): """Create a gRPC server interceptor. Args: - meter_provider: The meter provider which allows acess to the meter. - tracer_provider: The tracer provider which allows acess to the tracer. + meter_provider: The meter provider which allows access to the meter. + tracer_provider: The tracer provider which allows access to the tracer. Returns: A service-side interceptor object. """ + from . import _server meter = metrics.get_meter(__name__, __version__, meter_provider) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py index b73b822b14..e36441bdb2 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py @@ -20,18 +20,20 @@ """Implementation of the invocation-side open-telemetry interceptor.""" from collections import OrderedDict -from typing import MutableMapping +from typing import Callable, Iterator, MutableMapping import grpc from opentelemetry import context, trace -from opentelemetry.instrumentation.grpc import grpcext -from opentelemetry.instrumentation.grpc._utilities import RpcInfo +from opentelemetry.instrumentation.grpc._types import ProtoMessage +from opentelemetry.instrumentation.grpc._utilities import _ClientCallDetails, _EventMetricRecorder, _MetricKind, _OpentelemetryResponseIterator from opentelemetry.instrumentation.utils import _SUPPRESS_INSTRUMENTATION_KEY from opentelemetry.propagate import inject from opentelemetry.propagators.textmap import Setter -from opentelemetry.semconv.trace import SpanAttributes +from opentelemetry.semconv.trace import MessageTypeValues, RpcSystemValues, SpanAttributes +from opentelemetry.trace import Span from opentelemetry.trace.status import Status, StatusCode +from opentelemetry.util.types import Attributes class _CarrierSetter(Setter): @@ -39,160 +41,316 @@ class _CarrierSetter(Setter): keys as is required by grpc. """ - def set(self, carrier: MutableMapping[str, str], key: str, value: str): + def set( + self, + carrier: MutableMapping[str, str], + key: str, + value: str + ) -> None: carrier[key.lower()] = value _carrier_setter = _CarrierSetter() -def _make_future_done_callback(span, rpc_info): - def callback(response_future): - with trace.use_span(span, end_on_exit=True): +def _make_future_done_callback( + span: Span, + metric_recorder: _EventMetricRecorder, + attributes: Attributes, + start_time: float, +) -> Callable[[grpc.Future], None]: + + def callback(response_future: grpc.Future) -> None: + with trace.use_span( + span, + record_exception=False, + set_status_on_exception=False, + end_on_exit=True, + ): code = response_future.code() if code != grpc.StatusCode.OK: - rpc_info.error = code - return - response = response_future.result() - rpc_info.response = response + details = response_future.details() + span.set_attribute( + SpanAttributes.RPC_GRPC_STATUS_CODE, code.value[0] + ) + span.set_status( + Status( + status_code=StatusCode.ERROR, + description=f"{code}: {details}", + ) + ) + + try: + span.record_exception(response_future.exception()) + except grpc.FutureCancelledError: + pass + + else: + response = response_future.result() + if response is not None: + metric_recorder._record_unary_response( + span, response, MessageTypeValues.RECEIVED, attributes + ) + + metric_recorder._record_duration( + start_time, + attributes, + response_future + ) return callback class OpenTelemetryClientInterceptor( - grpcext.UnaryClientInterceptor, grpcext.StreamClientInterceptor + _EventMetricRecorder, + grpc.UnaryUnaryClientInterceptor, + grpc.UnaryStreamClientInterceptor, + grpc.StreamUnaryClientInterceptor, + grpc.StreamStreamClientInterceptor ): - def __init__(self, tracer): + + def __init__(self, meter, tracer): + super().__init__(meter, _MetricKind.CLIENT) self._tracer = tracer - def _start_span(self, method, **kwargs): - service, meth = method.lstrip("/").split("/", 1) - attributes = { - SpanAttributes.RPC_SYSTEM: "grpc", - SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.OK.value[0], - SpanAttributes.RPC_METHOD: meth, + def _create_attributes(self, full_method: str) -> Attributes: + service, method = full_method.lstrip("/").split("/", 1) + return { + SpanAttributes.RPC_SYSTEM: RpcSystemValues.GRPC.value, SpanAttributes.RPC_SERVICE: service, + SpanAttributes.RPC_METHOD: method, + SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.OK.value[0], } - return self._tracer.start_as_current_span( - name=method, - kind=trace.SpanKind.CLIENT, - attributes=attributes, - **kwargs, - ) - - # pylint:disable=no-self-use - def _trace_result(self, span, rpc_info, result): - # If the RPC is called asynchronously, add a callback to end the span - # when the future is done, else end the span immediately - if isinstance(result, grpc.Future): - result.add_done_callback( - _make_future_done_callback(span, rpc_info) - ) - return result - response = result - # Handle the case when the RPC is initiated via the with_call - # method and the result is a tuple with the first element as the - # response. - # http://www.grpc.io/grpc/python/grpc.html#grpc.UnaryUnaryMultiCallable.with_call - if isinstance(result, tuple): - response = result[0] - rpc_info.response = response - span.end() - return result - - def _intercept(self, request, metadata, client_info, invoker): + def intercept_unary_unary( + self, + continuation, + client_call_details: grpc.ClientCallDetails, + request: ProtoMessage + ): if context.get_value(_SUPPRESS_INSTRUMENTATION_KEY): - return invoker(request, metadata) + return continuation(client_call_details, request) + + attributes = self._create_attributes(client_call_details.method) - if not metadata: + if not client_call_details.metadata: mutable_metadata = OrderedDict() else: - mutable_metadata = OrderedDict(metadata) - with self._start_span( - client_info.full_method, + mutable_metadata = OrderedDict(client_call_details.metadata) + + with self._tracer.start_as_current_span( + name=client_call_details.method, + kind=trace.SpanKind.CLIENT, + attributes=attributes, end_on_exit=False, record_exception=False, set_status_on_exception=False, ) as span: - result = None + response_future = None + start_time = 0.0 + try: inject(mutable_metadata, setter=_carrier_setter) metadata = tuple(mutable_metadata.items()) - - rpc_info = RpcInfo( - full_method=client_info.full_method, + new_client_call_details = _ClientCallDetails( + method=client_call_details.method, + timeout=client_call_details.timeout, metadata=metadata, - timeout=client_info.timeout, - request=request, + credentials=client_call_details.credentials, + wait_for_ready=client_call_details.wait_for_ready, + compression=client_call_details.compression ) - result = invoker(request, metadata) - except Exception as exc: - if isinstance(exc, grpc.RpcError): - span.set_attribute( - SpanAttributes.RPC_GRPC_STATUS_CODE, - exc.code().value[0], - ) - span.set_status( - Status( - status_code=StatusCode.ERROR, - description=f"{type(exc).__name__}: {exc}", - ) + start_time = self._start_duration_measurement() + self._record_unary_request( + span, + request, + MessageTypeValues.SENT, + attributes ) - span.record_exception(exc) - raise exc + + response_future = continuation( + new_client_call_details, + request + ) + finally: - if not result: + if not response_future: span.end() - return self._trace_result(span, rpc_info, result) + else: + response_future.add_done_callback( + _make_future_done_callback( + span, + self, + attributes, + start_time + ) + ) - def intercept_unary(self, request, metadata, client_info, invoker): - return self._intercept(request, metadata, client_info, invoker) + return response_future - # For RPCs that stream responses, the result can be a generator. To record - # the span across the generated responses and detect any errors, we wrap - # the result in a new generator that yields the response values. - def _intercept_server_stream( - self, request_or_iterator, metadata, client_info, invoker + def intercept_unary_stream( + self, + continuation, + client_call_details: grpc.ClientCallDetails, + request: ProtoMessage ): - if not metadata: + if context.get_value(_SUPPRESS_INSTRUMENTATION_KEY): + return continuation(client_call_details, request) + + attributes = self._create_attributes(client_call_details.method) + + if not client_call_details.metadata: mutable_metadata = OrderedDict() else: - mutable_metadata = OrderedDict(metadata) + mutable_metadata = OrderedDict(client_call_details.metadata) - with self._start_span(client_info.full_method) as span: + with self._tracer.start_as_current_span( + name=client_call_details.method, + kind=trace.SpanKind.CLIENT, + attributes=attributes, + end_on_exit=False, + record_exception=False, + set_status_on_exception=False, + ) as span: inject(mutable_metadata, setter=_carrier_setter) metadata = tuple(mutable_metadata.items()) - rpc_info = RpcInfo( - full_method=client_info.full_method, + new_client_call_details = _ClientCallDetails( + method=client_call_details.method, + timeout=client_call_details.timeout, metadata=metadata, - timeout=client_info.timeout, + credentials=client_call_details.credentials, + wait_for_ready=client_call_details.wait_for_ready, + compression=client_call_details.compression ) - if client_info.is_client_stream: - rpc_info.request = request_or_iterator + start_time = self._start_duration_measurement() + self._record_unary_request( + span, request, MessageTypeValues.SENT, attributes + ) + + response_iterator = continuation( + new_client_call_details, request + ) + + return _OpentelemetryResponseIterator( + response_iterator, + self, span, + attributes, + start_time + ) + + def intercept_stream_unary( + self, + continuation, + client_call_details: grpc.ClientCallDetails, + request_iterator: Iterator[ProtoMessage] + ): + if context.get_value(_SUPPRESS_INSTRUMENTATION_KEY): + return continuation(client_call_details, request_iterator) + + attributes = self._create_attributes(client_call_details.method) + + if not client_call_details.metadata: + mutable_metadata = OrderedDict() + else: + mutable_metadata = OrderedDict(client_call_details.metadata) + + with self._tracer.start_as_current_span( + name=client_call_details.method, + kind=trace.SpanKind.CLIENT, + attributes=attributes, + end_on_exit=False, + record_exception=False, + set_status_on_exception=False, + ) as span: + response_future = None + start_time = 0 try: - yield from invoker(request_or_iterator, metadata) - except grpc.RpcError as err: - span.set_status(Status(StatusCode.ERROR)) - span.set_attribute( - SpanAttributes.RPC_GRPC_STATUS_CODE, err.code().value[0] + inject(mutable_metadata, setter=_carrier_setter) + metadata = tuple(mutable_metadata.items()) + new_client_call_details = _ClientCallDetails( + method=client_call_details.method, + timeout=client_call_details.timeout, + metadata=metadata, + credentials=client_call_details.credentials, + wait_for_ready=client_call_details.wait_for_ready, + compression=client_call_details.compression + ) + + start_time = self._start_duration_measurement() + request_iterator = self._record_streaming_request( + span, request_iterator, MessageTypeValues.SENT, attributes + ) + + response_future = continuation( + new_client_call_details, + request_iterator ) - raise err - def intercept_stream( - self, request_or_iterator, metadata, client_info, invoker + finally: + if not response_future: + span.end() + else: + response_future.add_done_callback( + _make_future_done_callback( + span, + self, + attributes, + start_time + ) + ) + + return response_future + + def intercept_stream_stream( + self, + continuation, + client_call_details: grpc.ClientCallDetails, + request_iterator: Iterator[ProtoMessage] ): if context.get_value(_SUPPRESS_INSTRUMENTATION_KEY): - return invoker(request_or_iterator, metadata) + return continuation(client_call_details, request_iterator) + + attributes = self._create_attributes(client_call_details.method) + + if not client_call_details.metadata: + mutable_metadata = OrderedDict() + else: + mutable_metadata = OrderedDict(client_call_details.metadata) + + with self._tracer.start_as_current_span( + name=client_call_details.method, + kind=trace.SpanKind.CLIENT, + attributes=attributes, + end_on_exit=False, + record_exception=False, + set_status_on_exception=False, + ) as span: + inject(mutable_metadata, setter=_carrier_setter) + metadata = tuple(mutable_metadata.items()) + new_client_call_details = _ClientCallDetails( + method=client_call_details.method, + timeout=client_call_details.timeout, + metadata=metadata, + credentials=client_call_details.credentials, + wait_for_ready=client_call_details.wait_for_ready, + compression=client_call_details.compression + ) - if client_info.is_server_stream: - return self._intercept_server_stream( - request_or_iterator, metadata, client_info, invoker + start_time = self._start_duration_measurement() + request_iterator = self._record_streaming_request( + span, request_iterator, MessageTypeValues.SENT, attributes ) - return self._intercept( - request_or_iterator, metadata, client_info, invoker - ) + response_iterator = continuation( + new_client_call_details, request_iterator + ) + + return _OpentelemetryResponseIterator( + response_iterator, + self, span, + attributes, + start_time + ) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_utilities.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_utilities.py index 3c63f3f2b8..977e99f038 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_utilities.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_utilities.py @@ -18,14 +18,15 @@ from contextlib import contextmanager from enum import Enum from timeit import default_timer +from types import TracebackType from typing import Callable, Dict, Generator, Iterable, Iterator, NoReturn, Optional import grpc from opentelemetry import metrics, trace -from opentelemetry.instrumentation.grpc._types import ProtoMessage +from opentelemetry.instrumentation.grpc._types import Metadata, ProtoMessage from opentelemetry.semconv.trace import MessageTypeValues, SpanAttributes from opentelemetry.trace.status import Status, StatusCode -from opentelemetry.util.types import Attributes, Metadata +from opentelemetry.util.types import Attributes _MESSAGE: str = "message" @@ -408,6 +409,147 @@ def _record_duration_manager( self._duration_histogram.record(duration, metric_attributes) +class _OpentelemetryResponseIterator( + grpc.Call, grpc.Future, grpc.RpcError, Iterator +): + + def __init__( + self, + response_iterator, + metric_recorder: _EventMetricRecorder, + span: trace.Span, + attributes: Attributes, + start_time: float + ) -> None: + self._iterator = response_iterator + self._metric_recorder = metric_recorder + self._span = span + self._attributes = attributes + self._start_time = start_time + self._response_id = 0 + + def __repr__(self): + return self._iterator.__repr__() + + def __str__(self): + return self._iterator.__str__() + + # Interface of grpc.RpcContext + + def add_callback(self, callback: Callable[[], None]) -> None: + return self._iterator.add_callback(callback) + + def cancel(self) -> bool: + return self._iterator.cancel() + + def is_active(self) -> bool: + return self._iterator.is_active() + + def time_remaining(self) -> Optional[float]: + return self._iterator.time_remaining() + + # Interface of grpc.Call + + def code(self) -> grpc.StatusCode: + return self._iterator.code() + + def details(self) -> Optional[str]: + return self._iterator.details() + + def initial_metadata(self) -> Metadata: + return self._iterator.initial_metadata() + + def trailing_metadata(self) -> Metadata: + return self._iterator.trailing_metadata() + + # Interface of grpc.Future + + # pylint: disable=invalid-name + def add_done_callback(self, fn: Callable[[grpc.Future], None]) -> None: + return self._iterator.add_done_callback(fn) + + def cancelled(self) -> bool: + return self._iterator.cancelled() + + def done(self) -> bool: + return self._iterator.done() + + def exception(self, timeout: Optional[float] = None) -> Exception: + return self._iterator.exception(timeout) + + def result(self, timeout: Optional[float] = None) -> ProtoMessage: + return self._iterator.result(timeout) + + def running(self) -> bool: + return self._iterator.running() + + def traceback(self, timeout: Optional[float] = None) -> TracebackType: + return self._iterator.traceback(timeout) + + # Iterator inteface + + def __iter__(self): + return self + + def next(self): + return self._next() + + def __next__(self): + return self._next() + + def _next(self) -> ProtoMessage: + with trace.use_span( + self._span, + end_on_exit=False, + record_exception=False, + set_status_on_exception=False + ): + try: + response = self._iterator._next() + self._response_id += 1 + self._metric_recorder._record_unary_or_streaming_response( + self._span, + response, + MessageTypeValues.RECEIVED, + self._attributes, + response_id=self._response_id + ) + return response + except grpc.RpcError as exc: + code = exc.code() + details = exc.details() + self._span.set_attribute( + SpanAttributes.RPC_GRPC_STATUS_CODE, code.value[0] + ) + self._span.set_status( + Status( + status_code=StatusCode.ERROR, + description=f"{code}: {details}", + ) + ) + self._span.record_exception(exc) + + self._metric_recorder._record_num_of_responses_per_rpc( + self._response_id, self._attributes + ) + self._metric_recorder._record_duration( + self._start_time, self._attributes, self._iterator + ) + self._span.end() + raise + + except StopIteration: + self._metric_recorder._record_num_of_responses_per_rpc( + self._response_id, self._attributes + ) + + self._metric_recorder._record_duration( + self._start_time, self._attributes, self._iterator + ) + self._span.end() + raise + + # pylint:disable=abstract-method class _OpenTelemetryServicerContext(grpc.ServicerContext): diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/grpcext/__init__.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/grpcext/__init__.py deleted file mode 100644 index d5e2549bab..0000000000 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/grpcext/__init__.py +++ /dev/null @@ -1,125 +0,0 @@ -# Copyright The OpenTelemetry Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# pylint:disable=import-outside-toplevel -# pylint:disable=import-self -# pylint:disable=no-name-in-module - -import abc - - -class UnaryClientInfo(abc.ABC): - """Consists of various information about a unary RPC on the - invocation-side. - - Attributes: - full_method: A string of the full RPC method, i.e., - /package.service/method. - timeout: The length of time in seconds to wait for the computation to - terminate or be cancelled, or None if this method should block until - the computation is terminated or is cancelled no matter how long that - takes. - """ - - -class StreamClientInfo(abc.ABC): - """Consists of various information about a stream RPC on the - invocation-side. - - Attributes: - full_method: A string of the full RPC method, i.e., - /package.service/method. - is_client_stream: Indicates whether the RPC is client-streaming. - is_server_stream: Indicates whether the RPC is server-streaming. - timeout: The length of time in seconds to wait for the computation to - terminate or be cancelled, or None if this method should block until - the computation is terminated or is cancelled no matter how long that - takes. - """ - - -class UnaryClientInterceptor(abc.ABC): - """Affords intercepting unary-unary RPCs on the invocation-side.""" - - @abc.abstractmethod - def intercept_unary(self, request, metadata, client_info, invoker): - """Intercepts unary-unary RPCs on the invocation-side. - - Args: - request: The request value for the RPC. - metadata: Optional :term:`metadata` to be transmitted to the - service-side of the RPC. - client_info: A UnaryClientInfo containing various information about - the RPC. - invoker: The handler to complete the RPC on the client. It is the - interceptor's responsibility to call it. - - Returns: - The result from calling invoker(request, metadata). - """ - raise NotImplementedError() - - -class StreamClientInterceptor(abc.ABC): - """Affords intercepting stream RPCs on the invocation-side.""" - - @abc.abstractmethod - def intercept_stream( - self, request_or_iterator, metadata, client_info, invoker - ): - """Intercepts stream RPCs on the invocation-side. - - Args: - request_or_iterator: The request value for the RPC if - `client_info.is_client_stream` is `false`; otherwise, an iterator of - request values. - metadata: Optional :term:`metadata` to be transmitted to the service-side - of the RPC. - client_info: A StreamClientInfo containing various information about - the RPC. - invoker: The handler to complete the RPC on the client. It is the - interceptor's responsibility to call it. - - Returns: - The result from calling invoker(metadata). - """ - raise NotImplementedError() - - -def intercept_channel(channel, *interceptors): - """Creates an intercepted channel. - - Args: - channel: A Channel. - interceptors: Zero or more UnaryClientInterceptors or - StreamClientInterceptors - - Returns: - A Channel. - - Raises: - TypeError: If an interceptor derives from neither UnaryClientInterceptor - nor StreamClientInterceptor. - """ - from . import _interceptor - - return _interceptor.intercept_channel(channel, *interceptors) - - -__all__ = ( - "UnaryClientInterceptor", - "StreamClientInfo", - "StreamClientInterceptor", - "intercept_channel", -) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/grpcext/_interceptor.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/grpcext/_interceptor.py deleted file mode 100644 index 53ee46a20d..0000000000 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/grpcext/_interceptor.py +++ /dev/null @@ -1,350 +0,0 @@ -# Copyright The OpenTelemetry Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# pylint:disable=relative-beyond-top-level -# pylint:disable=no-member - -"""Implementation of gRPC Python interceptors.""" - - -import collections - -import grpc - -from opentelemetry.instrumentation.grpc import grpcext - - -class _UnaryClientInfo( - collections.namedtuple("_UnaryClientInfo", ("full_method", "timeout")) -): - pass - - -class _StreamClientInfo( - collections.namedtuple( - "_StreamClientInfo", - ("full_method", "is_client_stream", "is_server_stream", "timeout"), - ) -): - pass - - -class _InterceptorUnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): - def __init__(self, method, base_callable, interceptor): - self._method = method - self._base_callable = base_callable - self._interceptor = interceptor - - def __call__( - self, - request, - timeout=None, - metadata=None, - credentials=None, - wait_for_ready=None, - compression=None, - ): - def invoker(request, metadata): - return self._base_callable( - request, - timeout, - metadata, - credentials, - wait_for_ready, - compression, - ) - - client_info = _UnaryClientInfo(self._method, timeout) - return self._interceptor.intercept_unary( - request, metadata, client_info, invoker - ) - - def with_call( - self, - request, - timeout=None, - metadata=None, - credentials=None, - wait_for_ready=None, - compression=None, - ): - def invoker(request, metadata): - return self._base_callable.with_call( - request, - timeout, - metadata, - credentials, - wait_for_ready, - compression, - ) - - client_info = _UnaryClientInfo(self._method, timeout) - return self._interceptor.intercept_unary( - request, metadata, client_info, invoker - ) - - def future( - self, - request, - timeout=None, - metadata=None, - credentials=None, - wait_for_ready=None, - compression=None, - ): - def invoker(request, metadata): - return self._base_callable.future( - request, - timeout, - metadata, - credentials, - wait_for_ready, - compression, - ) - - client_info = _UnaryClientInfo(self._method, timeout) - return self._interceptor.intercept_unary( - request, metadata, client_info, invoker - ) - - -class _InterceptorUnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): - def __init__(self, method, base_callable, interceptor): - self._method = method - self._base_callable = base_callable - self._interceptor = interceptor - - def __call__( - self, - request, - timeout=None, - metadata=None, - credentials=None, - wait_for_ready=None, - compression=None, - ): - def invoker(request, metadata): - return self._base_callable( - request, - timeout, - metadata, - credentials, - wait_for_ready, - compression, - ) - - client_info = _StreamClientInfo(self._method, False, True, timeout) - return self._interceptor.intercept_stream( - request, metadata, client_info, invoker - ) - - -class _InterceptorStreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): - def __init__(self, method, base_callable, interceptor): - self._method = method - self._base_callable = base_callable - self._interceptor = interceptor - - def __call__( - self, - request_iterator, - timeout=None, - metadata=None, - credentials=None, - wait_for_ready=None, - compression=None, - ): - def invoker(request_iterator, metadata): - return self._base_callable( - request_iterator, - timeout, - metadata, - credentials, - wait_for_ready, - compression, - ) - - client_info = _StreamClientInfo(self._method, True, False, timeout) - return self._interceptor.intercept_stream( - request_iterator, metadata, client_info, invoker - ) - - def with_call( - self, - request_iterator, - timeout=None, - metadata=None, - credentials=None, - wait_for_ready=None, - compression=None, - ): - def invoker(request_iterator, metadata): - return self._base_callable.with_call( - request_iterator, - timeout, - metadata, - credentials, - wait_for_ready, - compression, - ) - - client_info = _StreamClientInfo(self._method, True, False, timeout) - return self._interceptor.intercept_stream( - request_iterator, metadata, client_info, invoker - ) - - def future( - self, - request_iterator, - timeout=None, - metadata=None, - credentials=None, - wait_for_ready=None, - compression=None, - ): - def invoker(request_iterator, metadata): - return self._base_callable.future( - request_iterator, - timeout, - metadata, - credentials, - wait_for_ready, - compression, - ) - - client_info = _StreamClientInfo(self._method, True, False, timeout) - return self._interceptor.intercept_stream( - request_iterator, metadata, client_info, invoker - ) - - -class _InterceptorStreamStreamMultiCallable(grpc.StreamStreamMultiCallable): - def __init__(self, method, base_callable, interceptor): - self._method = method - self._base_callable = base_callable - self._interceptor = interceptor - - def __call__( - self, - request_iterator, - timeout=None, - metadata=None, - credentials=None, - wait_for_ready=None, - compression=None, - ): - def invoker(request_iterator, metadata): - return self._base_callable( - request_iterator, - timeout, - metadata, - credentials, - wait_for_ready, - compression, - ) - - client_info = _StreamClientInfo(self._method, True, True, timeout) - return self._interceptor.intercept_stream( - request_iterator, metadata, client_info, invoker - ) - - -class _InterceptorChannel(grpc.Channel): - def __init__(self, channel, interceptor): - self._channel = channel - self._interceptor = interceptor - - def subscribe(self, *args, **kwargs): - self._channel.subscribe(*args, **kwargs) - - def unsubscribe(self, *args, **kwargs): - self._channel.unsubscribe(*args, **kwargs) - - def unary_unary( - self, method, request_serializer=None, response_deserializer=None - ): - base_callable = self._channel.unary_unary( - method, request_serializer, response_deserializer - ) - if isinstance(self._interceptor, grpcext.UnaryClientInterceptor): - return _InterceptorUnaryUnaryMultiCallable( - method, base_callable, self._interceptor - ) - return base_callable - - def unary_stream( - self, method, request_serializer=None, response_deserializer=None - ): - base_callable = self._channel.unary_stream( - method, request_serializer, response_deserializer - ) - if isinstance(self._interceptor, grpcext.StreamClientInterceptor): - return _InterceptorUnaryStreamMultiCallable( - method, base_callable, self._interceptor - ) - return base_callable - - def stream_unary( - self, method, request_serializer=None, response_deserializer=None - ): - base_callable = self._channel.stream_unary( - method, request_serializer, response_deserializer - ) - if isinstance(self._interceptor, grpcext.StreamClientInterceptor): - return _InterceptorStreamUnaryMultiCallable( - method, base_callable, self._interceptor - ) - return base_callable - - def stream_stream( - self, method, request_serializer=None, response_deserializer=None - ): - base_callable = self._channel.stream_stream( - method, request_serializer, response_deserializer - ) - if isinstance(self._interceptor, grpcext.StreamClientInterceptor): - return _InterceptorStreamStreamMultiCallable( - method, base_callable, self._interceptor - ) - return base_callable - - def close(self): - if not hasattr(self._channel, "close"): - raise RuntimeError( - "close() is not supported with the installed version of grpcio" - ) - self._channel.close() - - def __enter__(self): - """Enters the runtime context related to the channel object.""" - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - """Exits the runtime context related to the channel object.""" - self.close() - - -def intercept_channel(channel, *interceptors): - result = channel - for interceptor in interceptors: - if not isinstance( - interceptor, grpcext.UnaryClientInterceptor - ) and not isinstance(interceptor, grpcext.StreamClientInterceptor): - raise TypeError( - "interceptor must be either a " - "grpcext.UnaryClientInterceptor or a " - "grpcext.StreamClientInterceptor" - ) - result = _InterceptorChannel(result, interceptor) - return result