Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Asyncio support #13

Open
jakedt opened this issue Jan 14, 2021 · 10 comments
Open

Asyncio support #13

jakedt opened this issue Jan 14, 2021 · 10 comments

Comments

@jakedt
Copy link

jakedt commented Jan 14, 2021

Currently the library doesn't support any of the grpc.aio stuff. Would you be interested in that as a contribution? If so, how long until it shows up in a new release?

@qw2208
Copy link

qw2208 commented Jan 19, 2021

Same issue for me. It throws ValueError where Interceptor must be ServerInterceptor.

@bobbymlp
Copy link

+1

I could also really do with this

@lchenn
Copy link
Owner

lchenn commented Mar 24, 2021

I am not really familiar with grpc.aio. Welcome any MR to get it started.

@pyanchesko
Copy link

Hi guys! I resolved this problem in my project. And I want to share this with you. But first I need to fix the formatting of this repository. I created a issue #34. Help me, please @lchenn

@gggrafff
Copy link

gggrafff commented Jan 13, 2023

@pyanchesko
same issue for me
could you share your solution?
thanks

UPD:
I did it
My solution:

"""Interceptor a client call with prometheus"""
import logging

from timeit import default_timer
from typing import Awaitable, Callable

import grpc  # type: ignore
from prometheus_client.registry import REGISTRY

from py_grpc_prometheus import grpc_utils  # type: ignore
from py_grpc_prometheus import server_metrics  # type: ignore


_LOGGER = logging.getLogger(__name__)


