Skip to content

Commit

Permalink
Merge branch 'master' into grpc_metrics_tests
Browse files Browse the repository at this point in the history
  • Loading branch information
alrex authored Jul 23, 2020
2 parents 20e0a2b + 5ff9600 commit bc85b1f
Show file tree
Hide file tree
Showing 6 changed files with 259 additions and 50 deletions.
2 changes: 2 additions & 0 deletions ext/opentelemetry-ext-grpc/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

- Add status code to gRPC client spans
([896](https://github.com/open-telemetry/opentelemetry-python/pull/896))
- Add gRPC client and server instrumentors
([788](https://github.com/open-telemetry/opentelemetry-python/pull/788))

- Add metric recording (bytes in/out, errors, latency) to gRPC client

Expand Down
5 changes: 5 additions & 0 deletions ext/opentelemetry-ext-grpc/setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,8 @@ test =

[options.packages.find]
where = src

[options.entry_points]
opentelemetry_instrumentor =
grpc_client = opentelemetry.ext.grpc:GrpcInstrumentorClient
grpc_server = opentelemetry.ext.grpc:GrpcInstrumentorServer
141 changes: 139 additions & 2 deletions ext/opentelemetry-ext-grpc/src/opentelemetry/ext/grpc/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,150 @@
# See the License for the specific language governing permissions and
# limitations under the License.

# pylint:disable=import-outside-toplevel
# pylint:disable=import-self
# pylint:disable=no-name-in-module
# pylint:disable=relative-beyond-top-level
# pylint:disable=import-error
# pylint:disable=no-self-use
"""
Usage Client
------------
.. code-block:: python
import logging
import grpc
from opentelemetry import trace
from opentelemetry.ext.grpc import GrpcInstrumentorClient, client_interceptor
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
ConsoleSpanExporter,
SimpleExportSpanProcessor,
)
try:
from .gen import helloworld_pb2, helloworld_pb2_grpc
except ImportError:
from gen import helloworld_pb2, helloworld_pb2_grpc
trace.set_tracer_provider(TracerProvider())
trace.get_tracer_provider().add_span_processor(
SimpleExportSpanProcessor(ConsoleSpanExporter())
)
instrumentor = GrpcInstrumentorClient()
instrumentor.instrument()
def run():
with grpc.insecure_channel("localhost:50051") as channel:
stub = helloworld_pb2_grpc.GreeterStub(channel)
response = stub.SayHello(helloworld_pb2.HelloRequest(name="YOU"))
print("Greeter client received: " + response.message)
if __name__ == "__main__":
logging.basicConfig()
run()
Usage Server
------------
.. code-block:: python
import logging
from concurrent import futures
import grpc
from opentelemetry import trace
from opentelemetry.ext.grpc import GrpcInstrumentorServer, server_interceptor
from opentelemetry.ext.grpc.grpcext import intercept_server
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
ConsoleSpanExporter,
SimpleExportSpanProcessor,
)
try:
from .gen import helloworld_pb2, helloworld_pb2_grpc
except ImportError:
from gen import helloworld_pb2, helloworld_pb2_grpc
trace.set_tracer_provider(TracerProvider())
trace.get_tracer_provider().add_span_processor(
SimpleExportSpanProcessor(ConsoleSpanExporter())
)
grpc_server_instrumentor = GrpcInstrumentorServer()
grpc_server_instrumentor.instrument()
class Greeter(helloworld_pb2_grpc.GreeterServicer):
def SayHello(self, request, context):
return helloworld_pb2.HelloReply(message="Hello, %s!" % request.name)
def serve():
server = grpc.server(futures.ThreadPoolExecutor())
helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
server.add_insecure_port("[::]:50051")
server.start()
server.wait_for_termination()
if __name__ == "__main__":
logging.basicConfig()
serve()
"""
from contextlib import contextmanager

import grpc
from wrapt import wrap_function_wrapper as _wrap

from opentelemetry import trace
from opentelemetry.ext.grpc.grpcext import intercept_channel, intercept_server
from opentelemetry.ext.grpc.version import __version__
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.utils import unwrap

# pylint:disable=import-outside-toplevel
# pylint:disable=import-self
# pylint:disable=unused-argument
# isort:skip


class GrpcInstrumentorServer(BaseInstrumentor):
def _instrument(self, **kwargs):
_wrap("grpc", "server", self.wrapper_fn)

def _uninstrument(self, **kwargs):
unwrap(grpc, "server")

def wrapper_fn(self, original_func, instance, args, kwargs):
server = original_func(*args, **kwargs)
return intercept_server(server, server_interceptor())


class GrpcInstrumentorClient(BaseInstrumentor):
def _instrument(self, **kwargs):
if kwargs.get("channel_type") == "secure":
_wrap("grpc", "secure_channel", self.wrapper_fn)

else:
_wrap("grpc", "insecure_channel", self.wrapper_fn)

def _uninstrument(self, **kwargs):
if kwargs.get("channel_type") == "secure":
unwrap(grpc, "secure_channel")

else:
unwrap(grpc, "insecure_channel")

@contextmanager
def wrapper_fn(self, original_func, instance, args, kwargs):
with original_func(*args, **kwargs) as channel:
yield intercept_channel(channel, client_interceptor())


def client_interceptor(tracer_provider=None, exporter=None, interval=30):
Expand Down
58 changes: 57 additions & 1 deletion ext/opentelemetry-ext-grpc/tests/test_server_interceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

import opentelemetry.ext.grpc
from opentelemetry import trace
from opentelemetry.ext.grpc import server_interceptor
from opentelemetry.ext.grpc import GrpcInstrumentorServer, server_interceptor
from opentelemetry.ext.grpc.grpcext import intercept_server
from opentelemetry.sdk import trace as trace_sdk
from opentelemetry.test.test_base import TestBase
Expand All @@ -49,6 +49,62 @@ def service(self, handler_call_details):


class TestOpenTelemetryServerInterceptor(TestBase):
def test_instrumentor(self):
def handler(request, context):
return b""

grpc_server_instrumentor = GrpcInstrumentorServer()
grpc_server_instrumentor.instrument()
server = grpc.server(
futures.ThreadPoolExecutor(max_workers=1),
options=(("grpc.so_reuseport", 0),),
)

server.add_generic_rpc_handlers((UnaryUnaryRpcHandler(handler),))

port = server.add_insecure_port("[::]:0")
channel = grpc.insecure_channel("localhost:{:d}".format(port))

try:
server.start()
channel.unary_unary("test")(b"test")
finally:
server.stop(None)

spans_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans_list), 1)
span = spans_list[0]
self.assertEqual(span.name, "test")
self.assertIs(span.kind, trace.SpanKind.SERVER)
self.check_span_instrumentation_info(span, opentelemetry.ext.grpc)
grpc_server_instrumentor.uninstrument()

def test_uninstrument(self):
def handler(request, context):
return b""

grpc_server_instrumentor = GrpcInstrumentorServer()
grpc_server_instrumentor.instrument()
grpc_server_instrumentor.uninstrument()
server = grpc.server(
futures.ThreadPoolExecutor(max_workers=1),
options=(("grpc.so_reuseport", 0),),
)

server.add_generic_rpc_handlers((UnaryUnaryRpcHandler(handler),))

port = server.add_insecure_port("[::]:0")
channel = grpc.insecure_channel("localhost:{:d}".format(port))

try:
server.start()
channel.unary_unary("test")(b"test")
finally:
server.stop(None)

spans_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans_list), 0)

