Outbound flow control bugfix #61
Conversation
This reverts commit 11d7169
requestIter.next(),
requestIter.next(),
requestIter.next().also { requestIter.hasNext() },
requestIter.next().also { requestIter.hasNext() },
This was required after updating to coroutines 1.3.0-RC. Every call to next() must be preceded by a call to hasNext().
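As a small illustration (not project code), a helper capturing this rule for any ChannelIterator:

import kotlinx.coroutines.channels.ChannelIterator

// Illustration only: since coroutines 1.3.0, hasNext() is what suspends and fetches
// the next element; next() simply returns it, so each next() needs a prior hasNext().
suspend fun <T> ChannelIterator<T>.nextOrThrow(): T {
    check(hasNext()) { "Channel was closed before another element arrived" }
    return next()
}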
.consumeAsFlow()
.collect {
    responseChannel.send(HelloReply.newBuilder().setMessage("Reply: ${it.name}").build())
}
The mapTo operator on channels is now deprecated. Without this refactor, the test would fail.
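For context, a minimal sketch (a hypothetical helper, not code from this PR) of the Flow-based replacement for the deprecated channel operators:

import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList

// Instead of deprecated channel operators such as mapTo/map, expose the channel as a
// Flow and apply Flow operators; consumeAsFlow() also takes care of consuming the channel.
suspend fun <T, R> ReceiveChannel<T>.mapToList(transform: (T) -> R): List<R> =
    consumeAsFlow().map { transform(it) }.toList()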
@@ -74,10 +75,10 @@ class ClientStreamingBackPressureTests {

@Test
fun `Client send suspends until server invokes receive`() {
    lateinit var serverRequestChannel: ReceiveChannel<HelloRequest>
This change was to prevent race conditions between the serverRequestChannel var being populated and test assertions running.
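One way to avoid that kind of race (a hypothetical pattern, not necessarily the exact change made here) is to hand the channel to the test through a CompletableDeferred rather than a lateinit var:

import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.channels.ReceiveChannel

// Hypothetical helper: the server handler publishes its request channel once it exists,
// and the test awaits it instead of racing a lateinit var assignment.
class ServerRequestChannelHolder<T> {
    private val deferred = CompletableDeferred<ReceiveChannel<T>>()
    fun publish(channel: ReceiveChannel<T>) { deferred.complete(channel) }
    suspend fun await(): ReceiveChannel<T> = deferred.await()
}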
@@ -332,4 +343,58 @@ class ClientCallBidiStreamingTests {
    assert(responseChannel.isClosedForReceive) { "Response channel should be closed for receive" }
}

@Test
fun `High throughput call succeeds`() {
This test was introduced as part of the discussion with @chris-blacker in #59.
@@ -149,7 +160,7 @@ class ClientCallBidiStreamingTests {
    requestChannel.close()
}

responseChannel.map { it.message }.toList()
responseChannel.consumeAsFlow().map { it.message }.toList()
The map operator on channels was deprecated.
@@ -55,6 +62,10 @@ class ClientCallBidiStreamingTests {
@[Rule JvmField]
var grpcServerRule = GrpcServerRule().directExecutor()

@[Rule JvmField]
public val timeout = CoroutinesTimeout.seconds(30)
This allows us to debug coroutine deadlocks in the unit tests.
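For reference, a minimal sketch of how the rule behaves, assuming kotlinx-coroutines-debug is on the test classpath (the test class and names below are illustrative): when a test deadlocks, the rule fails it after the timeout and dumps every live coroutine with its creation and suspension stack traces.

import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.debug.junit4.CoroutinesTimeout
import kotlinx.coroutines.runBlocking
import org.junit.Rule
import org.junit.Test

class CoroutinesTimeoutDemoTest {

    @[Rule JvmField]
    val timeout = CoroutinesTimeout.seconds(30)

    @Test
    fun `deadlocked test produces a coroutine dump`() = runBlocking<Unit> {
        // Never completes: after 30s the rule fails the test and prints where each
        // coroutine is suspended, which is what makes deadlocks debuggable.
        CompletableDeferred<Unit>().await()
    }
}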
@@ -42,40 +43,51 @@ internal fun <T> CallStreamObserver<*>.applyInboundFlowControl(
    }
}

private typealias MessageHandler = suspend ActorScope<*>.() -> Unit

internal fun <T> CoroutineScope.applyOutboundFlowControl(
The outbound flow control handler has been refactored. It no longer spawns multiple jobs when applying backpressure and can properly handle superfluous invocations of the onReady handler runnable.
Looks like the failing test in CI could be unrelated to the code change. It's possible it has something to do with the dependency version bump.
Tests seem consistently stable locally. @chris-blacker Could you verify your use case against this branch?
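For readers following along, here is a condensed sketch of that handler, reconstructed from the diff fragments quoted in this review; the drain loop, the completion bookkeeping, and the shape of completeSafely are assumptions, not the exact project code.

import io.grpc.stub.CallStreamObserver
import java.util.concurrent.atomic.AtomicBoolean
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

private typealias MessageHandler = suspend ActorScope<*>.() -> Unit

// Assumed shape of the project's completeSafely helper: terminate the observer once,
// swallowing the IllegalStateException thrown if it has already been terminated.
private fun CallStreamObserver<*>.completeSafely(error: Throwable? = null) {
    runCatching { if (error != null) onError(error) else onCompleted() }
}

internal fun <T> CoroutineScope.applyOutboundFlowControl(
    streamObserver: CallStreamObserver<T>,
    targetChannel: Channel<T>
) {
    val isCompleted = AtomicBoolean()
    val channelIterator = targetChannel.iterator()

    // A single worker consumes handler invocations one at a time, so applying
    // backpressure never spawns more than one job.
    val messageHandlerActor = actor<MessageHandler>(
        capacity = Channel.UNLIMITED,
        context = Dispatchers.Unconfined + CoroutineExceptionHandler { _, e ->
            streamObserver.completeSafely(e)
            targetChannel.close(e)
        }
    ) {
        for (handler in channel) {
            if (isCompleted.get()) break
            handler(this)
        }
    }

    // Superfluous onReady notifications simply enqueue another copy of the same handler;
    // the worker only pushes messages while the transport reports it is ready.
    streamObserver.setOnReadyHandler {
        if (!messageHandlerActor.isClosedForSend) {
            messageHandlerActor.offer {
                while (streamObserver.isReady && channelIterator.hasNext()) {
                    streamObserver.onNext(channelIterator.next())
                }
                if (targetChannel.isClosedForReceive && isCompleted.compareAndSet(false, true)) {
                    streamObserver.completeSafely()
                }
            }
        }
    }
}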
}

val messageHandlerActor = actor<MessageHandler>(
    capacity = Channel.UNLIMITED,
A limited capacity didn't work? I am wondering if it's possible that the channel becomes a memory leak if jobs are added faster than the worker consumes them.
That's a good catch. I must've changed it while debugging. I'll test it with the value reverted to CONFLATED.
This implementation is based on the native gRPC util StreamObservers.copyWithFlowControl(), which ensured each invocation of the onReadyHandler always ran (source).
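A minimal illustration (not project code) of why CONFLATED is a reasonable capacity here: the channel only keeps the latest element, so a burst of onReady notifications collapses into a single pending handler instead of queuing without bound.

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    val notifications = Channel<Int>(Channel.CONFLATED)
    notifications.send(1)
    notifications.send(2) // replaces 1; only the most recent element is retained
    println(notifications.receive()) // prints 2
}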
I am using CONFLATED and have no issues.
Good solution. I tested the new flow control in my setup and the issue has been fixed.
capacity = Channel.UNLIMITED,
context = Dispatchers.Unconfined + CoroutineExceptionHandler { _, e ->
    streamObserver.completeSafely(e)
    targetChannel.close(e)
I think what is missing is that the targetChannel is cancelled in some situations. I tried cancelling it here, but this code was not being executed when I expected.
I have encountered that a client bidi call send hangs when the call is cancelled in another thread, because the channel is not cancelled. This happened when the call was cancelled while a send was pending on the call's rendezvous outboundChannel. I think the rpc scope then somehow cleans up without cancelling the channel, so the send hangs.
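A hypothetical repro sketch of that hang (assumed names, not project code): if the consuming side's scope goes away without the rendezvous channel being cancelled, a pending send() has nothing to resume it.

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

fun main() = runBlocking {
    val outboundChannel = Channel<Int>() // rendezvous: send suspends until a receiver arrives
    val consumer = launch { delay(100); outboundChannel.receive() }
    consumer.cancelAndJoin() // the receiver is cancelled, but the channel itself is not

    val sent = withTimeoutOrNull(500) { outboundChannel.send(1) }
    if (sent == null) println("send is stuck: the channel was never cancelled")

    outboundChannel.cancel() // cancelling the channel is what releases a suspended sender
}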
for (handler in channel) {
    if (isCompleted.get()) break
    handler(this)
}
Adding this here reduced the problem but didn't eliminate it. I guess there must be other paths for the scope to cancel.
catch (ex: CancellationException) {
    // Propagate cancellation to the outbound channel so that any suspended send() is released.
    targetChannel.cancel(ex)
    throw ex
}
So scope cancellation can also be propagated from its parent under normal coroutine usage. This case was covered before because launching a new coroutine on a cancelled scope would take care of it. It's hard to reproduce, but I'm trying a few things now.
In regards to the
}

@Test
fun `Client cancellation cancels server rpc scope`() {
@chris-blacker I think I was able to reproduce your hanging client issue with this test. It turns out that the outbound message handler wasn't bound to normal channel cancellation. That's been fixed in the latest changes. Since the actor wasn't bound, client coroutines would hang because one of their children would never complete.
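A hypothetical sketch of what "binding" the worker to channel cancellation can look like (assumed names, not necessarily the exact fix in this PR): tie the message-handler actor's lifetime to the outbound channel, so cancelling the channel also completes the actor and no parent coroutine is left waiting on it forever.

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun <T> CoroutineScope.bindWorkerToChannel(outbound: Channel<T>): SendChannel<suspend () -> Unit> {
    val worker = actor<suspend () -> Unit>(capacity = Channel.CONFLATED) {
        for (task in channel) task()
    }
    outbound.invokeOnClose { cause ->
        // Runs when the outbound channel is closed or cancelled; shut the worker down too.
        worker.close(cause)
    }
    return worker
}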
Codecov Report
@@             Coverage Diff              @@
##             master      #61      +/-   ##
============================================
+ Coverage     85.66%   86.09%   +0.42%
  Complexity       19       19
============================================
  Files            15       15
  Lines           279      302      +23
  Branches         42       48       +6
============================================
+ Hits            239      260      +21
- Misses           13       15       +2
  Partials         27       27
Continue to review full report at Codecov.
Thanks for the changes. I think they go in the right direction, but the hang-on-bidi-call-close issue is not fully resolved for me. I will investigate and give you more information and/or a failing test.
Using the coroutines debug probes might shed some light on what is stuck in suspension in your use case.
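A minimal sketch of that approach, assuming kotlinx-coroutines-debug is on the classpath: install the probes, reproduce the hang, then dump all live coroutines to see exactly which call is stuck in suspension and where it was created.

import kotlinx.coroutines.*
import kotlinx.coroutines.debug.DebugProbes

fun main() = runBlocking {
    DebugProbes.install()
    val job = launch {
        delay(Long.MAX_VALUE) // stand-in for the hanging bidi call
    }
    delay(100)
    DebugProbes.dumpCoroutines() // prints creation and suspension stack traces
    job.cancelAndJoin()
    DebugProbes.uninstall()
}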
I tried, but I cannot reproduce the hang-on-bidi-call-close issue outside of one specific stress scenario. And in that scenario it only happens during shutdown and I can work around it. You can see my attempt here, but the test passes fine.
That's good to hear. I can work on getting this merged and into a release today. There are a few spots that need a little cleanup first.
Force-pushed from fbb77bd to 9bfb977
Force-pushed from 9bfb977 to 9bcd734