# We were forced to write this class because
#   https://github.com/lchenn/py-grpc-prometheus/issues/13
# This file is an almost complete copy of py_grpc_prometheus.PromServerInterceptor
# For information:
#   https://stackoverflow.com/questions/64192211/how-to-convert-grpc-serverinterceptor-to-grcp-aio-serverinterceptor
class PromAioServerInterceptor(grpc.aio.ServerInterceptor):
    def __init__(
        self,
        enable_handling_time_histogram=False,
        legacy=False,
        skip_exceptions=False,
        log_exceptions=True,
        registry=REGISTRY
    ) -> None:
        self._enable_handling_time_histogram = enable_handling_time_histogram
        self._legacy = legacy
        self._grpc_server_handled_total_counter = server_metrics.get_grpc_server_handled_counter(
            self._legacy,
            registry
        )
        self._metrics = server_metrics.init_metrics(registry)
        self._skip_exceptions = skip_exceptions
        self._log_exceptions = log_exceptions

        # This is a constraint of current grpc.StatusCode design
        # https://groups.google.com/g/grpc-io/c/EdIXjMEaOyw/m/d3DeqmrJAAAJ
        self._code_to_status_mapping = {x.value[0]: x for x in grpc.StatusCode}

    async def intercept_service(
        self,
        continuation: Callable[[grpc.HandlerCallDetails], Awaitable[grpc.RpcMethodHandler]],
        handler_call_details: grpc.HandlerCallDetails
    ) -> grpc.RpcMethodHandler:
        """
        Intercepts the server function calls.

        This implements referred to:
        https://github.com/census-instrumentation/opencensus-python/blob/master/opencensus/
        trace/ext/grpc/server_interceptor.py
        and
        https://grpc.io/grpc/python/grpc.html#service-side-interceptor
        """

        grpc_service_name, grpc_method_name, _ = grpc_utils.split_method_call(handler_call_details)

        def metrics_wrapper(behavior, request_streaming, response_streaming):
            async def new_behavior(request_or_iterator, servicer_context):
                response_or_iterator = None
                try:
                    start = default_timer()
                    grpc_type = grpc_utils.get_method_type(request_streaming, response_streaming)
                    try:
                        if request_streaming:
                            request_or_iterator = grpc_utils.wrap_iterator_inc_counter(
                                request_or_iterator,
                                self._metrics["grpc_server_stream_msg_received"],
                                grpc_type,
                                grpc_service_name,
                                grpc_method_name
                            )
                        else:
                            self._metrics["grpc_server_started_counter"].labels(
                                grpc_type=grpc_type,
                                grpc_service=grpc_service_name,
                                grpc_method=grpc_method_name
                            ).inc()

                        # Invoke the original rpc behavior.
                        response_or_iterator = await behavior(request_or_iterator, servicer_context)

                        if response_streaming:
                            sent_metric = self._metrics["grpc_server_stream_msg_sent"]
                            response_or_iterator = grpc_utils.wrap_iterator_inc_counter(
                                response_or_iterator,
                                sent_metric,
                                grpc_type,
                                grpc_service_name,
                                grpc_method_name
                            )

                        else:
                            self.increase_grpc_server_handled_total_counter(
                                grpc_type,
                                grpc_service_name,
                                grpc_method_name,
                                self._compute_status_code(servicer_context).name
                            )
                        return response_or_iterator
                    except grpc.RpcError as e:
                        self.increase_grpc_server_handled_total_counter(
                            grpc_type,
                            grpc_service_name,
                            grpc_method_name,
                            self._compute_error_code(e).name
                        )
                        raise e

                    finally:

                        if not response_streaming:
                            if self._legacy:
                                self._metrics["legacy_grpc_server_handled_latency_seconds"].labels(
                                    grpc_type=grpc_type,
                                    grpc_service=grpc_service_name,
                                    grpc_method=grpc_method_name
                                ).observe(max(default_timer() - start, 0))
                            elif self._enable_handling_time_histogram:
                                self._metrics["grpc_server_handled_histogram"].labels(
                                    grpc_type=grpc_type,
                                    grpc_service=grpc_service_name,
                                    grpc_method=grpc_method_name
                                ).observe(max(default_timer() - start, 0))
                except Exception as e:  # pylint: disable=broad-except
                    # Allow user to skip the exceptions in order to maintain
                    # the basic functionality in the server
                    # The logging function in exception can be toggled with log_exceptions
                    # in order to suppress the noise in logging
                    if self._skip_exceptions:
                        if self._log_exceptions:
                            _LOGGER.error(e)
                        if response_or_iterator is None:
                            return response_or_iterator
                        return behavior(request_or_iterator, servicer_context)
                    raise e

            return new_behavior

        handler = await continuation(handler_call_details)
        optional_any = self._wrap_rpc_behavior(handler, metrics_wrapper)

        return optional_any

    def _compute_status_code(self, servicer_context):
        if servicer_context.cancelled():
            return grpc.StatusCode.CANCELLED

        if servicer_context.code() is None:
            return grpc.StatusCode.OK

        return self._code_to_status_mapping[servicer_context.code()]

    def _compute_error_code(self, grpc_exception):
        if isinstance(grpc_exception, grpc.aio.Call):
            return grpc_exception.code()

        return grpc.StatusCode.UNKNOWN

    def increase_grpc_server_handled_total_counter(
            self, grpc_type, grpc_service_name, grpc_method_name, grpc_code):
        if self._legacy:
            self._grpc_server_handled_total_counter.labels(
                grpc_type=grpc_type,
                grpc_service=grpc_service_name,
                grpc_method=grpc_method_name,
                code=grpc_code
            ).inc()
        else:
            self._grpc_server_handled_total_counter.labels(
                grpc_type=grpc_type,
                grpc_service=grpc_service_name,
                grpc_method=grpc_method_name,
                grpc_code=grpc_code
            ).inc()

    def _wrap_rpc_behavior(self, handler, fn):
        """Returns a new rpc handler that wraps the given function"""
        if handler is None:
            return None

        if handler.request_streaming and handler.response_streaming:
            behavior_fn = handler.stream_stream
            handler_factory = grpc.stream_stream_rpc_method_handler
        elif handler.request_streaming and not handler.response_streaming:
            behavior_fn = handler.stream_unary
            handler_factory = grpc.stream_unary_rpc_method_handler
        elif not handler.request_streaming and handler.response_streaming:
            behavior_fn = handler.unary_stream
            handler_factory = grpc.unary_stream_rpc_method_handler
        else:
            behavior_fn = handler.unary_unary
            handler_factory = grpc.unary_unary_rpc_method_handler

        return handler_factory(
            fn(behavior_fn, handler.request_streaming, handler.response_streaming),
            request_deserializer=handler.request_deserializer,
            response_serializer=handler.response_serializer
        )

@lchenn FYI

@nettashafir
Copy link

Hi @pyanchesko and @gggrafff ,

How did you adjusted the Client Interceptor to work with grpc.aio?

I'd be glad if you could share your solutions :-)

@gggrafff
Copy link

@nettashafir

Using of ServerInterceptor:

from prometheus_aio_server_interceptor import PromAioServerInterceptor

server = grpc.aio.server(interceptors=(PromAioServerInterceptor(enable_handling_time_histogram=True),))
actual_port = server.add_insecure_port('[::]:1234')

...

I don't use ClientInterceptor. Describe in more detail what problems do you have with this?

@nettashafir
Copy link

nettashafir commented Apr 10, 2023

Thanks @gggrafff ,
How do I collect the metrics that way?
I added your PromAioServerInterceptor, and my server run nicely. Yet, I try to add also:

prometheus_client.start_http_server(50052)

But I cannot send any GET requests for https://localhost:50052 - I get a ConnectionError

