Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Outbound flow control bugfix #61

Merged
merged 25 commits into from
Aug 22, 2019
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
125e6a3
update dependencies
marcoferrer Aug 4, 2019
757da7f
fix typo in config message name (#57)
marcoferrer Aug 4, 2019
11d7169
clean up usages of deprecated apis
marcoferrer Aug 4, 2019
bed8d00
fix race condition in unit back-pressure test
marcoferrer Aug 4, 2019
fec2f7d
add test from @chris-blacker for high volume race condition
marcoferrer Aug 4, 2019
5df774a
Revert "clean up usages of deprecated apis"
marcoferrer Aug 4, 2019
6b7fdea
fix race condition in outbound flow control handler
marcoferrer Aug 4, 2019
7621579
update outbound flow control
marcoferrer Aug 5, 2019
afd315e
add coroutine timeout rule to unit tests
marcoferrer Aug 5, 2019
42a9460
debug ci issues
marcoferrer Aug 5, 2019
4da8aaf
downgrade grpc version
marcoferrer Aug 5, 2019
d99af22
propagate scope cancellation to target channel in outbound flow control
marcoferrer Aug 11, 2019
bdba585
configure detailed test output
marcoferrer Aug 12, 2019
11b91e3
disable coroutines debug artifact
marcoferrer Aug 12, 2019
18aac4f
update dependency versions
marcoferrer Aug 12, 2019
8e9bd7c
add bidi streaming integration tests
marcoferrer Aug 12, 2019
b4bd070
propagate client inbound channel close as call cancellation
marcoferrer Aug 12, 2019
59715c5
close outbound message handler in server call handlers
marcoferrer Aug 12, 2019
5f326a3
improve determinism of tests
marcoferrer Aug 12, 2019
4a667e7
Update build.gradle
marcoferrer Aug 12, 2019
7760fcd
Update CallOptionsTest.kt
marcoferrer Aug 12, 2019
c76b00b
add withContext to atomic server bidi streaming test
marcoferrer Aug 13, 2019
19c0ce4
fix flaky cancellation message checking
marcoferrer Aug 13, 2019
9bcd734
update tests
marcoferrer Aug 22, 2019
4375f58
update tests
marcoferrer Aug 22, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
## Version 0.4.1
## Version 0.5.0-RC
_\*\*-\*\*_
* New: Update to Kotlin `1.3.40`
* New: Update to Kotlin Coroutines `1.2.2`
* New: Update to gRPC `1.21.0`
* New: Update to protobuf `3.7.1`
* New: Update to Kotlin `1.3.41`
* New: Update to Kotlin Coroutines `1.3.0-RC2`
* New: Update to gRPC `1.22.1`
* New: Update to protobuf `3.9.0`



Expand Down
46 changes: 46 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,50 @@ subprojects{ subproject ->
testImplementation "org.jetbrains.kotlin:kotlin-test"
testImplementation "org.jetbrains.kotlin:kotlin-test-junit"
}


// Configures console logging for every Test task in the subproject and prints
// a boxed one-line result summary once the outermost test suite has finished.
tasks.withType(Test) {
testLogging {
// set options for log level LIFECYCLE
events (
'FAILED',
'PASSED',
'SKIPPED',
'STANDARD_OUT'
)
// Print exceptions in full, including their causes and stack traces.
exceptionFormat 'FULL'
showExceptions true
showCauses true
showStackTraces true

// set options for log level DEBUG and INFO
debug {
events(
'STARTED',
'FAILED',
'PASSED',
'SKIPPED',
'STANDARD_ERROR',
'STANDARD_OUT'
)
exceptionFormat 'FULL'
}
// INFO reuses the event set and exception format configured for DEBUG above.
info.events = debug.events
info.exceptionFormat = debug.exceptionFormat

// Invoked with the suite descriptor and its aggregated result.
afterSuite { desc, result ->
if (!desc.parent) { // will match the outermost suite
// Single summary line: result type plus total/success/failure/skipped counts.
def output = "Results: " +
"${result.resultType} (${result.testCount} tests, " +
"${result.successfulTestCount} successes, " +
"${result.failedTestCount} failures, " +
"${result.skippedTestCount} skipped)"

// Frame the summary with '| ' / ' |' and draw dashed rules of the same total width.
def startItem = '| ', endItem = ' |'
def repeatLength = startItem.length() + output.length() + endItem.length()
println('\n' + ('-' * repeatLength) + '\n' + startItem + output + endItem + '\n' + ('-' * repeatLength))
}
}
}
}
}
10 changes: 5 additions & 5 deletions buildSrc/src/main/kotlin/Versions.kt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
object Versions {
const val protobuf = "3.7.1"
const val grpc = "1.21.0"
const val kotlin = "1.3.40"
const val coroutines = "1.2.2"
const val mockk = "1.9.1"
const val protobuf = "3.9.0"
const val grpc = "1.22.1"
const val kotlin = "1.3.41"
const val coroutines = "1.3.0-RC2"
const val mockk = "1.9.3"
}
11 changes: 11 additions & 0 deletions kroto-plus-coroutines/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ def experimentalFlags = [
"-Xuse-experimental=kotlin.Experimental",
"-Xuse-experimental=kotlinx.coroutines.ExperimentalCoroutinesApi",
"-Xuse-experimental=kotlinx.coroutines.ObsoleteCoroutinesApi",
"-Xuse-experimental=kotlinx.coroutines.FlowPreview",
"-Xuse-experimental=com.github.marcoferrer.krotoplus.coroutines.KrotoPlusInternalApi"
]

