diff --git a/kotlin/src/main/kotlin/io/smallrye/mutiny/coroutines/Multi.kt b/kotlin/src/main/kotlin/io/smallrye/mutiny/coroutines/Multi.kt index 66742b01e..e34085885 100644 --- a/kotlin/src/main/kotlin/io/smallrye/mutiny/coroutines/Multi.kt +++ b/kotlin/src/main/kotlin/io/smallrye/mutiny/coroutines/Multi.kt @@ -3,18 +3,17 @@ package io.smallrye.mutiny.coroutines import io.smallrye.mutiny.Multi import io.smallrye.mutiny.subscription.MultiEmitter import io.smallrye.mutiny.subscription.MultiSubscriber +import java.util.concurrent.atomic.AtomicReference +import kotlin.coroutines.coroutineContext import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.channelFlow -import kotlinx.coroutines.flow.collect import kotlinx.coroutines.isActive import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import kotlinx.coroutines.sync.Mutex import org.reactivestreams.Subscription -import java.util.concurrent.atomic.AtomicReference -import kotlin.coroutines.coroutineContext /** * Subscribe to this [Multi] and provide the items as [Flow]. diff --git a/kotlin/src/main/kotlin/io/smallrye/mutiny/coroutines/Uni.kt b/kotlin/src/main/kotlin/io/smallrye/mutiny/coroutines/Uni.kt index c3a0bf535..8548fbce1 100644 --- a/kotlin/src/main/kotlin/io/smallrye/mutiny/coroutines/Uni.kt +++ b/kotlin/src/main/kotlin/io/smallrye/mutiny/coroutines/Uni.kt @@ -2,11 +2,11 @@ package io.smallrye.mutiny.coroutines import io.smallrye.mutiny.Uni import io.smallrye.mutiny.subscription.UniEmitter +import kotlin.coroutines.resume +import kotlin.coroutines.resumeWithException import kotlinx.coroutines.Deferred import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.suspendCancellableCoroutine -import kotlin.coroutines.resume -import kotlin.coroutines.resumeWithException /** * Awaits the result of this [Uni] while suspending the surrounding coroutine. diff --git a/kotlin/src/test/kotlin/io/smallrye/mutiny/coroutines/DeferredAsUniTest.kt b/kotlin/src/test/kotlin/io/smallrye/mutiny/coroutines/DeferredAsUniTest.kt index f9577a5d5..ca135e571 100644 --- a/kotlin/src/test/kotlin/io/smallrye/mutiny/coroutines/DeferredAsUniTest.kt +++ b/kotlin/src/test/kotlin/io/smallrye/mutiny/coroutines/DeferredAsUniTest.kt @@ -1,30 +1,28 @@ package io.smallrye.mutiny.coroutines import io.smallrye.mutiny.helpers.test.UniAssertSubscriber +import java.util.UUID +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit +import kotlin.test.Test import kotlinx.coroutines.CancellationException import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.async import kotlinx.coroutines.delay import kotlinx.coroutines.launch -import kotlinx.coroutines.runBlocking import kotlinx.coroutines.withContext import org.assertj.core.api.Assertions.assertThat -import java.util.UUID -import java.util.concurrent.CountDownLatch -import java.util.concurrent.TimeUnit -import kotlin.test.Test @ExperimentalCoroutinesApi class DeferredAsUniTest { @Test fun `test immediate value`() { - runBlocking { + testBlocking { // Given val value = UUID.randomUUID() - val deferred = GlobalScope.async { value } + val deferred = embeddedScope().async { value } // When val subscriber = UniAssertSubscriber.create() @@ -37,9 +35,9 @@ class DeferredAsUniTest { @Test fun `test immediate failure`() { - runBlocking { - // Given - val deferred = GlobalScope.async { error("kaboom") } + testBlocking { + // Given an error produced by a Deferred created in an isolated CoroutineScope + val deferred = embeddedScope().async { error("kaboom") } // When val subscriber = UniAssertSubscriber() @@ -52,9 +50,9 @@ class DeferredAsUniTest { @Test fun `test coroutine cancellation before value is computed`() { - runBlocking { + testBlocking { // Given - val deferred = GlobalScope.async { + val deferred = embeddedScope().async { delay(250) UUID.randomUUID() } @@ -71,9 +69,9 @@ class DeferredAsUniTest { @Test fun `test null item`() { - runBlocking { + testBlocking { // Given - val deferred = GlobalScope.async { null } + val deferred = embeddedScope().async { null } // When val subscriber = UniAssertSubscriber.create() @@ -86,10 +84,10 @@ class DeferredAsUniTest { @Test fun `test uni cancellation before deferred completes`() { - runBlocking { + testBlocking { // Given val deferredLatch = CountDownLatch(1) - val deferred = GlobalScope.async { + val deferred = embeddedScope().async { delay(10_000) // simulate long running job "Hello, World!" } @@ -110,13 +108,13 @@ class DeferredAsUniTest { awaitLatch("Deferred", deferredLatch) assertThat(deferred.isCancelled) .`as`("Check that Deferred is cancelled") - .isTrue() + .isTrue } } @Test fun `test cancelling deferred does not impact siblings`() { - runBlocking { + testBlocking { // Start before async. val child1Latch = CountDownLatch(1) val child1 = launch { @@ -136,7 +134,7 @@ class DeferredAsUniTest { deferred.invokeOnCompletion { exception -> deferredLatch.countDown() assertThat(exception) - .isNotNull() + .isNotNull .isInstanceOf(CancellationException::class.java) } @@ -162,7 +160,7 @@ class DeferredAsUniTest { awaitLatch("Child 1", child1Latch) assertThat(child1.isCompleted) .`as`("Check that child 1 completed") - .isTrue() + .isTrue awaitLatch("Uni", uniLatch) awaitLatch("Deferred", deferredLatch) @@ -171,11 +169,11 @@ class DeferredAsUniTest { .isNotNull assertThat(deferred.isCancelled) .`as`("Check that Deferred was cancelled") - .isTrue() + .isTrue awaitLatch("Sibling 2", child2Latch) assertThat(child2.isCompleted) .`as`("Check that child 2 completed") - .isTrue() + .isTrue } } } @@ -184,5 +182,5 @@ class DeferredAsUniTest { private fun awaitLatch(name: String, latch: CountDownLatch) { assertThat(latch.await(2, TimeUnit.SECONDS)) .`as`("Check that $name completes within 2s") - .isTrue() + .isTrue } diff --git a/kotlin/src/test/kotlin/io/smallrye/mutiny/coroutines/FlowAsMultiTest.kt b/kotlin/src/test/kotlin/io/smallrye/mutiny/coroutines/FlowAsMultiTest.kt index c919e9a75..f6f1024cc 100644 --- a/kotlin/src/test/kotlin/io/smallrye/mutiny/coroutines/FlowAsMultiTest.kt +++ b/kotlin/src/test/kotlin/io/smallrye/mutiny/coroutines/FlowAsMultiTest.kt @@ -1,22 +1,24 @@ package io.smallrye.mutiny.coroutines import io.smallrye.mutiny.helpers.test.AssertSubscriber -import kotlinx.coroutines.* +import java.util.UUID +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.AtomicInteger +import kotlin.test.Test +import kotlinx.coroutines.CancellationException import kotlinx.coroutines.channels.awaitClose +import kotlinx.coroutines.delay import kotlinx.coroutines.flow.callbackFlow import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.flowOf +import kotlinx.coroutines.launch import org.assertj.core.api.Assertions.assertThat -import java.util.* -import java.util.concurrent.atomic.AtomicBoolean -import java.util.concurrent.atomic.AtomicInteger -import kotlin.test.Test class FlowAsMultiTest { @Test fun `test immediate item`() { - runBlocking(Dispatchers.Default) { + testBlocking { // Given val item = UUID.randomUUID() val flow = flowOf(item) @@ -35,7 +37,7 @@ class FlowAsMultiTest { @Test fun `test immediate items`() { - runBlocking(Dispatchers.Default) { + testBlocking { // Given val items = arrayOf(5, 23, 42) val flow = flowOf(*items) @@ -54,7 +56,7 @@ class FlowAsMultiTest { @Test fun `test immediate failure`() { - runBlocking(Dispatchers.Default) { + testBlocking { // Given val flow = flow { error("boom") @@ -66,13 +68,13 @@ class FlowAsMultiTest { // Then subscriber.awaitFailure() - .assertFailedWith(IllegalStateException::class.java, "boom") + .assertFailedWith(IllegalStateException::class.java, "boom") } } @Test fun `verify that coroutine cancellation result in failure`() { - runBlocking(Dispatchers.Default) { + testBlocking { // Given val flow = flow { delay(200) @@ -81,13 +83,11 @@ class FlowAsMultiTest { val subscriber = AssertSubscriber.create(1) // When - runBlocking { - val job = launch { - flow.asMulti().subscribe().withSubscriber(subscriber) - } - delay(50) - job.cancel(CancellationException("abort")) + val job = launch { + flow.asMulti().subscribe().withSubscriber(subscriber) } + delay(50) + job.cancel(CancellationException("abort")) Thread.sleep(350) // Then @@ -97,7 +97,7 @@ class FlowAsMultiTest { @Test fun `verify that Flow cancels on Multi subscription cancellation`() { - runBlocking(Dispatchers.IO) { + testBlocking { // Given val counter = AtomicInteger() var exitException: Throwable? = null @@ -107,7 +107,7 @@ class FlowAsMultiTest { emit(counter.incrementAndGet()) } } catch (err: Throwable) { - exitException = err + exitException = err } } @@ -122,15 +122,14 @@ class FlowAsMultiTest { assertThat(counter.get()).isGreaterThan(0) subscriber.assertItems(*(1..42).toList().toTypedArray()) subscriber.assertNotTerminated() - assertThat(subscriber.isCancelled).isTrue() - assertThat(exitException).isNotNull().isInstanceOf(CancellationException::class.java) + assertThat(subscriber.isCancelled).isTrue + assertThat(exitException).isNotNull.isInstanceOf(CancellationException::class.java) } } - @ExperimentalCoroutinesApi @Test fun `verify that callbackFlow cancels on Multi subscription cancellation`() { - runBlocking(Dispatchers.IO) { + testBlocking { // Given val closed = AtomicBoolean(false) val flow = callbackFlow { @@ -154,7 +153,7 @@ class FlowAsMultiTest { // Then subscriber.assertNotTerminated() - assertThat(subscriber.isCancelled).isTrue() + assertThat(subscriber.isCancelled).isTrue assertThat(closed).isTrue } } diff --git a/kotlin/src/test/kotlin/io/smallrye/mutiny/coroutines/MultiAsFlowTest.kt b/kotlin/src/test/kotlin/io/smallrye/mutiny/coroutines/MultiAsFlowTest.kt index 9093eb5a9..135b20f8f 100644 --- a/kotlin/src/test/kotlin/io/smallrye/mutiny/coroutines/MultiAsFlowTest.kt +++ b/kotlin/src/test/kotlin/io/smallrye/mutiny/coroutines/MultiAsFlowTest.kt @@ -2,32 +2,30 @@ package io.smallrye.mutiny.coroutines import io.smallrye.mutiny.Multi import io.smallrye.mutiny.subscription.MultiEmitter +import java.time.Duration +import java.time.Instant +import java.util.concurrent.Executors +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.atomic.AtomicReference +import kotlin.system.measureTimeMillis +import kotlin.test.Test +import kotlin.test.assertFails +import kotlin.test.assertFailsWith +import kotlin.test.assertTrue +import kotlin.test.fail import kotlinx.coroutines.CancellationException import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.TimeoutCancellationException import kotlinx.coroutines.asCoroutineDispatcher import kotlinx.coroutines.async import kotlinx.coroutines.delay import kotlinx.coroutines.flow.buffer -import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.first import kotlinx.coroutines.flow.toList import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import kotlinx.coroutines.withTimeout import org.assertj.core.api.Assertions.assertThat -import java.time.Duration -import java.time.Instant -import java.util.concurrent.Executors -import java.util.concurrent.atomic.AtomicInteger -import java.util.concurrent.atomic.AtomicReference -import kotlin.system.measureTimeMillis -import kotlin.test.Test -import kotlin.test.assertFails -import kotlin.test.assertFailsWith -import kotlin.test.assertTrue -import kotlin.test.fail @ExperimentalCoroutinesApi class MultiAsFlowTest { @@ -67,7 +65,7 @@ class MultiAsFlowTest { val items = arrayOf(5, 23, 42) val delayMillis = 50L val multi = Multi.createFrom().emitter { em: MultiEmitter -> - GlobalScope.launch { + embeddedScope().launch { items.forEach { delay(delayMillis) em.emit(it) @@ -91,7 +89,7 @@ class MultiAsFlowTest { // Given val delayMillis = 100L val multi = Multi.createFrom().emitter { em: MultiEmitter -> - GlobalScope.launch { + embeddedScope().launch { delay(delayMillis) em.fail(Exception("boom")) } @@ -144,7 +142,7 @@ class MultiAsFlowTest { flow.collect { eventItems.add(it) } } } - GlobalScope.launch { + embeddedScope().launch { // yield thread and wait shortly for ensuring emitter got set delay(100) emit() @@ -194,7 +192,7 @@ class MultiAsFlowTest { // When var retrieveError: Throwable? = null - runBlocking { + testBlocking { val job = launch { try { multi.asFlow().collect { @@ -211,8 +209,8 @@ class MultiAsFlowTest { Thread.sleep(100) // Then - assertThat(counter.get()).isGreaterThan(0) assertThat(retrieveError).isInstanceOf(CancellationException::class.java) + assertThat(counter.get()).isGreaterThan(0) } @Test @@ -222,7 +220,7 @@ class MultiAsFlowTest { // When & Then assertFailsWith { - runBlocking { + testBlocking { withTimeout(100) { multi.asFlow().toList() } @@ -232,7 +230,7 @@ class MultiAsFlowTest { @Test fun `test empty Multi`() { - runBlocking { + testBlocking { // Given val multi = Multi.createFrom().empty() @@ -246,7 +244,7 @@ class MultiAsFlowTest { @Test fun `test null item`() { - runBlocking { + testBlocking { // Given val multi = Multi.createFrom().item { null } @@ -260,7 +258,7 @@ class MultiAsFlowTest { @Test fun `test buffered flow`() { - runBlocking { + testBlocking { // Given val items = (1..10_000).toList() val multi = Multi.createFrom().iterable(items) @@ -282,7 +280,7 @@ class MultiAsFlowTest { // When val ticks = AtomicInteger() assertFailsWith { - runBlocking { + testBlocking { withTimeout(300) { multi.asFlow().buffer(5).collect { ticks.incrementAndGet() @@ -302,7 +300,7 @@ class MultiAsFlowTest { val multi = Multi.createFrom().ticks().every(Duration.ofMillis(1)) // When - val tick = runBlocking { + val tick = testBlocking { multi.asFlow().first() } @@ -324,7 +322,7 @@ class MultiAsFlowTest { // When val start = System.currentTimeMillis() - val tick = runBlocking { + val tick = testBlocking { multi.asFlow().first() } @@ -339,7 +337,7 @@ class MultiAsFlowTest { val multi = Multi.createFrom().items(3, 2, 1) // When - val list = runBlocking(Executors.newSingleThreadExecutor().asCoroutineDispatcher()) { + val list = testBlocking(Executors.newSingleThreadExecutor().asCoroutineDispatcher()) { multi.asFlow().toList() } diff --git a/kotlin/src/test/kotlin/io/smallrye/mutiny/coroutines/TestUtil.kt b/kotlin/src/test/kotlin/io/smallrye/mutiny/coroutines/TestUtil.kt new file mode 100644 index 000000000..7867d4a0c --- /dev/null +++ b/kotlin/src/test/kotlin/io/smallrye/mutiny/coroutines/TestUtil.kt @@ -0,0 +1,19 @@ +package io.smallrye.mutiny.coroutines + +import kotlin.coroutines.CoroutineContext +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.runBlocking + +/** + * Utility method for typing gentle coroutine testing. + * + * Wrap the given [block] in coroutines [runBlocking] using the [Dispatchers.Default] as default [context]. + */ +fun testBlocking(context: CoroutineContext = Dispatchers.Default, block: suspend CoroutineScope.() -> T): T = + runBlocking(context, block) + +/** + * Produces a new embedded [CoroutineScope] to be used for isolating coroutines inside a coroutine test + */ +fun embeddedScope() = CoroutineScope(Dispatchers.Default) \ No newline at end of file diff --git a/kotlin/src/test/kotlin/io/smallrye/mutiny/coroutines/UniAwaitSuspendingTest.kt b/kotlin/src/test/kotlin/io/smallrye/mutiny/coroutines/UniAwaitSuspendingTest.kt index a8fb767b6..8acfe4489 100644 --- a/kotlin/src/test/kotlin/io/smallrye/mutiny/coroutines/UniAwaitSuspendingTest.kt +++ b/kotlin/src/test/kotlin/io/smallrye/mutiny/coroutines/UniAwaitSuspendingTest.kt @@ -1,14 +1,6 @@ package io.smallrye.mutiny.coroutines import io.smallrye.mutiny.Uni -import kotlinx.coroutines.CancellationException -import kotlinx.coroutines.TimeoutCancellationException -import kotlinx.coroutines.asCoroutineDispatcher -import kotlinx.coroutines.delay -import kotlinx.coroutines.launch -import kotlinx.coroutines.runBlocking -import kotlinx.coroutines.withTimeout -import org.assertj.core.api.Assertions.assertThat import java.time.Duration import java.time.Instant import java.util.UUID @@ -19,12 +11,19 @@ import kotlin.test.assertFailsWith import kotlin.test.assertNull import kotlin.test.assertTrue import kotlin.test.fail +import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.TimeoutCancellationException +import kotlinx.coroutines.asCoroutineDispatcher +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import kotlinx.coroutines.withTimeout +import org.assertj.core.api.Assertions.assertThat class UniAwaitSuspendingTest { @Test fun `test immediate onItem event`() { - runBlocking { + testBlocking { // Given val item = UUID.randomUUID() val uni = Uni.createFrom().item(item) @@ -39,7 +38,7 @@ class UniAwaitSuspendingTest { @Test fun `test delayed onItem event`() { - runBlocking { + testBlocking { // Given val item = UUID.randomUUID() val delayed = Duration.ofMillis(250) @@ -58,7 +57,7 @@ class UniAwaitSuspendingTest { @Test fun `test null item`() { - runBlocking { + testBlocking { // Given val uni = Uni.createFrom().nullItem() @@ -76,7 +75,7 @@ class UniAwaitSuspendingTest { val uni = Uni.createFrom().failure(RuntimeException("boom")) // When & Then - assertFailsWith("boom") { runBlocking { uni.awaitSuspending() } } + assertFailsWith("boom") { testBlocking { uni.awaitSuspending() } } } @Test @@ -85,7 +84,7 @@ class UniAwaitSuspendingTest { val uni = Uni.createFrom().failure(Exception("kaboom")) // When & Then - assertFailsWith("kaboom") { runBlocking { uni.awaitSuspending() } } + assertFailsWith("kaboom") { testBlocking { uni.awaitSuspending() } } } @Test @@ -95,7 +94,7 @@ class UniAwaitSuspendingTest { // When var retrieveError: Throwable? = null - runBlocking { + testBlocking { val job = launch { try { uni.awaitSuspending() @@ -120,7 +119,7 @@ class UniAwaitSuspendingTest { // When & Then assertFailsWith { - runBlocking { + testBlocking { withTimeout(50) { uni.awaitSuspending() fail() @@ -135,7 +134,7 @@ class UniAwaitSuspendingTest { val uni = Uni.createFrom().item(23) // When - val item = runBlocking(Executors.newSingleThreadExecutor().asCoroutineDispatcher()) { + val item = testBlocking(Executors.newSingleThreadExecutor().asCoroutineDispatcher()) { uni.awaitSuspending() } diff --git a/pom.xml b/pom.xml index 0164b2c58..af54861f9 100644 --- a/pom.xml +++ b/pom.xml @@ -105,9 +105,9 @@ 1.6.10 - 1.4 - 1.4 - 1.5.2 + 1.5 + 1.5 + 1.6.0 ${maven.compiler.target} 1.6.10