@gggrafff
Copy link

@nettashafir
I have written an example in more details:

import asyncio
import grpc
from grpc_health.v1 import health, health_pb2, health_pb2_grpc
import prometheus_client
from prometheus_aio_server_interceptor import PromAioServerInterceptor

async def serve(
    server: grpc.aio.Server,
) -> None:
    await server.start()
    await server.wait_for_termination()


loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

server = grpc.aio.server(interceptors=(PromAioServerInterceptor(enable_handling_time_histogram=True),))
grpc_port = server.add_insecure_port('[::]:1234')
print('grpc port: ', grpc_port)

health_servicer = health.HealthServicer()
health_servicer.set("instance_id", health_pb2.HealthCheckResponse.SERVING)
health_pb2_grpc.add_HealthServicer_to_server(health_servicer, server)

metrics_port = 4321
prometheus_client.start_http_server(metrics_port)
print('metrics port: ', metrics_port)
print(f'Your metrics: http://localhost:{metrics_port}')


loop.run_until_complete(serve(server))

It works for me

@nettashafir
Copy link

nettashafir commented May 14, 2023

I've noticed that the current implementation doesn't work so well for stream response, for in that case the RPC is an iterator.

I edited @gggrafff patch (including some design changes) and now it's working nicely for me:

"""Interceptor a client call with prometheus"""
import logging
from timeit import default_timer
from typing import Awaitable, Callable
import grpc  # type: ignore

from prometheus_client.registry import REGISTRY
from py_grpc_prometheus import grpc_utils  # type: ignore
from py_grpc_prometheus import server_metrics  # type: ignore

logger = logging.getLogger()

LEGACY = True
SKIP_EXCEPTION = False
ENABLE_HANDLING_TIME_HISTOGRAM = True

# This is a constraint of current grpc.StatusCode design: https://groups.google.com/g/grpc-io/c/EdIXjMEaOyw/m/d3DeqmrJAAAJ
_code_to_status_mapping = {x.value[0]: x for x in grpc.StatusCode}

metrics = server_metrics.init_metrics(REGISTRY)
grpc_server_handled_total_counter = server_metrics.get_grpc_server_handled_counter(
    LEGACY,
    REGISTRY,
)


# ----------------------------------- Helpers

async def _wrap_async_iterator_inc_counter(iterator, counter, grpc_type, grpc_service_name, grpc_method_name):
    """Wraps an iterator and collect metrics."""
    async for item in iterator:
        counter.labels(
            grpc_type=grpc_type,
            grpc_service=grpc_service_name,
            grpc_method=grpc_method_name).inc()
        yield item


def _compute_error_code(grpc_exception):
        if isinstance(grpc_exception, grpc.aio.Call):
            return grpc_exception.code()
        return grpc.StatusCode.UNKNOWN


def _compute_status_code(servicer_context):
    if servicer_context.code() is None:
        return grpc.StatusCode.OK
    return _code_to_status_mapping[servicer_context.code()]


def _increase_grpc_server_started_counter(grpc_type, grpc_service_name, grpc_method_name):
    metrics["grpc_server_started_counter"].labels(
            grpc_type=grpc_type,
            grpc_service=grpc_service_name,
            grpc_method=grpc_method_name
        ).inc()


def _increase_grpc_server_handled_total_counter(grpc_type, grpc_service_name, grpc_method_name, grpc_code):
    if LEGACY:
        grpc_server_handled_total_counter.labels(
            grpc_type=grpc_type,
            grpc_service=grpc_service_name,
            grpc_method=grpc_method_name,
            code=grpc_code
        ).inc()
    else:
        grpc_server_handled_total_counter.labels(
            grpc_type=grpc_type,
            grpc_service=grpc_service_name,
            grpc_method=grpc_method_name,
            grpc_code=grpc_code
        ).inc()


def _increase_grpc_server_handled_latency(grpc_type, grpc_service_name, grpc_method_name, start):
    if LEGACY:
        metrics["legacy_grpc_server_handled_latency_seconds"].labels(
            grpc_type=grpc_type,
            grpc_service=grpc_service_name,
            grpc_method=grpc_method_name
        ).observe(max(default_timer() - start, 0))
    elif ENABLE_HANDLING_TIME_HISTOGRAM:
        metrics["grpc_server_handled_histogram"].labels(
            grpc_type=grpc_type,
            grpc_service=grpc_service_name,
            grpc_method=grpc_method_name
        ).observe(max(default_timer() - start, 0))


