Skip to content

Commit

Permalink
refactor atomic server tests
Browse files Browse the repository at this point in the history
  • Loading branch information
marcoferrer committed May 20, 2019
1 parent e376fe0 commit ab3ee29
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 56 deletions.
9 changes: 7 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
language: java
jdk:
- oraclejdk8
dist: trusty
sudo: false
addons:
apt:
Expand All @@ -16,7 +17,11 @@ before_cache:
- rm -rf $HOME/.gradle/caches/*/fileHashes/
jobs:
include:
- stage: "Tests"
script: ./gradlew build && cd example-project && ./gradlew test
- name: Gradle Check
install:
- ./gradlew assemble
script:
- ./gradlew check
- cd example-project && ./gradlew test
after_success:
- bash <(curl -s https://codecov.io/bash)
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import io.mockk.coVerify
import io.mockk.spyk
import io.mockk.verify
import io.mockk.verifyOrder
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
Expand All @@ -46,9 +47,9 @@ import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield
import org.junit.Rule
import org.junit.Test
import java.util.concurrent.atomic.AtomicReference
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext
import kotlin.test.assertEquals
Expand Down Expand Up @@ -384,23 +385,29 @@ class ServerCallBidiStreamingTests {

@Test
fun `Server method is at least invoked before being cancelled`(){
val respChannel = AtomicReference<SendChannel<HelloReply>?>()
val serverCtx = AtomicReference<CoroutineContext?>()
val deferredRespChannel = CompletableDeferred<SendChannel<HelloReply>>()
val deferredCtx = CompletableDeferred<CoroutineContext>()

grpcServerRule.serviceRegistry.addService(object : GreeterCoroutineGrpc.GreeterImplBase() {
override val initialContext: CoroutineContext = Dispatchers.Unconfined
override val initialContext: CoroutineContext = Dispatchers.Default
override suspend fun sayHelloStreaming(
requestChannel: ReceiveChannel<HelloRequest>,
responseChannel: SendChannel<HelloReply>
) {
respChannel.set(spyk(responseChannel))
serverCtx.set(coroutineContext)
val respChan = spyk(responseChannel)
deferredCtx.complete(coroutineContext.apply {
get(Job)!!.invokeOnCompletion {
deferredRespChannel.complete(respChan)
}
})
// Need to receive message since
// cancellation occurs in client
// half close.
requestChannel.receive()
delay(10)
delay(100)
yield()
repeat(3){
respChannel.get()!!.send { message = "response" }
respChan.send { message = "response" }
}
}
})
Expand All @@ -413,12 +420,14 @@ class ServerCallBidiStreamingTests {
reqObserver.onCompleted()

runBlocking {
do { delay(100) } while(serverCtx.get() == null)
assert(serverCtx.get()?.get(Job)!!.isCompleted){ "Server job should be completed" }
assert(serverCtx.get()?.get(Job)!!.isCancelled){ "Server job should be cancelled" }
assert(respChannel.get()!!.isClosedForSend){ "Abandoned response channel should be closed" }
verify(exactly = 1) { responseObserver.onError(matchStatus(Status.CANCELLED, "CANCELLED")) }
coVerify(exactly = 0) { respChannel.get()!!.send(any()) }
val respChannel = deferredRespChannel.await()
assert(respChannel.isClosedForSend){ "Abandoned response channel should be closed" }
verify(exactly = 1) { responseObserver.onError(matchStatus(Status.CANCELLED, "CANCELLED: test")) }
coVerify(exactly = 0) { respChannel.send(any()) }

val serverCtx = deferredCtx.await()
assert(serverCtx[Job]!!.isCompleted){ "Server job should be completed" }
assert(serverCtx[Job]!!.isCancelled){ "Server job should be cancelled" }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import io.grpc.stub.StreamObserver
import io.grpc.testing.GrpcServerRule
import io.mockk.spyk
import io.mockk.verify
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
Expand All @@ -42,10 +43,10 @@ import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield
import org.junit.Rule
import org.junit.Test
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicReference
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext
import kotlin.test.assertEquals
Expand Down Expand Up @@ -254,20 +255,22 @@ class ServerCallClientStreamingTests {

@Test
fun `Server method is at least invoked before being cancelled`(){
val serverMethodExecuted = AtomicBoolean()
val serverMethodCompleted = AtomicBoolean()
val serverCtx = AtomicReference<CoroutineContext?>()

val deferredCtx = CompletableDeferred<CoroutineContext>()
grpcServerRule.serviceRegistry.addService(object : GreeterCoroutineGrpc.GreeterImplBase() {
override val initialContext: CoroutineContext = Dispatchers.Unconfined
override val initialContext: CoroutineContext = Dispatchers.Default
override suspend fun sayHelloClientStreaming(requestChannel: ReceiveChannel<HelloRequest>): HelloReply {
serverMethodExecuted.set(true)
serverCtx.set(coroutineContext)
coroutineContext.let { ctx ->
ctx[Job]!!.invokeOnCompletion {
deferredCtx.complete(ctx)
}
}
// Need to receive message since
// cancellation occurs in client
// half close.
requestChannel.receive()
delay(10)
delay(100)
yield()
serverMethodCompleted.set(true)
return expectedResponse
}
Expand All @@ -282,12 +285,11 @@ class ServerCallClientStreamingTests {
reqObserver.onCompleted()

runBlocking {
do { delay(100) } while(serverCtx.get() == null)

verify(exactly = 1) { responseObserver.onError(matchStatus(Status.CANCELLED, "CANCELLED")) }
val serverCtx = deferredCtx.await()
verify(exactly = 1) { responseObserver.onError(matchStatus(Status.CANCELLED, "CANCELLED: test")) }

assert(serverCtx.get()?.get(Job)!!.isCompleted){ "Server job should be completed" }
assert(serverCtx.get()?.get(Job)!!.isCancelled){ "Server job should be cancelled" }
assert(serverCtx[Job]!!.isCompleted){ "Server job should be completed" }
assert(serverCtx[Job]!!.isCancelled){ "Server job should be cancelled" }
assertFalse(serverMethodCompleted.get(),"Server method should not complete")
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import io.mockk.coVerify
import io.mockk.spyk
import io.mockk.verify
import io.mockk.verifyOrder
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
Expand All @@ -48,7 +49,6 @@ import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield
import org.junit.Rule
import org.junit.Test
import java.util.concurrent.atomic.AtomicReference
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext
import kotlin.test.assertEquals
Expand Down Expand Up @@ -221,38 +221,44 @@ class ServerCallServerStreamingTests {

@Test
fun `Server method is at least invoked before being cancelled`(){
val respChannel = AtomicReference<SendChannel<HelloReply>?>()
val serverCtx = AtomicReference<CoroutineContext?>()

val deferredRespChannel = CompletableDeferred<SendChannel<HelloReply>>()
val deferredCtx = CompletableDeferred<CoroutineContext>()
grpcServerRule.serviceRegistry.addService(object : GreeterCoroutineGrpc.GreeterImplBase() {
override val initialContext: CoroutineContext = Dispatchers.Unconfined
override val initialContext: CoroutineContext = Dispatchers.Default
override suspend fun sayHelloServerStreaming(
request: HelloRequest,
responseChannel: SendChannel<HelloReply>
) {
respChannel.set(spyk(responseChannel))
serverCtx.set(coroutineContext)
delay(5)
val respChan = spyk(responseChannel)
deferredCtx.complete(coroutineContext.apply {
get(Job)!!.invokeOnCompletion {
deferredRespChannel.complete(respChan)
}
})
delay(100)
yield()
repeat(3){
respChannel.get()!!.send { message = "response" }
respChan.send { message = "response" }
}
}
})

val stub = GreeterGrpc.newBlockingStub(grpcServerRule.channel)
.withInterceptors(CancellingClientInterceptor)

assertFailsWithStatus(Status.CANCELLED,"CANCELLED"){
assertFailsWithStatus(Status.CANCELLED,"CANCELLED: test"){
val iter = stub.sayHelloServerStreaming(HelloRequest.getDefaultInstance())
while(iter.hasNext()){}
}

runBlocking {
do { delay(100) } while(serverCtx.get() == null)
assert(serverCtx.get()?.get(Job)!!.isCompleted){ "Server job should be completed" }
assert(serverCtx.get()?.get(Job)!!.isCancelled){ "Server job should be cancelled" }
assert(respChannel.get()!!.isClosedForSend){ "Abandoned response channel should be closed" }
coVerify(exactly = 0) { respChannel.get()!!.send(any()) }
val respChannel = deferredRespChannel.await()
assert(respChannel.isClosedForSend){ "Abandoned response channel should be closed" }
coVerify(exactly = 0) { respChannel.send(any()) }

val serverCtx = deferredCtx.await()
assert(serverCtx[Job]!!.isCompleted){ "Server job should be completed" }
assert(serverCtx[Job]!!.isCancelled){ "Server job should be cancelled" }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,18 @@ import io.grpc.stub.StreamObserver
import io.grpc.testing.GrpcServerRule
import io.mockk.spyk
import io.mockk.verify
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield
import org.junit.Rule
import org.junit.Test
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicReference
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext
import kotlin.test.assertEquals
Expand Down Expand Up @@ -159,16 +160,18 @@ class ServerCallUnaryTests {

@Test
fun `Server method is at least invoked before being cancelled`(){
val serverMethodExecuted = AtomicBoolean()
val serverMethodCompleted = AtomicBoolean()
val serverCtx = AtomicReference<CoroutineContext?>()

val deferredCtx = CompletableDeferred<CoroutineContext>()
grpcServerRule.serviceRegistry.addService(object : GreeterCoroutineGrpc.GreeterImplBase() {
override val initialContext: CoroutineContext = Dispatchers.Unconfined
override val initialContext: CoroutineContext = Dispatchers.Default
override suspend fun sayHello(request: HelloRequest): HelloReply {
serverMethodExecuted.set(true)
serverCtx.set(coroutineContext)
delay(10)
coroutineContext.let { ctx ->
ctx[Job]!!.invokeOnCompletion {
deferredCtx.complete(ctx)
}
}
delay(100)
yield()
serverMethodCompleted.set(true)
return expectedResponse
}
Expand All @@ -178,14 +181,14 @@ class ServerCallUnaryTests {
.newBlockingStub(grpcServerRule.channel)
.withInterceptors(CancellingClientInterceptor)

assertFailsWithStatus(Status.CANCELLED,"CANCELLED"){
assertFailsWithStatus(Status.CANCELLED,"CANCELLED: test"){
stub.sayHello(HelloRequest.getDefaultInstance())
}

runBlocking {
do { delay(100) } while(serverCtx.get() == null)
assert(serverCtx.get()?.get(Job)!!.isCompleted){ "Server job should be completed" }
assert(serverCtx.get()?.get(Job)!!.isCancelled){ "Server job should be cancelled" }
val serverCtx = deferredCtx.await()
assert(serverCtx[Job]!!.isCompleted){ "Server job should be completed" }
assert(serverCtx[Job]!!.isCancelled){ "Server job should be cancelled" }
assertFalse(serverMethodCompleted.get(),"Server method should not complete")
}
}
Expand Down
1 change: 1 addition & 0 deletions protoc-gen-kroto-plus/generator-tests/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ apply plugin: 'com.google.protobuf'

compileTestKotlin {
kotlinOptions {
jvmTarget = "1.8"
freeCompilerArgs += [
"-Xuse-experimental=kotlin.Experimental"
]
Expand Down

0 comments on commit ab3ee29

Please sign in to comment.