Skip to content

Commit

Permalink
abstract outbound channel completion handler logic
Browse files Browse the repository at this point in the history
  • Loading branch information
marcoferrer committed Dec 26, 2019
1 parent 63c86ec commit 1427c9f
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,26 @@ internal fun <T> CallStreamObserver<*>.applyInboundFlowControl(

internal typealias MessageHandler = suspend ActorScope<*>.() -> Unit

internal inline fun <T> CoroutineScope.attachOutboundChannelCompletionHandler(
streamObserver: CallStreamObserver<T>,
targetChannel: Channel<T>,
crossinline onSuccess: () -> Unit = {},
crossinline onError: (Throwable) -> Unit = {}
){
launch(start = CoroutineStart.UNDISPATCHED) {
val job = coroutineContext[Job]!!
try {
targetChannel.awaitCloseOrThrow()
onSuccess()
} catch (error: Throwable) {
if(!job.isCancelled){
streamObserver.completeSafely(error, convertError = streamObserver !is ClientCallStreamObserver)
onError(error)
}
}
}
}

internal fun <T> CoroutineScope.applyOutboundFlowControl(
streamObserver: CallStreamObserver<T>,
targetChannel: Channel<T>
Expand Down Expand Up @@ -92,20 +112,6 @@ internal fun <T> CoroutineScope.applyOutboundFlowControl(
capacity = Channel.BUFFERED,
context = Dispatchers.Unconfined
) {

launch(start = CoroutineStart.UNDISPATCHED) {
val job = coroutineContext[Job]!!
try {
targetChannel.awaitCloseOrThrow()
channel.close()
} catch (error: Throwable) {
if(!job.isCancelled){
streamObserver.completeSafely(error, convertError = streamObserver !is ClientCallStreamObserver)
isCompleted.set(true)
}
}
}

try {
for (handler in channel) {
if (isCompleted.get()) break
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package com.github.marcoferrer.krotoplus.coroutines.client
import com.github.marcoferrer.krotoplus.coroutines.call.FlowControlledInboundStreamObserver
import com.github.marcoferrer.krotoplus.coroutines.call.MessageHandler
import com.github.marcoferrer.krotoplus.coroutines.call.applyOutboundFlowControl
import com.github.marcoferrer.krotoplus.coroutines.call.attachOutboundChannelCompletionHandler
import io.grpc.Status
import io.grpc.stub.ClientCallStreamObserver
import io.grpc.stub.ClientResponseObserver
Expand Down Expand Up @@ -109,6 +110,12 @@ internal class ClientBidiCallChannelImpl<ReqT,RespT>(
callStreamObserver = requestStream.apply { disableAutoInboundFlowControl() }
outboundMessageHandler = applyOutboundFlowControl(requestStream,outboundChannel)

attachOutboundChannelCompletionHandler(
callStreamObserver, outboundChannel,
onSuccess = { outboundMessageHandler.close() },
onError = { error -> inboundChannel.close(error) }
)

inboundChannel.invokeOnClose {
// If the client prematurely closes the response channel
// we need to propagate this as a cancellation to the underlying call
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package com.github.marcoferrer.krotoplus.coroutines.client

import com.github.marcoferrer.krotoplus.coroutines.call.MessageHandler
import com.github.marcoferrer.krotoplus.coroutines.call.applyOutboundFlowControl
import com.github.marcoferrer.krotoplus.coroutines.call.attachOutboundChannelCompletionHandler
import io.grpc.stub.ClientCallStreamObserver
import io.grpc.stub.ClientResponseObserver
import kotlinx.coroutines.CancellationException
Expand Down Expand Up @@ -71,6 +72,11 @@ internal class ClientStreamingCallChannelImpl<ReqT,RespT>(
callStreamObserver = requestStream
outboundMessageHandler = applyOutboundFlowControl(requestStream, outboundChannel)

attachOutboundChannelCompletionHandler(
callStreamObserver, outboundChannel,
onSuccess = { outboundMessageHandler.close() },
onError = { error -> completableResponse.completeExceptionally(error) }
)
completableResponse.invokeOnCompletion {
// If the client prematurely cancels the response
// we need to propagate this as a cancellation to the underlying call
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package com.github.marcoferrer.krotoplus.coroutines.server

import com.github.marcoferrer.krotoplus.coroutines.call.applyInboundFlowControl
import com.github.marcoferrer.krotoplus.coroutines.call.applyOutboundFlowControl
import com.github.marcoferrer.krotoplus.coroutines.call.attachOutboundChannelCompletionHandler
import com.github.marcoferrer.krotoplus.coroutines.call.bindToClientCancellation
import com.github.marcoferrer.krotoplus.coroutines.call.completeSafely
import com.github.marcoferrer.krotoplus.coroutines.call.newRpcScope
Expand Down Expand Up @@ -67,6 +68,12 @@ public fun <ReqT, RespT> ServiceScope.serverCallServerStreaming(
with(newRpcScope(initialContext, methodDescriptor)) {
bindToClientCancellation(serverCallObserver)
val outboundMessageHandler = applyOutboundFlowControl(serverCallObserver,responseChannel)

attachOutboundChannelCompletionHandler(
serverCallObserver, responseChannel,
onSuccess = { outboundMessageHandler.close() }
)

launch(start = CoroutineStart.ATOMIC) {
try{
block(responseChannel)
Expand Down Expand Up @@ -165,6 +172,11 @@ public fun <ReqT, RespT> ServiceScope.serverCallBidiStreaming(
}
)

attachOutboundChannelCompletionHandler(
serverCallObserver, responseChannel,
onSuccess = { outboundMessageHandler.close() }
)

launch(start = CoroutineStart.ATOMIC) {
serverCallObserver.request(1)
try {
Expand Down

0 comments on commit 1427c9f

Please sign in to comment.