Skip to content

Commit

Permalink
Introduce turbine { ... } API (#237)
Browse files Browse the repository at this point in the history
* Add failing test for lost exceptions in testIn without test

* Extract code for turbine { .. } fn
  • Loading branch information
jingibus authored Jun 16, 2023
1 parent 61b1872 commit 52970ab
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 40 deletions.
107 changes: 67 additions & 40 deletions src/commonMain/kotlin/app/cash/turbine/flow.kt
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,25 @@ import kotlinx.coroutines.plus
import kotlinx.coroutines.test.TestCoroutineScheduler
import kotlinx.coroutines.test.UnconfinedTestDispatcher

public interface TurbineTestContext<T> : ReceiveTurbine<T> {
public interface TurbineContext : CoroutineScope {
public fun <R> Flow<R>.testIn(
scope: CoroutineScope,
timeout: Duration? = null,
name: String? = null,
): ReceiveTurbine<R>
}
public interface TurbineTestContext<T> : TurbineContext, ReceiveTurbine<T>

internal class TurbineTestContextImpl<T>(
turbine: Turbine<T>,
turbineContext: CoroutineContext,
) : TurbineTestContext<T>, ReceiveTurbine<T> by turbine {
) : TurbineContext by TurbineContextImpl(turbineContext), ReceiveTurbine<T> by turbine, TurbineTestContext<T>

internal class TurbineContextImpl(
turbineContext: CoroutineContext,
) : TurbineContext, CoroutineScope {
override val coroutineContext: CoroutineContext = turbineContext

private val turbineElements = (turbineContext[TurbineRegistryElement] ?: EmptyCoroutineContext) +
(turbineContext[TurbineTimeoutElement] ?: EmptyCoroutineContext)
override fun <R> Flow<R>.testIn(
Expand All @@ -61,6 +68,54 @@ internal class TurbineTestContextImpl<T>(
)
}

/**
* Run a validation block that catches and reports all unhandled exceptions in flows run by Turbine.
*/
public suspend fun turbine(
timeout: Duration? = null,
validate: suspend TurbineContext.() -> Unit,
) {
val turbineRegistry = mutableListOf<ChannelTurbine<*>>()
reportTurbines(turbineRegistry) {
val scopeFn: suspend (suspend CoroutineScope.() -> Unit) -> Unit = { block ->
if (timeout == null) {
coroutineScope(block)
} else {
withTurbineTimeout(timeout, block)
}
}
scopeFn {
try {
val testContext = TurbineContextImpl(currentCoroutineContext())
testContext.validate()
} catch (e: Throwable) {
// The exception needs to be reraised. However, if there are any unconsumed events
// from other turbines (including this one), those may indicate an underlying problem.
// So: create a report with all the registered turbines, and include exception as cause
val reportsWithExceptions = turbineRegistry.map {
it.reportUnconsumedEvents()
// The exception will have cancelled its job hierarchy, producing cancellation exceptions
// in its wake. These aren't meaningful test feedback
.stripCancellations()
}
.filter { it.cause != null }
if (reportsWithExceptions.isEmpty()) {
throw e
} else {
throw TurbineAssertionError(
buildString {
reportsWithExceptions.forEach {
it.describeException(this@buildString)
}
},
e,
)
}
}
}
}
}

/**
* Terminal flow operator that collects events from given flow and allows the [validate] lambda to
* consume and assert properties on them in order. If any exception occurs during validation the
Expand All @@ -82,46 +137,18 @@ public suspend fun <T> Flow<T>.test(
name: String? = null,
validate: suspend TurbineTestContext<T>.() -> Unit,
) {
val turbineRegistry = mutableListOf<ChannelTurbine<*>>()
reportTurbines(turbineRegistry) {
coroutineScope {
collectTurbineIn(this, null, name).apply {
try {
val testContext = TurbineTestContextImpl(this@apply, currentCoroutineContext())
if (timeout != null) {
withTurbineTimeout(timeout) {
testContext.validate()
}
} else {
testContext.validate()
}
cancel()
ensureAllEventsConsumed()
} catch (e: Throwable) {
// The exception needs to be reraised. However, if there are any unconsumed events
// from other turbines (including this one), those may indicate an underlying problem.
// So: create a report with all the registered turbines, and include exception as cause
val reportsWithExceptions = turbineRegistry.map {
it.reportUnconsumedEvents()
// The exception will have cancelled its job hierarchy, producing cancellation exceptions
// in its wake. These aren't meaningful test feedback
.stripCancellations()
}
.filter { it.cause != null }
if (reportsWithExceptions.isEmpty()) {
throw e
} else {
throw TurbineAssertionError(
buildString {
reportsWithExceptions.forEach {
it.describeException(this@buildString)
}
},
e,
)
}
turbine {
collectTurbineIn(this, null, name).apply {
val testContext = TurbineTestContextImpl(this@apply, currentCoroutineContext())
if (timeout != null) {
withTurbineTimeout(timeout) {
testContext.validate()
}
} else {
testContext.validate()
}
cancel()
ensureAllEventsConsumed()
}
}
}
Expand Down
37 changes: 37 additions & 0 deletions src/commonTest/kotlin/app/cash/turbine/FlowInScopeTest.kt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package app.cash.turbine

import kotlin.test.Test
import kotlin.test.assertContains
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertFalse
Expand Down Expand Up @@ -246,4 +247,40 @@ class FlowInScopeTest {
cause.message,
)
}

@Test
fun innerFailingFlowIsReported() = runTest {
val expected = CustomThrowable("hi")

val actual = assertFailsWith<AssertionError> {
turbine {
flow<Nothing> {
throw expected
}.testIn(backgroundScope, name = "inner failing")

Turbine<Unit>(name = "inner").awaitItem()
}
}

val expectedPrefix = """
|Unconsumed exception found for inner failing:
|
|Stack trace:
""".trimMargin()
assertEquals(
actual.message?.startsWith(
expectedPrefix,
),
true,
"Expected to start with:\n\n$expectedPrefix\n\nBut was:\n\n${actual.message}",
)
assertContains(
actual.message!!,
"CustomThrowable: hi",
)
assertEquals(
actual.cause?.message,
"No value produced for inner in 3s",
)
}
}

0 comments on commit 52970ab

Please sign in to comment.