intercept_stream_stream in grpc.aio incorrectly marked as a coroutine #47

Open
arvidfm opened this issue Jun 12, 2023 · 2 comments

arvidfm commented Jun 12, 2023

Description of issue

grpc.aio.StreamStreamClientInterceptor.intercept_stream_stream is marked as async in grpc-stubs:

https://github.com/shabbyrobe/grpc-stubs/blob/master/grpc-stubs/aio/__init__.pyi#L351

But grpc.aio (as of version 1.54.2) actually fails if you attempt to pass an interceptor with an async implementation.

The same likely applies to the equivalent methods in the other interceptor types.

Minimum Reproducible Example

The following passes strict type checking with mypy, but fails at runtime. Remove async from the intercept_stream_stream function signature to make it work.

main.py
import asyncio
from collections.abc import AsyncIterator, Callable
from typing import TYPE_CHECKING, cast, Union, AsyncIterable, Iterable

import grpc.aio
from grpc.aio import ClientCallDetails, StreamStreamCall

import pow_pb2
import pow_pb2_grpc


if TYPE_CHECKING:
    BaseInterceptor = grpc.aio.StreamStreamClientInterceptor[pow_pb2.Message, pow_pb2.Message]
else:
    BaseInterceptor = grpc.aio.StreamStreamClientInterceptor


class MyInterceptor(BaseInterceptor):
    async def intercept_stream_stream(
        self,
        continuation: Callable[
            [ClientCallDetails, pow_pb2.Message],
            StreamStreamCall[pow_pb2.Message, pow_pb2.Message],
        ],
        client_call_details: ClientCallDetails,
        request_iterator: Union[AsyncIterable[pow_pb2.Message], Iterable[pow_pb2.Message]],
    ) -> Union[AsyncIterator[pow_pb2.Message], StreamStreamCall[pow_pb2.Message, pow_pb2.Message]]:
        print(client_call_details.method)
        # https://github.com/shabbyrobe/grpc-stubs/issues/46
        return continuation(client_call_details, request_iterator)  # type: ignore[arg-type]


async def streamer() -> AsyncIterator[pow_pb2.Message]:
    for i in range(10):
        yield pow_pb2.Message(Number=i)


class PowerServicer(pow_pb2_grpc.PowerServicer):
    async def Pow(
        self,
        request_iterator: AsyncIterable[pow_pb2.Message],
        context: grpc.aio.ServicerContext[pow_pb2.Message, pow_pb2.Message],
    ) -> AsyncIterator[pow_pb2.Message]:
        async for i in request_iterator:
            yield pow_pb2.Message(Number=i.Number**2)


async def start_server() -> None:
    server = grpc.aio.server()
    pow_pb2_grpc.add_PowerServicer_to_server(PowerServicer(), server)
    server.add_insecure_port("[::]:50051")
    await server.start()
    await server.wait_for_termination()


async def call_server() -> None:
    channel = grpc.aio.insecure_channel(
        "localhost:50051",
        interceptors=[cast(grpc.aio.ClientInterceptor, MyInterceptor())],
    )
    stub = pow_pb2_grpc.PowerStub(channel)
    if TYPE_CHECKING:
        async_stub = cast(pow_pb2_grpc.PowerAsyncStub, stub)
    else:
        async_stub = stub
    result: pow_pb2.Message
    async for result in async_stub.Pow(streamer()):
        print(result.Number)


async def main() -> None:
    asyncio.create_task(start_server())
    await asyncio.sleep(1)
    await call_server()


if __name__ == "__main__":
    asyncio.run(main())

pow.proto
syntax = "proto3";

service Power {
  rpc Pow(stream Message) returns (stream Message) {};
}

message Message {
  float Number = 1;
}

run.sh
#!/usr/bin/env bash

set -o errexit -o nounset -o pipefail
python -m venv venv
source ./venv/bin/activate
pip install grpcio==1.54.2 grpcio-tools==1.54.2 mypy==1.3.0 git+https://github.com/shabbyrobe/grpc-stubs.git git+https://github.com/nipunn1313/mypy-protobuf.git

python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. --mypy_out=. --mypy_grpc_out=. pow.proto
python main.py

@shabbyrobe
Owner

Thank you for supplying an MRE. I was able to reproduce the failure locally with your example. However, both the documentation and the source mark these methods as async, and I'm not sure why.

https://grpc.github.io/grpc/python/grpc_asyncio.html#grpc.aio.StreamStreamClientInterceptor.intercept_stream_stream
https://grpc.github.io/grpc/python/_modules/grpc/aio/_interceptor.html#StreamStreamClientInterceptor.intercept_stream_stream

My apologies, but I'm too far removed from both async Python and gRPC at the moment to respond adequately to this issue with the information I have. Given that I've had quite a lot of trouble with contributions to these aio typings so far, I'll need a little more convincing about the right course of action here.
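
For anyone who wants to check how a method is declared at runtime, independently of the stubs or the rendered documentation, `inspect.iscoroutinefunction` from the standard library reports whether a function was defined with `async def`. The sketch below uses a hypothetical stand-in class (`StandInInterceptor` and its method names are made up for illustration and are not part of grpc); with grpcio installed, the same call could presumably be pointed at `grpc.aio.StreamStreamClientInterceptor.intercept_stream_stream` instead.

```python
import inspect


class StandInInterceptor:
    """Hypothetical stand-in; not a grpc class."""

    async def intercept_async(self, continuation, details, request_iterator):
        # Declared with `async def`, so calling it returns a coroutine.
        return await continuation(details, request_iterator)

    def intercept_sync(self, continuation, details, request_iterator):
        # Plain function: returns whatever the continuation returns.
        return continuation(details, request_iterator)


declared_async = inspect.iscoroutinefunction(StandInInterceptor.intercept_async)
declared_sync = inspect.iscoroutinefunction(StandInInterceptor.intercept_sync)
print(declared_async, declared_sync)
```

This only tells you how the method is declared, not how the library calls it, so it complements rather than replaces the MRE above.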


macro1 commented Apr 12, 2024

Hi @arvidfm, I think you're missing an await in your intercept_stream_stream().

Maybe something more like this:

# Needed in addition to the imports already in main.py.
from collections.abc import Awaitable


class MyInterceptor(BaseInterceptor):
    async def intercept_stream_stream(
        self,
        # The continuation is typed as returning an awaitable call object.
        continuation: Callable[  # type: ignore[override]
            [ClientCallDetails, AsyncIterator[pow_pb2.Message]],
            Awaitable[StreamStreamCall[pow_pb2.Message, pow_pb2.Message]],
        ],
        client_call_details: ClientCallDetails,
        request_iterator: AsyncIterator[pow_pb2.Message],  # type: ignore[override]
    ) -> Union[
        AsyncIterator[pow_pb2.Message],
        StreamStreamCall[pow_pb2.Message, pow_pb2.Message],
    ]:
        # Await the continuation so the call object, not a coroutine, is returned.
        return await continuation(client_call_details, request_iterator)

This is similar to how the interceptors in grpc's own tests are written:
https://github.com/grpc/grpc/blob/3fe06af9a305ff6df97e9b936a9a94fc4350526c/src/python/grpcio_tests/tests_aio/unit/client_stream_stream_interceptor_test.py#L36
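
The effect of the missing await can be seen with plain asyncio, without grpc. This is only a sketch under the assumption that the continuation is itself a coroutine function, as the suggestion above implies; `FakeCall`, `continuation`, `requests`, `without_await` and `with_await` are hypothetical stand-ins, not grpc APIs. An `async def` that returns `continuation(...)` without awaiting it hands back a coroutine object, whereas awaiting it hands back the call object that can be iterated.

```python
import asyncio
import inspect


class FakeCall:
    """Hypothetical stand-in for a streaming call object; not a grpc class."""

    def __init__(self, items):
        self._items = items

    def __aiter__(self):
        return self._responses()

    async def _responses(self):
        for item in self._items:
            yield item


async def continuation(details, request_iterator):
    # Stand-in for an async continuation: it must be awaited to get the call.
    return FakeCall([x * x async for x in request_iterator])


async def requests():
    for i in range(3):
        yield i


async def without_await():
    # Returns the coroutine object itself, which is not async-iterable.
    return continuation("method", requests())


async def with_await():
    # Awaiting yields the FakeCall, which supports `async for`.
    return await continuation("method", requests())


async def demo():
    pending = await without_await()
    is_coro = inspect.iscoroutine(pending)
    pending.close()  # avoid a "coroutine was never awaited" warning
    call = await with_await()
    results = [r async for r in call]
    return is_coro, results


is_coro, results = asyncio.run(demo())
print(is_coro, results)
```

In the un-awaited case the caller receives something it cannot iterate with `async for`, which is consistent with the runtime failure reported in the MRE, though the sketch does not reproduce grpc's actual code path.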

I'm not sure about the Iterable vs. Iterator question. Streams would clearly be iterators, since the messages are yielded. That seems like a separate issue from the one you're reporting, though.
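
As a small illustration of the Iterable vs. Iterator distinction mentioned here, the standard `collections.abc` classes can be checked with `isinstance`. In this sketch (`stream` and `OnlyIterable` are made-up names), an async generator satisfies both `AsyncIterator` and `AsyncIterable`, while an object that only defines `__aiter__` is an `AsyncIterable` but not an `AsyncIterator`. It says nothing about which of the two the grpc stubs should accept.

```python
from collections.abc import AsyncIterable, AsyncIterator


async def stream():
    # An async generator function: calling it returns an async generator.
    yield 1


class OnlyIterable:
    """Defines __aiter__ only, so it is iterable but not itself an iterator."""

    def __aiter__(self):
        return stream()


gen = stream()
gen_is_iterator = isinstance(gen, AsyncIterator)
gen_is_iterable = isinstance(gen, AsyncIterable)
obj_is_iterable = isinstance(OnlyIterable(), AsyncIterable)
obj_is_iterator = isinstance(OnlyIterable(), AsyncIterator)
print(gen_is_iterator, gen_is_iterable, obj_is_iterable, obj_is_iterator)
```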
