Proposal: work with grpc-java team to provide binding layer and code generator for grpc-java #360

Closed
hsyed opened this issue May 15, 2018 · 12 comments

@hsyed

hsyed commented May 15, 2018

Grpc-java is a low-level API that is quite error-prone to use without significant investment in code generation and libraries encoding relevant concepts (e.g., pagination and batching).

The Go use case for gRPC is much easier to get started with for applications, largely because channels and context propagation are part of the stub generation. Kotlin coroutines can provide an even better experience out of the box, completely eliminating the need for streaming libraries or many of the utilities provided by a library like gax; e.g., batching and paging would be much easier to codify on a per-application basis.

At the moment we are prototyping with a glue layer that connects grpc-java concepts per request. The bidi stream use case in this context would code generate a method that creates a job that does:

  1. Glue a Channel to the input StreamObserver.
  2. Glue a LinkedListChannel to the output StreamObserver.
  3. Bind the gRPC Context to the coroutine context, or create a dispatcher using Context::fixedContextExecutor.
  4. Propagate Context cancellation to the Job.
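
Steps 1 and 2 can be sketched roughly as follows, assuming only kotlinx.coroutines on the classpath. `StreamObserver` here is a local stand-in that mirrors the three callbacks of `io.grpc.stub.StreamObserver`, and `asObserver` and `pumpInto` are hypothetical helper names, not part of grpc-java or of the generator described in this issue. `Channel.UNLIMITED` plays the role of the older `LinkedListChannel`.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Stand-in with the same three callbacks as io.grpc.stub.StreamObserver (assumption).
interface StreamObserver<T> {
    fun onNext(value: T)
    fun onError(t: Throwable)
    fun onCompleted()
}

// Step 1: expose a channel to the transport as an observer.
// The channel is expected to be unlimited, so onNext never blocks the calling thread.
fun <T> SendChannel<T>.asObserver(): StreamObserver<T> = object : StreamObserver<T> {
    override fun onNext(value: T) { trySend(value) }
    override fun onError(t: Throwable) { close(t) }
    override fun onCompleted() { close() }
}

// Step 2: drain a channel into the outbound observer from a coroutine.
fun <T> CoroutineScope.pumpInto(source: ReceiveChannel<T>, sink: StreamObserver<T>): Job = launch {
    try {
        for (value in source) sink.onNext(value)
        sink.onCompleted()
    } catch (t: Throwable) {
        // A channel closed with a cause, or a failing sink, ends the stream with an error.
        sink.onError(t)
    }
}

fun main() = runBlocking {
    val inbound = Channel<String>(Channel.UNLIMITED)
    val observer = inbound.asObserver()
    observer.onNext("a")
    observer.onNext("b")
    observer.onCompleted()

    val seen = mutableListOf<String>()
    val sink = object : StreamObserver<String> {
        override fun onNext(value: String) { seen += value }
        override fun onError(t: Throwable) { seen += "error" }
        override fun onCompleted() { seen += "completed" }
    }
    pumpInto(inbound, sink).join()
    println(seen) // [a, b, completed]
}
```

Note that this sketch has no flow control: the unlimited channel buffers whatever the transport delivers.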

This is quite heavy-handed. Ideally we should have a fully kotlinx.coroutines-based layer, perhaps directly above Netty, that sidesteps the grpc-java layer and is coroutine-based from the ground up.

@elizarov
Contributor

@hsyed Seems like a great idea. Do you have any prototype to look at?

@hsyed
Author

hsyed commented Jun 26, 2018

@elizarov Our monorepo is built using Bazel; I will try to share the code if I can.

I based the code generator in our example on the one here. I've made quite a few changes to the templates and what they generate, but the principle hasn't diverged too far from it. What is good about this generator is that the generated stubs are compatible with grpc-java. This code generator uses mustache templates and an additional external dep from salesforce to generate; neither of these deps is strictly needed.

There is another code generator here, which generates various utility code. It uses "kotlin poet" from square and a proto file parser from square/wire to parse proto files. Kotlin poet is very useful, but the proto file parser isn't strictly needed.


Here is some example code to see how our generators glue with grpc-java. The binding classes our generators use. Note the generator is different to the open source generator I linked above.

Server Stub: Server Streaming

        internal fun nextInternal(responseObserver: StreamObserver<com.axsy.cloud.proto.core.QueueOpList>): StreamObserver<com.axsy.cloud.proto.core.QueueNextBatchRequest> {
            val requestChannel = StreamObserverChannel<com.axsy.cloud.proto.core.QueueNextBatchRequest>()
            // Inline methods are deliberately not used, to give more context in logging calls. grpc-java,
            // like most of Google's Java stack, doesn't waste stack frames on noise. gRPC services aren't
            // always simple RESTful transactions (e.g., etcd3), so concise stack frames and accurate
            // logging context are very useful. Coroutines generate a lot of noise in the stack frames;
            // we filter these frames out in logback.
            val job = launch(dispatcher) {
                try {
                    val response = next(coroutineContext, requestChannel)
                    for (value in response) {
                        responseObserver.onNext(value)
                    }
                    responseObserver.onCompleted()
                } catch (t: Throwable) {
                    responseObserver.onError(t)
                }
            }
            return requestChannel
        }

There are a few improvements to be made:

  1. The job should be automatically cancelled when a cancellation is received from the client.
  2. Ensure the dispatcher always has the grpc-java context primed see.
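
Both improvements could look roughly like the sketch below. It assumes grpc-java's `io.grpc.Context` and kotlinx.coroutines are on the classpath; `cancelWhenCancelled` and `asDispatcher` are hypothetical names, not part of either library. It relies on grpc-java cancelling a server call's `Context` when the RPC ends, which includes cancellation by the client.

```kotlin
import io.grpc.Context
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Job
import kotlinx.coroutines.asCoroutineDispatcher
import java.util.concurrent.Executor

// Hypothetical helper: cancel this Job as soon as the given gRPC Context is cancelled.
fun Job.cancelWhenCancelled(grpcContext: Context) {
    val listener = Context.CancellationListener { ctx ->
        cancel(CancellationException("gRPC context cancelled", ctx.cancellationCause()))
    }
    // Run the listener inline on whichever thread cancels the context.
    grpcContext.addListener(listener, Executor { it.run() })
    // Unregister once the job finishes so the listener does not outlive it.
    invokeOnCompletion { grpcContext.removeListener(listener) }
}

// Hypothetical helper: a dispatcher whose tasks always run with this Context attached,
// built on Context.fixedContextExecutor.
fun Context.asDispatcher(executor: Executor): CoroutineDispatcher =
    fixedContextExecutor(executor).asCoroutineDispatcher()
```

Inside a service method, this would be used as `job.cancelWhenCancelled(Context.current())`, with the job launched on `Context.current().asDispatcher(someExecutor)`.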

Client Stub: ManyToMany

        fun next(context: CoroutineContext)
        : ManyToManyCall<com.axsy.cloud.proto.core.QueueNextBatchRequest, com.axsy.cloud.proto.core.QueueOpList> {
            val responseChannel = StreamObserverChannel<com.axsy.cloud.proto.core.QueueOpList>()
            val requestObserver = delegate.next(responseChannel)
            val requestChannel = LinkedListChannel<com.axsy.cloud.proto.core.QueueNextBatchRequest>()
            KtStubs.connectChannelToObserver(context, requestChannel, requestObserver)
            return ManyToManyCall(requestChannel, responseChannel)
        }

My thoughts for an end-to-end grpc-kotlin integration encompass the following areas.

  1. A low-level glue layer is needed to map grpc-java metaphors onto the metaphors in kotlinx.coroutines. Beyond the ones I mentioned in my original post, there is also interceptor support; writing bidirectional interceptors in Java involves quite a bit of boilerplate.
  2. A library capturing metaphors for building production APIs, e.g., GAX. Kotlinx coroutine concepts can cover a lot of what is in GAX or make it easy to roll as needed.
  3. Code generators for client and server stubs.
  4. Utility for bootstrapping servers and clients. This could range from sugar for grpc-java to a ktor integration. A ktor integration could cover interceptors and some of 2.

@asad-awadia

Really looking forward to this officially becoming a part of Kotlinx.coroutines

@tianhao-au

Sounds quite promising. If we can fully take advantage of Kotlin coroutines in gRPC-java, that would simplify the code quite a lot.

@phiSgr

phiSgr commented Aug 7, 2018

It would be great if backpressure is supported in the form of suspending sends.

I once created an extra buffer of messages in addition to the unconfigurable 32kB buffer. It is not very fun having to check isReady and use setOnReadyHandler.
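
A suspending send on top of `isReady` and `setOnReadyHandler` could be sketched as below. `FlowControlledObserver` is a local stand-in for the flow-control part of `io.grpc.stub.CallStreamObserver`, and `SuspendingSender` and `FakeObserver` are hypothetical names used only for illustration. The sketch assumes a single sender coroutine per observer.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Stand-in for the flow-control part of io.grpc.stub.CallStreamObserver (assumption).
interface FlowControlledObserver<T> {
    val isReady: Boolean
    fun setOnReadyHandler(handler: Runnable)
    fun onNext(value: T)
}

// Hypothetical wrapper: send() suspends while the transport reports it is not ready.
class SuspendingSender<T>(private val observer: FlowControlledObserver<T>) {
    // Conflated: at most one pending "became ready" signal is remembered.
    private val readySignal = Channel<Unit>(Channel.CONFLATED)

    init {
        observer.setOnReadyHandler(Runnable { readySignal.trySend(Unit) })
    }

    suspend fun send(value: T) {
        // Re-check after every wake-up, because a remembered signal may be stale.
        while (!observer.isReady) readySignal.receive()
        observer.onNext(value)
    }
}

// Fake transport used only to demonstrate the behaviour.
class FakeObserver : FlowControlledObserver<Int> {
    override var isReady = false
    private var handler: Runnable? = null
    val received = mutableListOf<Int>()
    override fun setOnReadyHandler(handler: Runnable) { this.handler = handler }
    override fun onNext(value: Int) { received += value }
    fun becomeReady() { isReady = true; handler?.run() }
}

fun main() = runBlocking {
    val fake = FakeObserver()
    val sender = SuspendingSender(fake)
    val job = launch { sender.send(1) }
    yield()                 // let the sender run until it suspends
    println(fake.received)  // [] because the transport is not ready yet
    fake.becomeReady()
    job.join()
    println(fake.received)  // [1]
}
```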

@marcoferrer

Just stumbled upon this and I'm really interested in contributing to the initial prototype for this.

I authored one of the code generators mentioned by @hsyed (kroto+). The code generator has since dropped its dependency on square/wire and has been refactored into a protoc plugin, but I still feel an official solution would be beneficial to the community, hopefully reducing the fragmented approaches being provided by the community.

Is there a process for documenting proposals for coroutines, similar to Kotlin's KEEP?

If not I can continue working on my initial implementation in preparation for feedback from the community.

Also, due to the nature of this project, I'm not entirely sure where the protoc plugin module should live.

I think the integrations directory would be an ill fit. Should it exist in a repo of its own? The generated code will depend on an artifact of its own, which I could imagine living in integrations.

@elizarov
Contributor

@marcoferrer You are right, it should be a separate module in the integrations directory. You can start it as a separate project, though, and later contribute it to kotlinx.coroutines. Whatever works best for you.

@marcoferrer

marcoferrer commented Dec 18, 2018

I know it's been a while, but I wanted to give a status update. The PR for the initial implementation has been opened under the kroto-plus repo marcoferrer/kroto-plus#16 and contains all of the code generation and updated APIs, along with an example project and snapshot for users to try out.

The goal is to release under kroto initially and iterate on the feedback from the users of that project. The plan afterwards will be to port the apis and code gen to this repo.

Feedback is welcomed and encouraged.

Here are some key points taken from the PR

  1. All around better integration with Structured Concurrency
  2. Backpressure Support has been implemented.
  3. Generation of Client Stubs which implement the CoroutineScope interface
    • Allows client stubs to work well with Structured Concurrency
    • Cancellations can now be propagated across usages of a specific stub instance.
  4. Generation of abstract Service Base Impl's
    • Allows services to still support common grpc-java patterns, while still fully embracing coroutine idioms and features.
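
To illustrate point 3, here is a minimal sketch of a stub that implements `CoroutineScope`, so that cancelling the stub's job cancels every call started through that instance. `GreeterStub` and `sayHelloAsync` are hypothetical and are not the code generated by kroto-plus; `delay` stands in for a real RPC.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.async
import kotlinx.coroutines.delay
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.runBlocking
import kotlin.coroutines.CoroutineContext

// Hypothetical stub; a real one would wrap a gRPC channel instead of calling delay().
class GreeterStub(override val coroutineContext: CoroutineContext) : CoroutineScope {
    fun sayHelloAsync(name: String): Deferred<String> = async {
        delay(60_000) // stands in for a slow RPC
        "Hello, $name"
    }
}

fun main() = runBlocking {
    val stubJob = SupervisorJob()
    val stub = GreeterStub(Dispatchers.Default + stubJob)
    val a = stub.sayHelloAsync("a")
    val b = stub.sayHelloAsync("b")
    stubJob.cancel() // cancels every call started through this stub instance
    joinAll(a, b)
    println(a.isCancelled && b.isCancelled) // true
}
```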

@ghost

ghost commented Jan 16, 2019

Hi Marco,

2 days ago I started with gRPC and wanted to integrate it into Kotlin with some nice coroutines. Luckily I found your awesome work. This really begs to be included in the coroutine integrations. All in all, I thought somebody must have thought about all this already, and here we are.

The idea of having the code automatically generated is very nice, but in my case, adding an extra step to the build is more cumbersome. Why not also include this option in the proposal?

Client API:

suspend fun <ReqT, RespT> callUnaryRpc(call: (ReqT, StreamObserver<RespT>) -> Unit, request: ReqT): RespT {
    return ClientCalls.suspendingUnaryCall { responseObserver: StreamObserver<RespT> ->
        call(request, responseObserver)
    }
}

fun <ReqT, RespT> callServerStreamingRpc(call: (ReqT, StreamObserver<RespT>) -> Unit, request: ReqT): ReceiveChannel<RespT> {
    return ClientCalls.serverStreamingCall { responseObserver: StreamObserver<RespT> ->
        call(request, responseObserver)
    }
}

fun <ReqT, RespT> callClientStreamingRpc(call: (StreamObserver<RespT>) -> StreamObserver<ReqT>): ClientStreamingCallChannel<ReqT, RespT> {
    return ClientCalls.clientStreamingCall(Dispatchers.Default, call)
}

fun <ReqT, RespT> callBidiStreamingRpc(call: (StreamObserver<RespT>) -> StreamObserver<ReqT>): ClientBidiCallChannel<ReqT, RespT> {
    return ClientCalls.bidiCall(Dispatchers.Default, call)
}

Usage:

val stub = HeroServiceGrpc.newStub(channel)

//Unary RPC, request response pattern
val unaryResponse = callUnaryRpc(stub::getHeroByNameUnary, GetHeroByNameRequest.getDefaultInstance())

//Client calls once, Server streams
val serverStreamingResponse = callServerStreamingRpc(stub::getSidekicksByHero, Hero.getDefaultInstance())
serverStreamingResponse.consumeEach { println(it) }

//Client streams, Server responds once
val serverResponse = callClientStreamingRpc(stub::getHerosByName)

//Client streams, Server streams as well
val (sendingStream, responseStream) = callBidiStreamingRpc(stub::getSidekicksByHeros)
repeat(5) {
    sendingStream.send(Hero.getDefaultInstance())
}
responseStream.consumeEach { println(it) }
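
The `ClientCalls` helpers used above are not defined in this thread. As a rough idea of what a suspending unary helper might do, here is a sketch built on `suspendCancellableCoroutine`. `StreamObserver` is a local stand-in mirroring `io.grpc.stub.StreamObserver`, and `awaitSingleResponse` is a hypothetical name. The sketch does not cancel the underlying RPC when the coroutine is cancelled; a real implementation would need to.

```kotlin
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException

// Stand-in with the same three callbacks as io.grpc.stub.StreamObserver (assumption).
interface StreamObserver<T> {
    fun onNext(value: T)
    fun onError(t: Throwable)
    fun onCompleted()
}

// Hypothetical helper: start a callback-style unary call and suspend until it answers.
suspend fun <RespT> awaitSingleResponse(start: (StreamObserver<RespT>) -> Unit): RespT =
    suspendCancellableCoroutine { cont ->
        start(object : StreamObserver<RespT> {
            // isActive guards against resuming twice (e.g. onCompleted after onNext).
            override fun onNext(value: RespT) {
                if (cont.isActive) cont.resume(value)
            }
            override fun onError(t: Throwable) {
                if (cont.isActive) cont.resumeWithException(t)
            }
            override fun onCompleted() {
                if (cont.isActive) {
                    cont.resumeWithException(IllegalStateException("completed without a response"))
                }
            }
        })
    }

fun main() = runBlocking {
    // Fake callback-style RPC that answers immediately with the request upper-cased.
    val fakeRpc: (String, StreamObserver<String>) -> Unit = { request, observer ->
        observer.onNext(request.uppercase())
        observer.onCompleted()
    }
    val reply = awaitSingleResponse<String> { observer -> fakeRpc("ping", observer) }
    println(reply) // PING
}
```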

@ghost

ghost commented Jan 17, 2019

Same for messages, with no code generation:

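inline fun <reified M : Message, reified B : Message.Builder> createMessage(body: B.() -> Unit): M {
    // Generated builders have private constructors, so obtain one via the message's static newBuilder().
    val builder = M::class.java.getMethod("newBuilder").invoke(null) as B
    return builder.apply(body).build() as M
}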

Usage:

createMessage<GetHeroByNameRequest, GetHeroByNameRequest.Builder> {
    name = "pippo"
}

@Abegemot

Hi Mr coder82, I'm totally amazed by your approach; it is a splendid example of Kotlin, protobuf, and coroutines.
I was utterly annoyed by the protobuf stream observers.
But I still can't make your example work: I can't find where ClientCalls.suspendingUnaryCall, ClientCalls.serverStreamingCall, and the two others come from. And for the message generator, I don't see where Message comes from.
Still, if I ever manage to make it work, it is exactly what I was looking for.
I too think that having a standard way to deal with protobuf would be a very nice add-on for Kotlin, especially if you can take advantage of all the coroutines stuff.
Thank you

@elizarov
Contributor

elizarov commented May 7, 2020

Closing this one. See https://github.com/grpc/grpc-kotlin

@elizarov elizarov closed this as completed May 7, 2020