Expand Down Expand Up @@ -35,8 +36,18 @@ dependencies {
testImplementation project(':test-api:java')
testImplementation "io.mockk:mockk:${Versions.mockk}"
testImplementation "org.jetbrains.kotlinx:kotlinx-coroutines-test:${Versions.coroutines}"

// Not included by default due to the following issue
// https://github.com/Kotlin/kotlinx.coroutines/issues/1060
// testImplementation "org.jetbrains.kotlinx:kotlinx-coroutines-debug:${Versions.coroutines}"
}


//test {
// systemProperty "kotlinx.coroutines.scheduler.core.pool.size", "1"
// systemProperty "kotlinx.coroutines.scheduler.core.max.size", "1"
//}

tasks.withType(JavaCompile) {
enabled = false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ import io.grpc.stub.CallStreamObserver
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.ActorScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.channels.actor
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger

Expand All @@ -42,40 +44,60 @@ internal fun <T> CallStreamObserver<*>.applyInboundFlowControl(
}
}

internal typealias MessageHandler = suspend ActorScope<*>.() -> Unit

internal fun <T> CoroutineScope.applyOutboundFlowControl(
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The outbound flow control handler has been refactored. It no longer spawns multiple jobs when applying backpressure and can properly handle superfluous invocations of the on ready handler runnable.

streamObserver: CallStreamObserver<T>,
targetChannel: Channel<T>
){
val isOutboundJobRunning = AtomicBoolean()
): SendChannel<MessageHandler> {

val isCompleted = AtomicBoolean()
val channelIterator = targetChannel.iterator()
streamObserver.setOnReadyHandler {
if(targetChannel.isClosedForReceive){
streamObserver.completeSafely()
}else if(
val messageHandlerBlock: MessageHandler = handler@ {
while(
streamObserver.isReady &&
!targetChannel.isClosedForReceive &&
isOutboundJobRunning.compareAndSet(false, true)
channelIterator.hasNext()
){
launch(Dispatchers.Unconfined + CoroutineExceptionHandler { _, e ->
streamObserver.completeSafely(e)
targetChannel.close(e)
}) {
try{
while(
streamObserver.isReady &&
!targetChannel.isClosedForReceive &&
channelIterator.hasNext()
){
val value = channelIterator.next()
streamObserver.onNext(value)
}
if(targetChannel.isClosedForReceive){
streamObserver.onCompleted()
}
} finally {
isOutboundJobRunning.set(false)
}
streamObserver.onNext(channelIterator.next())
}
if(targetChannel.isClosedForReceive && isCompleted.compareAndSet(false,true)){
streamObserver.onCompleted()
channel.close()
}
}

val messageHandlerActor = actor<MessageHandler>(
capacity = Channel.UNLIMITED,
Copy link

@blachris blachris Aug 5, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A limited capacity didn't work? I am wondering if it's possible that the channel becomes a memory leak if jobs are added faster than the worker consumes them.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good catch. I must've changed it while debugging. I'll test it with the value reverted to CONFLATED.

This implementation is based on the native gRPC utility StreamObservers.copyWithFlowControl(), which ensures that each invocation of the onReadyHandler always runs. source

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am using CONFLATED and have no issues.

context = Dispatchers.Unconfined + CoroutineExceptionHandler { _, e ->
streamObserver.completeSafely(e)
targetChannel.close(e)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think what is missing is that the targetChannel needs to be canceled in some situations. I tried canceling it here, but this code was not executed when I expected it to be.
I have seen a client bidi call's send hang when the call is canceled from another thread, because the channel is not canceled. This happened when the call was canceled while a send was pending in the call's rendezvous outboundChannel. I think the RPC scope then somehow cleans up without canceling the channel, so the send hangs.

}
) {

for (handler in channel) {
if (isCompleted.get()) break
handler(this)
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding this here reduced the problem but didn't eliminate it. I guess there must be other paths for the scope to cancel.

       catch(ex: CancellationException) {
            targetChannel.cancel(ex)
            throw ex
        }

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the scope's cancellation can also be propagated from its parent under normal coroutine usage. This case was covered before because executing a new launch on a cancelled scope would take care of it. It's hard to reproduce, but I'm trying a few things now.

if(!isCompleted.get()) {
streamObserver.completeSafely()
}
}

targetChannel.invokeOnClose {
messageHandlerActor.close()
}

streamObserver.setOnReadyHandler {
try {
if(!messageHandlerActor.isClosedForSend){
messageHandlerActor.offer(messageHandlerBlock)
}
}catch (e: Throwable){
// If offer throws an exception then it is
// either already closed or there was a failure
// which has already cleaned up call resources
}
}

return messageHandlerActor
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
package com.github.marcoferrer.krotoplus.coroutines.client

import com.github.marcoferrer.krotoplus.coroutines.call.FlowControlledInboundStreamObserver
import com.github.marcoferrer.krotoplus.coroutines.call.MessageHandler
import com.github.marcoferrer.krotoplus.coroutines.call.applyOutboundFlowControl
import io.grpc.stub.ClientCallStreamObserver
import io.grpc.stub.ClientResponseObserver
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.SendChannel
Expand Down Expand Up @@ -97,17 +99,35 @@ internal class ClientBidiCallChannelImpl<ReqT,RespT>(

override lateinit var callStreamObserver: ClientCallStreamObserver<ReqT>

private lateinit var outboundMessageHandler: SendChannel<MessageHandler>

override fun beforeStart(requestStream: ClientCallStreamObserver<ReqT>) {
callStreamObserver = requestStream.apply { disableAutoInboundFlowControl() }
applyOutboundFlowControl(requestStream,outboundChannel)
outboundMessageHandler = applyOutboundFlowControl(requestStream,outboundChannel)

inboundChannel.invokeOnClose {
// If the client prematurely closes the response channel
// we need to propagate this as a cancellation to the underlying call
if(!outboundChannel.isClosedForSend){
callStreamObserver.cancel("Call has been cancelled", it)
}
}
}

override fun onNext(value: RespT): Unit = onNextWithBackPressure(value)

override fun onError(t: Throwable) {
outboundChannel.close(t)
outboundChannel.cancel()
outboundChannel.cancel(CancellationException(t.message,t))
inboundChannel.close(t)
outboundMessageHandler.close(t)
}

/**
 * Runs the superclass completion handling and then, if [isChannelReadyForClose]
 * is true, closes [outboundMessageHandler] without a cause.
 */
override fun onCompleted() {
super.onCompleted()
// NOTE(review): isChannelReadyForClose is defined outside this view — confirm
// what state it reflects after super.onCompleted() has run.
if (isChannelReadyForClose) {
outboundMessageHandler.close()
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,24 @@ internal class ClientResponseStreamChannel<ReqT, RespT>(

override lateinit var callStreamObserver: ClientCallStreamObserver<ReqT>

private var aborted: Boolean = false

/**
 * Stores [requestStream] as [callStreamObserver] after applying inbound flow
 * control to it, then registers a close hook on [inboundChannel].
 *
 * @param requestStream the call stream observer handed to this response observer
 */
override fun beforeStart(requestStream: ClientCallStreamObserver<ReqT>) {
callStreamObserver = requestStream.apply {
applyInboundFlowControl(inboundChannel,transientInboundMessageCount)
}

// If the inbound channel is closed while isInboundCompleted is still false and
// onError has not marked the call as aborted, cancel the underlying call and
// pass the close cause along.
// NOTE(review): `aborted` is a plain var; confirm that onError and this close
// hook cannot race on different threads.
inboundChannel.invokeOnClose {
if(!isInboundCompleted.get() && !aborted){
callStreamObserver.cancel("Call has been cancelled", it)
}
}
}

/** Forwards each inbound response [value] to [onNextWithBackPressure]. */
override fun onNext(value: RespT): Unit = onNextWithBackPressure(value)

/**
 * Marks the call as aborted and closes [inboundChannel] with [t] as the cause.
 *
 * @param t the error reported for the call
 */
override fun onError(t: Throwable) {
// Set the flag before closing: the close hook registered in beforeStart checks
// `aborted` and skips cancelling the call when it is already true.
aborted = true
inboundChannel.close(t)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public fun <ReqT, RespT> ServiceScope.serverCallServerStreaming(
val serverCallObserver = responseObserver as ServerCallStreamObserver<RespT>
with(newRpcScope(initialContext, methodDescriptor)) {
bindToClientCancellation(serverCallObserver)
applyOutboundFlowControl(serverCallObserver,responseChannel)
val outboundMessageHandler = applyOutboundFlowControl(serverCallObserver,responseChannel)
launch(start = CoroutineStart.ATOMIC) {
try{
block(responseChannel)
Expand All @@ -75,6 +75,8 @@ public fun <ReqT, RespT> ServiceScope.serverCallServerStreaming(
val rpcError = e.toRpcException()
serverCallObserver.completeSafely(rpcError)
responseChannel.close(rpcError)
}finally {
outboundMessageHandler.close()
}
}

Expand Down Expand Up @@ -144,7 +146,7 @@ public fun <ReqT, RespT> ServiceScope.serverCallBidiStreaming(

with(newRpcScope(initialContext, methodDescriptor)) rpcScope@ {
bindToClientCancellation(serverCallObserver)
applyOutboundFlowControl(serverCallObserver,responseChannel)
val outboundMessageHandler = applyOutboundFlowControl(serverCallObserver,responseChannel)
val requestChannel = ServerRequestStreamChannel<ReqT>(
coroutineContext = coroutineContext,
callStreamObserver = serverCallObserver,
Expand Down Expand Up @@ -176,6 +178,7 @@ public fun <ReqT, RespT> ServiceScope.serverCallBidiStreaming(
if (!requestChannel.isClosedForReceive) {
requestChannel.cancel()
}
outboundMessageHandler.close()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertNotEquals

@Suppress("DEPRECATION")
class CallOptionsTest {

@Test
Expand Down Expand Up @@ -131,4 +132,4 @@ class CallOptionsTest {
callOptions.getOption(CALL_OPTION_COROUTINE_CONTEXT)
)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class NewGrpcStubTests {
val stub = scope
.newGrpcStub(GreeterCoroutineGrpc.GreeterCoroutineStub, channel)

assertEquals(nameElement.name, stub.coroutineContext[CoroutineName]?.name)
assertEquals(nameElement.name, stub.context[CoroutineName]?.name)
}


Expand All @@ -50,7 +50,7 @@ class NewGrpcStubTests {
val stub = CoroutineScope(scopeNameElement + scopeJob)
.newGrpcStub(GreeterCoroutineGrpc.GreeterCoroutineStub, channel,expectedNameElement)

assertEquals(expectedNameElement.name, stub.coroutineContext[CoroutineName]?.name)
assertEquals(scopeJob, stub.coroutineContext[Job])
assertEquals(expectedNameElement.name, stub.context[CoroutineName]?.name)
assertEquals(scopeJob, stub.context[Job])
}
}
Loading