Skip to content

Commit

Permalink
Add support for service-side grpc.aio
Browse files Browse the repository at this point in the history
This adds support for grpc.aio server interceptors. The vast majority of
the code is either re-used wholesale or duplicated with slight modifications
from the existing standard interceptors.
  • Loading branch information
cookiefission committed Aug 24, 2022
1 parent 7382b5e commit b6753c1
Show file tree
Hide file tree
Showing 4 changed files with 773 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,4 @@ where = src
opentelemetry_instrumentor =
grpc_client = opentelemetry.instrumentation.grpc:GrpcInstrumentorClient
grpc_server = opentelemetry.instrumentation.grpc:GrpcInstrumentorServer
grpc_aio_server = opentelemetry.instrumentation.grpc:GrpcAioInstrumentorServer
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ def serve():
logging.basicConfig()
serve()
You can also add the instrumentor manually, rather than using
You can also add the interceptor manually, rather than using
:py:class:`~opentelemetry.instrumentation.grpc.GrpcInstrumentorServer`:
.. code-block:: python
Expand All @@ -118,6 +118,64 @@ def serve():
server = grpc.server(futures.ThreadPoolExecutor(),
interceptors = [server_interceptor()])
Usage Aio Server
------------
.. code-block:: python
import logging
import asyncio
import grpc
from opentelemetry import trace
from opentelemetry.instrumentation.grpc import GrpcAioInstrumentorServer
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
ConsoleSpanExporter,
SimpleSpanProcessor,
)
try:
from .gen import helloworld_pb2, helloworld_pb2_grpc
except ImportError:
from gen import helloworld_pb2, helloworld_pb2_grpc
trace.set_tracer_provider(TracerProvider())
trace.get_tracer_provider().add_span_processor(
SimpleSpanProcessor(ConsoleSpanExporter())
)
grpc_server_instrumentor = GrpcAioInstrumentorServer()
grpc_server_instrumentor.instrument()
class Greeter(helloworld_pb2_grpc.GreeterServicer):
async def SayHello(self, request, context):
return helloworld_pb2.HelloReply(message="Hello, %s!" % request.name)
async def serve():
server = grpc.aio.server()
helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
server.add_insecure_port("[::]:50051")
await server.start()
await server.wait_for_termination()
if __name__ == "__main__":
logging.basicConfig()
asyncio.run(serve())
You can also add the interceptor manually, rather than using
:py:class:`~opentelemetry.instrumentation.grpc.GrpcAioInstrumentorServer`:
.. code-block:: python
from opentelemetry.instrumentation.grpc import aio_server_interceptor
server = grpc.aio.server(interceptors = [aio_server_interceptor()])
"""
from typing import Collection

Expand Down Expand Up @@ -174,6 +232,44 @@ def _uninstrument(self, **kwargs):
grpc.server = self._original_func


class GrpcAioInstrumentorServer(BaseInstrumentor):
"""
Globally instrument the grpc.aio server.
Usage::
grpc_aio_server_instrumentor = GrpcAioInstrumentorServer()
grpc_aio_server_instrumentor.instrument()
"""

# pylint:disable=attribute-defined-outside-init, redefined-outer-name

def instrumentation_dependencies(self) -> Collection[str]:
return _instruments

def _instrument(self, **kwargs):
self._original_func = grpc.aio.server
tracer_provider = kwargs.get("tracer_provider")

def server(*args, **kwargs):
if "interceptors" in kwargs:
# add our interceptor as the first
kwargs["interceptors"].insert(
0, aio_server_interceptor(tracer_provider=tracer_provider)
)
else:
kwargs["interceptors"] = [
aio_server_interceptor(tracer_provider=tracer_provider)
]
return self._original_func(*args, **kwargs)

grpc.aio.server = server

def _uninstrument(self, **kwargs):
grpc.aio.server = self._original_func


class GrpcInstrumentorClient(BaseInstrumentor):
"""
Globally instrument the grpc client
Expand Down Expand Up @@ -255,3 +351,19 @@ def server_interceptor(tracer_provider=None):
tracer = trace.get_tracer(__name__, __version__, tracer_provider)

return _server.OpenTelemetryServerInterceptor(tracer)


def aio_server_interceptor(tracer_provider=None):
"""Create a gRPC aio server interceptor.
Args:
tracer: The tracer to use to create server-side spans.
Returns:
A service-side interceptor object.
"""
from . import _aio_server

tracer = trace.get_tracer(__name__, __version__, tracer_provider)

return _aio_server.OpenTelemetryAioServerInterceptor(tracer)
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import grpc.aio

from ._server import (
OpenTelemetryServerInterceptor,
_wrap_rpc_behavior,
_OpenTelemetryServicerContext,
)


class OpenTelemetryAioServerInterceptor(
grpc.aio.ServerInterceptor, OpenTelemetryServerInterceptor
):
"""
An AsyncIO gRPC server interceptor, to add OpenTelemetry.
Usage::
tracer = some OpenTelemetry tracer
interceptors = [
AsyncOpenTelemetryServerInterceptor(tracer),
]
server = aio.server(
futures.ThreadPoolExecutor(max_workers=concurrency),
interceptors = (interceptors,))
"""

async def intercept_service(self, continuation, handler_call_details):
def telemetry_wrapper(behavior, request_streaming, response_streaming):
# handle streaming responses specially
if response_streaming:
return self._intercept_server_stream(
behavior,
handler_call_details,
)
else:
return self._intercept_server_unary(
behavior,
handler_call_details,
)

next_handler = await continuation(handler_call_details)

return _wrap_rpc_behavior(next_handler, telemetry_wrapper)

def _intercept_server_unary(self, behavior, handler_call_details):
async def _unary_interceptor(request_or_iterator, context):
with self._set_remote_context(context):
with self._start_span(
handler_call_details,
context,
set_status_on_exception=False,
) as span:
# wrap the context
context = _OpenTelemetryServicerContext(context, span)

# And now we run the actual RPC.
try:
return await behavior(request_or_iterator, context)

except Exception as error:
# Bare exceptions are likely to be gRPC aborts, which
# we handle in our context wrapper.
# Here, we're interested in uncaught exceptions.
# pylint:disable=unidiomatic-typecheck
if type(error) != Exception:
span.record_exception(error)
raise error

return _unary_interceptor

def _intercept_server_stream(self, behavior, handler_call_details):
async def _stream_interceptor(request_or_iterator, context):
with self._set_remote_context(context):
with self._start_span(
handler_call_details,
context,
set_status_on_exception=False,
) as span:
context = _OpenTelemetryServicerContext(context, span)

try:
async for response in behavior(
request_or_iterator, context
):
yield response

except Exception as error:
# pylint:disable=unidiomatic-typecheck
if type(error) != Exception:
span.record_exception(error)
raise error

return _stream_interceptor
Loading

0 comments on commit b6753c1

Please sign in to comment.