def test_create_span(self):
"""Check that the interceptor wraps calls with spans server-side."""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,18 @@
import opentracing
from deprecated import deprecated

import opentelemetry.trace as trace_api
from opentelemetry import propagators
from opentelemetry.context import Context
from opentelemetry.correlationcontext import get_correlation, set_correlation
from opentelemetry.ext.opentracing_shim import util
from opentelemetry.ext.opentracing_shim.version import __version__
from opentelemetry.trace import DefaultSpan, set_span_in_context
from opentelemetry.trace import (
INVALID_SPAN_CONTEXT,
DefaultSpan,
Link,
get_current_span,
set_span_in_context,
)

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -130,6 +137,8 @@ class SpanContextShim(opentracing.SpanContext):

def __init__(self, otel_context):
self._otel_context = otel_context
# Context is being used here since it must be immutable.
self._baggage = Context()

def unwrap(self):
"""Returns the wrapped :class:`opentelemetry.trace.SpanContext`
Expand All @@ -144,17 +153,9 @@ def unwrap(self):

@property
def baggage(self):
"""Implements the ``baggage`` property from the base class.
"""Implements the ``baggage`` property from the base class."""

Warning:
Not implemented yet.
"""

logger.warning(
"Using unimplemented property baggage on class %s.",
self.__class__.__name__,
)
# TODO: Implement.
return self._baggage


class SpanShim(opentracing.Span):
Expand Down Expand Up @@ -270,31 +271,17 @@ def log(self, **kwargs):
def log_event(self, event, payload=None):
super().log_event(event, payload=payload)

def set_baggage_item(self, key, value): # pylint:disable=unused-argument
"""Implements the ``set_baggage_item()`` method from the base class.
Warning:
Not implemented yet.
"""

logger.warning(
"Calling unimplemented method set_baggage_item() on class %s",
self.__class__.__name__,
def set_baggage_item(self, key, value):
"""Implements the ``set_baggage_item`` method from the base class."""
# pylint: disable=protected-access
self._context._baggage = set_correlation(
key, value, context=self._context._baggage
)
# TODO: Implement.

def get_baggage_item(self, key): # pylint:disable=unused-argument
"""Implements the ``get_baggage_item()`` method from the base class.
Warning:
Not implemented yet.
"""

logger.warning(
"Calling unimplemented method get_baggage_item() on class %s",
self.__class__.__name__,
)
# TODO: Implement.
def get_baggage_item(self, key):
"""Implements the ``get_baggage_item`` method from the base class."""
# pylint: disable=protected-access
return get_correlation(key, context=self._context._baggage)


class ScopeShim(opentracing.Scope):
Expand Down Expand Up @@ -469,8 +456,8 @@ def active(self):
shim and is likely to be handled in future versions.
"""

span = trace_api.get_current_span()
if span.get_context() == trace_api.INVALID_SPAN_CONTEXT:
span = get_current_span()
if span.get_context() == INVALID_SPAN_CONTEXT:
return None

span_context = SpanContextShim(span.get_context())
Expand Down Expand Up @@ -643,7 +630,7 @@ def start_span(
links = []
if references:
for ref in references:
links.append(trace_api.Link(ref.referenced_context.unwrap()))
links.append(Link(ref.referenced_context.unwrap()))

# The OpenTracing API expects time values to be `float` values which
# represent the number of seconds since the epoch. OpenTelemetry
Expand Down Expand Up @@ -699,10 +686,10 @@ def get_as_list(dict_object, key):

propagator = propagators.get_global_httptextformat()
ctx = propagator.extract(get_as_list, carrier)
span = trace_api.get_current_span(ctx)
span = get_current_span(ctx)
if span is not None:
otel_context = span.get_context()
else:
otel_context = trace_api.INVALID_SPAN_CONTEXT
otel_context = INVALID_SPAN_CONTEXT

return SpanContextShim(otel_context)
Loading

0 comments on commit bc85b1f

Please sign in to comment.