def _wrap_rpc_behavior(handler, new_behavior_factory, grpc_service_name, grpc_method_name):
    """Returns a new rpc handler that wraps the given function"""
    if handler is None:
        return None

    if handler.request_streaming and handler.response_streaming:
        orig_behavior = handler.stream_stream
        handler_factory = grpc.stream_stream_rpc_method_handler
    elif handler.request_streaming and not handler.response_streaming:
        orig_behavior = handler.stream_unary
        handler_factory = grpc.stream_unary_rpc_method_handler
    elif not handler.request_streaming and handler.response_streaming:
        orig_behavior = handler.unary_stream
        handler_factory = grpc.unary_stream_rpc_method_handler
    else:
        orig_behavior = handler.unary_unary
        handler_factory = grpc.unary_unary_rpc_method_handler

    return handler_factory(
        behavior=new_behavior_factory(orig_behavior,
                                        handler.request_streaming,
                                        handler.response_streaming,
                                        grpc_service_name,
                                        grpc_method_name),
        request_deserializer=handler.request_deserializer,
        response_serializer=handler.response_serializer,
    )


# ----------------------------------- metrics wrapper

def metrics_wrapper(behavior, request_streaming, response_streaming, grpc_service_name, grpc_method_name):
    async def new_unary_behavior(request_or_iterator, servicer_context):
        response = None
        try:
            start = default_timer()
            grpc_type = grpc_utils.get_method_type(request_streaming, response_streaming)
            try:
                _increase_grpc_server_started_counter(grpc_type, grpc_service_name, grpc_method_name)

                # Invoke the original rpc behavior.
                response = await behavior(request_or_iterator, servicer_context)

                _increase_grpc_server_handled_total_counter(
                    grpc_type,
                    grpc_service_name,
                    grpc_method_name,
                    _compute_status_code(servicer_context).name
                )
               
                return response
           
            except grpc.RpcError as exc:
                _increase_grpc_server_handled_total_counter(
                    grpc_type,
                    grpc_service_name,
                    grpc_method_name,
                    _compute_error_code(exc).name
                )
                raise exc

            finally:
                _increase_grpc_server_handled_latency(grpc_type, grpc_service_name, grpc_method_name, start)
       
        except Exception as exc:  # pylint: disable=broad-except
            # Allow user to skip the exceptions in order to maintain
            # the basic functionality in the server
            # in order to suppress the noise in logging
            logger.error(exc)
            if SKIP_EXCEPTION:
                response = await behavior(request_or_iterator, servicer_context)
                return response
            raise exc
   
    async def new_stream_behavior(request_or_iterator, servicer_context):
        iterator = None
        try:
            start = default_timer()
            grpc_type = grpc_utils.get_method_type(request_streaming, response_streaming)
            try:
                _increase_grpc_server_started_counter(grpc_type, grpc_service_name, grpc_method_name)
               
                iterator = _wrap_async_iterator_inc_counter(
                    behavior(request_or_iterator, servicer_context),
                    metrics["grpc_server_stream_msg_sent"],
                    grpc_type,
                    grpc_service_name,
                    grpc_method_name
                )
               
                async for obj in iterator:
                    yield obj
               
                _increase_grpc_server_handled_total_counter(
                    grpc_type,
                    grpc_service_name,
                    grpc_method_name,
                    _compute_status_code(servicer_context).name
                )

            except grpc.RpcError as exc:
                _increase_grpc_server_handled_total_counter(
                    grpc_type,
                    grpc_service_name,
                    grpc_method_name,
                    _compute_error_code(exc).name
                )
                raise exc

            finally:
                _increase_grpc_server_handled_latency(grpc_type, grpc_service_name, grpc_method_name, start)
       
        except Exception as exc:
            logger.error(exc)
            if SKIP_EXCEPTION:
                async for obj in behavior(request_or_iterator, servicer_context):
                    yield obj
            raise exc

    if response_streaming:
        return new_stream_behavior
    return new_unary_behavior


# ----------------------------------- Interceptor

class PromAioServerInterceptor(grpc.aio.ServerInterceptor):
    def __init__(self):
        logger.info("Initializing metric interceptor")

    async def intercept_service(
        self,
        continuation: Callable[[grpc.HandlerCallDetails], Awaitable[grpc.RpcMethodHandler]],
        handler_call_details: grpc.HandlerCallDetails
    ) -> grpc.RpcMethodHandler:
        """
        Intercepts the server function calls.
        Only intercepts unary requests.
        """
        grpc_service_name, grpc_method_name, _ = grpc_utils.split_method_call(handler_call_details)
        handler = await continuation(handler_call_details)
        handler = _wrap_rpc_behavior(handler, metrics_wrapper, grpc_service_name, grpc_method_name)
        return handler

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

7 participants