Skip to content

Commit

Permalink
Merge pull request #790 from smallrye/feature/UpdateCoroutinesTo1.6
Browse files Browse the repository at this point in the history
Update Coroutines to 1.6.0, Kotlin API to 1.5 and refactored tests
  • Loading branch information
jponge authored Jan 3, 2022
2 parents a614bee + 6decd09 commit dea5025
Show file tree
Hide file tree
Showing 8 changed files with 109 additions and 97 deletions.
5 changes: 2 additions & 3 deletions kotlin/src/main/kotlin/io/smallrye/mutiny/coroutines/Multi.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,17 @@ package io.smallrye.mutiny.coroutines
import io.smallrye.mutiny.Multi
import io.smallrye.mutiny.subscription.MultiEmitter
import io.smallrye.mutiny.subscription.MultiSubscriber
import java.util.concurrent.atomic.AtomicReference
import kotlin.coroutines.coroutineContext
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import org.reactivestreams.Subscription
import java.util.concurrent.atomic.AtomicReference
import kotlin.coroutines.coroutineContext

/**
* Subscribe to this [Multi] and provide the items as [Flow].
Expand Down
4 changes: 2 additions & 2 deletions kotlin/src/main/kotlin/io/smallrye/mutiny/coroutines/Uni.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ package io.smallrye.mutiny.coroutines

import io.smallrye.mutiny.Uni
import io.smallrye.mutiny.subscription.UniEmitter
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException

/**
* Awaits the result of this [Uni] while suspending the surrounding coroutine.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,30 +1,28 @@
package io.smallrye.mutiny.coroutines

import io.smallrye.mutiny.helpers.test.UniAssertSubscriber
import java.util.UUID
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import kotlin.test.Test
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.async
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import org.assertj.core.api.Assertions.assertThat
import java.util.UUID
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import kotlin.test.Test

@ExperimentalCoroutinesApi
class DeferredAsUniTest {

@Test
fun `test immediate value`() {
runBlocking {
testBlocking {
// Given
val value = UUID.randomUUID()
val deferred = GlobalScope.async { value }
val deferred = embeddedScope().async { value }

// When
val subscriber = UniAssertSubscriber.create<UUID>()
Expand All @@ -37,9 +35,9 @@ class DeferredAsUniTest {

@Test
fun `test immediate failure`() {
runBlocking {
// Given
val deferred = GlobalScope.async { error("kaboom") }
testBlocking {
// Given an error produced by a Deferred created in an isolated CoroutineScope
val deferred = embeddedScope().async { error("kaboom") }

// When
val subscriber = UniAssertSubscriber<Any>()
Expand All @@ -52,9 +50,9 @@ class DeferredAsUniTest {

@Test
fun `test coroutine cancellation before value is computed`() {
runBlocking {
testBlocking {
// Given
val deferred = GlobalScope.async {
val deferred = embeddedScope().async {
delay(250)
UUID.randomUUID()
}
Expand All @@ -71,9 +69,9 @@ class DeferredAsUniTest {

@Test
fun `test null item`() {
runBlocking {
testBlocking {
// Given
val deferred = GlobalScope.async { null }
val deferred = embeddedScope().async { null }

// When
val subscriber = UniAssertSubscriber.create<Any>()
Expand All @@ -86,10 +84,10 @@ class DeferredAsUniTest {

@Test
fun `test uni cancellation before deferred completes`() {
runBlocking {
testBlocking {
// Given
val deferredLatch = CountDownLatch(1)
val deferred = GlobalScope.async {
val deferred = embeddedScope().async {
delay(10_000) // simulate long running job
"Hello, World!"
}
Expand All @@ -110,13 +108,13 @@ class DeferredAsUniTest {
awaitLatch("Deferred", deferredLatch)
assertThat(deferred.isCancelled)
.`as`("Check that Deferred is cancelled")
.isTrue()
.isTrue
}
}

@Test
fun `test cancelling deferred does not impact siblings`() {
runBlocking {
testBlocking {
// Start before async.
val child1Latch = CountDownLatch(1)
val child1 = launch {
Expand All @@ -136,7 +134,7 @@ class DeferredAsUniTest {
deferred.invokeOnCompletion { exception ->
deferredLatch.countDown()
assertThat(exception)
.isNotNull()
.isNotNull
.isInstanceOf(CancellationException::class.java)
}

Expand All @@ -162,7 +160,7 @@ class DeferredAsUniTest {
awaitLatch("Child 1", child1Latch)
assertThat(child1.isCompleted)
.`as`("Check that child 1 completed")
.isTrue()
.isTrue

awaitLatch("Uni", uniLatch)
awaitLatch("Deferred", deferredLatch)
Expand All @@ -171,11 +169,11 @@ class DeferredAsUniTest {
.isNotNull
assertThat(deferred.isCancelled)
.`as`("Check that Deferred was cancelled")
.isTrue()
.isTrue
awaitLatch("Sibling 2", child2Latch)
assertThat(child2.isCompleted)
.`as`("Check that child 2 completed")
.isTrue()
.isTrue
}
}
}
Expand All @@ -184,5 +182,5 @@ class DeferredAsUniTest {
private fun awaitLatch(name: String, latch: CountDownLatch) {
assertThat(latch.await(2, TimeUnit.SECONDS))
.`as`("Check that $name completes within 2s")
.isTrue()
.isTrue
}
Original file line number Diff line number Diff line change
@@ -1,22 +1,24 @@
package io.smallrye.mutiny.coroutines

import io.smallrye.mutiny.helpers.test.AssertSubscriber
import kotlinx.coroutines.*
import java.util.UUID
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger
import kotlin.test.Test
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.launch
import org.assertj.core.api.Assertions.assertThat
import java.util.*
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger
import kotlin.test.Test

class FlowAsMultiTest {

@Test
fun `test immediate item`() {
runBlocking(Dispatchers.Default) {
testBlocking {
// Given
val item = UUID.randomUUID()
val flow = flowOf(item)
Expand All @@ -35,7 +37,7 @@ class FlowAsMultiTest {

@Test
fun `test immediate items`() {
runBlocking(Dispatchers.Default) {
testBlocking {
// Given
val items = arrayOf(5, 23, 42)
val flow = flowOf(*items)
Expand All @@ -54,7 +56,7 @@ class FlowAsMultiTest {

@Test
fun `test immediate failure`() {
runBlocking(Dispatchers.Default) {
testBlocking {
// Given
val flow = flow<Any> {
error("boom")
Expand All @@ -66,13 +68,13 @@ class FlowAsMultiTest {

// Then
subscriber.awaitFailure()
.assertFailedWith(IllegalStateException::class.java, "boom")
.assertFailedWith(IllegalStateException::class.java, "boom")
}
}

@Test
fun `verify that coroutine cancellation result in failure`() {
runBlocking(Dispatchers.Default) {
testBlocking {
// Given
val flow = flow<UUID> {
delay(200)
Expand All @@ -81,13 +83,11 @@ class FlowAsMultiTest {
val subscriber = AssertSubscriber.create<UUID>(1)

// When
runBlocking {
val job = launch {
flow.asMulti().subscribe().withSubscriber(subscriber)
}
delay(50)
job.cancel(CancellationException("abort"))
val job = launch {
flow.asMulti().subscribe().withSubscriber(subscriber)
}
delay(50)
job.cancel(CancellationException("abort"))
Thread.sleep(350)

// Then
Expand All @@ -97,7 +97,7 @@ class FlowAsMultiTest {

@Test
fun `verify that Flow cancels on Multi subscription cancellation`() {
runBlocking(Dispatchers.IO) {
testBlocking {
// Given
val counter = AtomicInteger()
var exitException: Throwable? = null
Expand All @@ -107,7 +107,7 @@ class FlowAsMultiTest {
emit(counter.incrementAndGet())
}
} catch (err: Throwable) {
exitException = err
exitException = err
}
}

Expand All @@ -122,15 +122,14 @@ class FlowAsMultiTest {
assertThat(counter.get()).isGreaterThan(0)
subscriber.assertItems(*(1..42).toList().toTypedArray())
subscriber.assertNotTerminated()
assertThat(subscriber.isCancelled).isTrue()
assertThat(exitException).isNotNull().isInstanceOf(CancellationException::class.java)
assertThat(subscriber.isCancelled).isTrue
assertThat(exitException).isNotNull.isInstanceOf(CancellationException::class.java)
}
}

@ExperimentalCoroutinesApi
@Test
fun `verify that callbackFlow cancels on Multi subscription cancellation`() {
runBlocking(Dispatchers.IO) {
testBlocking {
// Given
val closed = AtomicBoolean(false)
val flow = callbackFlow<Int> {
Expand All @@ -154,7 +153,7 @@ class FlowAsMultiTest {

// Then
subscriber.assertNotTerminated()
assertThat(subscriber.isCancelled).isTrue()
assertThat(subscriber.isCancelled).isTrue
assertThat(closed).isTrue
}
}
Expand Down
Loading

0 comments on commit dea5025

Please sign in to comment.