From d99af229322dab083e77dca1b25b4f6f7ebc23ff Mon Sep 17 00:00:00 2001 From: Marco Ferrer <35935108+marcoferrer@users.noreply.github.com> Date: Sun, 11 Aug 2019 12:31:49 -0400 Subject: [PATCH] propagate scope cancellation to target channel in outbound flow control --- .../krotoplus/coroutines/call/FlowControl.kt | 10 ++++++++++ .../coroutines/client/ClientBidiCallChannel.kt | 3 ++- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/kroto-plus-coroutines/src/main/kotlin/com/github/marcoferrer/krotoplus/coroutines/call/FlowControl.kt b/kroto-plus-coroutines/src/main/kotlin/com/github/marcoferrer/krotoplus/coroutines/call/FlowControl.kt index dd2b47a..188d724 100644 --- a/kroto-plus-coroutines/src/main/kotlin/com/github/marcoferrer/krotoplus/coroutines/call/FlowControl.kt +++ b/kroto-plus-coroutines/src/main/kotlin/com/github/marcoferrer/krotoplus/coroutines/call/FlowControl.kt @@ -17,9 +17,11 @@ package com.github.marcoferrer.krotoplus.coroutines.call import io.grpc.stub.CallStreamObserver +import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineExceptionHandler import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.Job import kotlinx.coroutines.channels.ActorScope import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.actor @@ -72,6 +74,14 @@ internal fun CoroutineScope.applyOutboundFlowControl( targetChannel.close(e) } ) { + coroutineContext[Job]?.let { job -> + job.invokeOnCompletion { + if(job.isCancelled && !targetChannel.isClosedForSend){ + targetChannel.cancel(it as? CancellationException) + } + } + } + for(handler in channel){ if(isCompleted.get()) break handler(this) diff --git a/kroto-plus-coroutines/src/main/kotlin/com/github/marcoferrer/krotoplus/coroutines/client/ClientBidiCallChannel.kt b/kroto-plus-coroutines/src/main/kotlin/com/github/marcoferrer/krotoplus/coroutines/client/ClientBidiCallChannel.kt index 76d00f1..0d54491 100644 --- a/kroto-plus-coroutines/src/main/kotlin/com/github/marcoferrer/krotoplus/coroutines/client/ClientBidiCallChannel.kt +++ b/kroto-plus-coroutines/src/main/kotlin/com/github/marcoferrer/krotoplus/coroutines/client/ClientBidiCallChannel.kt @@ -20,6 +20,7 @@ import com.github.marcoferrer.krotoplus.coroutines.call.FlowControlledInboundStr import com.github.marcoferrer.krotoplus.coroutines.call.applyOutboundFlowControl import io.grpc.stub.ClientCallStreamObserver import io.grpc.stub.ClientResponseObserver +import kotlinx.coroutines.CancellationException import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.ReceiveChannel import kotlinx.coroutines.channels.SendChannel @@ -106,7 +107,7 @@ internal class ClientBidiCallChannelImpl( override fun onError(t: Throwable) { outboundChannel.close(t) - outboundChannel.cancel() + outboundChannel.cancel(CancellationException(t.message,t)) inboundChannel.close(t) } }