From cae54fc86aeb50a33fade7455369c396d87c6fad Mon Sep 17 00:00:00 2001 From: Zach Klippenstein Date: Sun, 7 Jul 2019 11:30:31 -0700 Subject: [PATCH] wip: unit tests --- .../java/com/squareup/workflow/RunWorkflow.kt | 10 +- .../workflow/internal/WorkflowLoop.kt | 142 ++++++----- .../workflow/internal/RunWorkflowTest.kt | 223 +++++++++++++----- 3 files changed, 246 insertions(+), 129 deletions(-) diff --git a/kotlin/workflow-runtime/src/main/java/com/squareup/workflow/RunWorkflow.kt b/kotlin/workflow-runtime/src/main/java/com/squareup/workflow/RunWorkflow.kt index b57220b40..d0f15df78 100644 --- a/kotlin/workflow-runtime/src/main/java/com/squareup/workflow/RunWorkflow.kt +++ b/kotlin/workflow-runtime/src/main/java/com/squareup/workflow/RunWorkflow.kt @@ -15,7 +15,8 @@ */ package com.squareup.workflow -import com.squareup.workflow.internal.runWorkflowLoop +import com.squareup.workflow.internal.RealWorkflowLoop +import com.squareup.workflow.internal.WorkflowLoop import com.squareup.workflow.internal.unwrapCancellationCause import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.CoroutineStart.ATOMIC @@ -60,6 +61,7 @@ fun runWorkflowIn( ) -> ResultT ): ResultT = runWorkflowImpl( scope, + RealWorkflowLoop, workflow.asStatefulWorkflow(), inputs, initialSnapshot = initialSnapshot, @@ -83,6 +85,7 @@ fun runWorkflowForTestFromS ) -> ResultT ): ResultT = runWorkflowImpl( scope, + RealWorkflowLoop, workflow, inputs, initialState = initialState, @@ -94,8 +97,9 @@ fun runWorkflowForTestFromS * TODO write documentation */ @UseExperimental(ExperimentalCoroutinesApi::class, FlowPreview::class) -private fun runWorkflowImpl( +internal fun runWorkflowImpl( scope: CoroutineScope, + workflowLoop: WorkflowLoop, workflow: StatefulWorkflow, inputs: Flow, initialSnapshot: Snapshot?, @@ -121,7 +125,7 @@ private fun runWorkflowImpl workflowScope.launch(start = ATOMIC) { // Run the workflow processing loop forever, or until it fails or is cancelled. - runWorkflowLoop( + workflowLoop.runWorkflowLoop( workflow, inputs, initialSnapshot = initialSnapshot, diff --git a/kotlin/workflow-runtime/src/main/java/com/squareup/workflow/internal/WorkflowLoop.kt b/kotlin/workflow-runtime/src/main/java/com/squareup/workflow/internal/WorkflowLoop.kt index e509d785c..23856141e 100644 --- a/kotlin/workflow-runtime/src/main/java/com/squareup/workflow/internal/WorkflowLoop.kt +++ b/kotlin/workflow-runtime/src/main/java/com/squareup/workflow/internal/WorkflowLoop.kt @@ -27,78 +27,94 @@ import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.produceIn import kotlinx.coroutines.selects.select -/** - * Loops forever, or until the coroutine is cancelled, processing the workflow tree and emitting - * updates by calling [onRendering] and [onOutput]. - * - * This function is the lowest-level entry point into the runtime. Don't call this directly, instead - * call [com.squareup.workflow.runWorkflowIn]. - */ -@UseExperimental(FlowPreview::class, ExperimentalCoroutinesApi::class) -internal suspend fun runWorkflowLoop( - workflow: StatefulWorkflow, - inputs: Flow, - initialSnapshot: Snapshot?, - initialState: StateT? = null, - onRendering: suspend (RenderingAndSnapshot) -> Unit, - onOutput: suspend (OutputT) -> Unit -): Nothing = coroutineScope { - val inputsChannel = inputs.produceIn(this) - inputsChannel.consume { - var output: OutputT? = null - var input: InputT = inputsChannel.receive() - var inputsClosed = false - val rootNode = WorkflowNode( - id = workflow.id(), - workflow = workflow, - initialInput = input, - snapshot = initialSnapshot, - baseContext = coroutineContext, - initialState = initialState - ) +@UseExperimental(ExperimentalCoroutinesApi::class) +internal interface WorkflowLoop { + + /** + * Loops forever, or until the coroutine is cancelled, processing the workflow tree and emitting + * updates by calling [onRendering] and [onOutput]. + * + * This function is the lowest-level entry point into the runtime. Don't call this directly, instead + * call [com.squareup.workflow.runWorkflowIn]. + */ + suspend fun runWorkflowLoop( + workflow: StatefulWorkflow, + inputs: Flow, + initialSnapshot: Snapshot?, + initialState: StateT? = null, + onRendering: suspend (RenderingAndSnapshot) -> Unit, + onOutput: suspend (OutputT) -> Unit + ): Nothing +} - try { - while (true) { - coroutineContext.ensureActive() +internal object RealWorkflowLoop : WorkflowLoop { - val rendering = rootNode.render(workflow, input) - val snapshot = rootNode.snapshot(workflow) + @UseExperimental(FlowPreview::class, ExperimentalCoroutinesApi::class) + override suspend fun runWorkflowLoop( + workflow: StatefulWorkflow, + inputs: Flow, + initialSnapshot: Snapshot?, + initialState: StateT?, + onRendering: suspend (RenderingAndSnapshot) -> Unit, + onOutput: suspend (OutputT) -> Unit + ): Nothing = coroutineScope { + val inputsChannel = inputs.produceIn(this) + inputsChannel.consume { + var output: OutputT? = null + var input: InputT = inputsChannel.receive() + var inputsClosed = false + val rootNode = WorkflowNode( + id = workflow.id(), + workflow = workflow, + initialInput = input, + snapshot = initialSnapshot, + baseContext = coroutineContext, + initialState = initialState + ) - onRendering(RenderingAndSnapshot(rendering, snapshot)) - output?.let { onOutput(it) } + try { + while (true) { + coroutineContext.ensureActive() - // Tick _might_ return an output, but if it returns null, it means the state or a child - // probably changed, so we should re-render/snapshot and emit again. - output = select { - // Stop trying to read from the inputs channel after it's closed. - if (!inputsClosed) { - @Suppress("EXPERIMENTAL_API_USAGE") - inputsChannel.onReceiveOrNull { newInput -> - if (newInput == null) { - inputsClosed = true - } else { - input = newInput + val rendering = rootNode.render(workflow, input) + val snapshot = rootNode.snapshot(workflow) + + onRendering(RenderingAndSnapshot(rendering, snapshot)) + output?.let { onOutput(it) } + + // Tick _might_ return an output, but if it returns null, it means the state or a child + // probably changed, so we should re-render/snapshot and emit again. + output = select { + // Stop trying to read from the inputs channel after it's closed. + if (!inputsClosed) { + @Suppress("EXPERIMENTAL_API_USAGE") + inputsChannel.onReceiveOrNull { newInput -> + if (newInput == null) { + inputsClosed = true + } else { + input = newInput + } + // No output. Returning from the select will go to the top of the loop to do another + // render pass. + return@onReceiveOrNull null } - // No output. Returning from the select will go to the top of the loop to do another - // render pass. - return@onReceiveOrNull null } - } - // Tick the workflow tree. - rootNode.tick(this) { it } + // Tick the workflow tree. + rootNode.tick(this) { it } + } } + // Compiler gets confused, and thinks both that this throw is unreachable, and without the + // throw that the infinite while loop will exit normally and thus need a return statement. + @Suppress("UNREACHABLE_CODE", "ThrowableNotThrown") + throw AssertionError() + } finally { + // There's a potential race condition if the producer coroutine is cancelled before it has a + // chance to enter the try block, since we can't use CoroutineStart.ATOMIC. However, until we + // actually see this cause problems, I'm not too worried about it. + // See https://github.com/Kotlin/kotlinx.coroutines/issues/845 + rootNode.cancel() } - // Compiler gets confused, and thinks both that this throw is unreachable, and without the - // throw that the infinite while loop will exit normally and thus need a return statement. - @Suppress("UNREACHABLE_CODE", "ThrowableNotThrown") - throw AssertionError() - } finally { - // There's a potential race condition if the producer coroutine is cancelled before it has a - // chance to enter the try block, since we can't use CoroutineStart.ATOMIC. However, until we - // actually see this cause problems, I'm not too worried about it. - // See https://github.com/Kotlin/kotlinx.coroutines/issues/845 - rootNode.cancel() } } } diff --git a/kotlin/workflow-runtime/src/test/java/com/squareup/workflow/internal/RunWorkflowTest.kt b/kotlin/workflow-runtime/src/test/java/com/squareup/workflow/internal/RunWorkflowTest.kt index 928987e38..8bc65cdec 100644 --- a/kotlin/workflow-runtime/src/test/java/com/squareup/workflow/internal/RunWorkflowTest.kt +++ b/kotlin/workflow-runtime/src/test/java/com/squareup/workflow/internal/RunWorkflowTest.kt @@ -15,75 +15,170 @@ */ package com.squareup.workflow.internal +import com.squareup.workflow.RenderingAndSnapshot +import com.squareup.workflow.Snapshot +import com.squareup.workflow.StatefulWorkflow +import com.squareup.workflow.Workflow +import com.squareup.workflow.runWorkflowImpl +import com.squareup.workflow.stateless +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers.Unconfined +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.emptyFlow +import kotlinx.coroutines.flow.first +import kotlinx.coroutines.flow.produceIn +import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.suspendCancellableCoroutine +import kotlinx.coroutines.yield import org.junit.Test +import kotlin.test.assertEquals +import kotlin.test.assertNull +import kotlin.test.assertTrue +import kotlin.test.fail +@Suppress("UNCHECKED_CAST") +@UseExperimental(ExperimentalCoroutinesApi::class, FlowPreview::class) class RunWorkflowTest { + private val scope = CoroutineScope(Unconfined) + private val workflow = Workflow.stateless { fail() } + .asStatefulWorkflow() + @Test fun `renderings flow replays to new collectors`() { -// val host = RealWorkflowHost(Unconfined) { onRendering, _ -> -// onRendering(RenderingAndSnapshot("foo", Snapshot.EMPTY)) -// suspendCancellableCoroutine { } -// } -// host.start() -// -// val firstRendering = runBlocking { host.renderingsAndSnapshots.first() } -// assertEquals("foo", firstRendering.rendering) + var rendered = false + val loop = object : WorkflowLoop { + override suspend fun runWorkflowLoop( + workflow: StatefulWorkflow, + inputs: Flow, + initialSnapshot: Snapshot?, + initialState: StateT?, + onRendering: suspend (RenderingAndSnapshot) -> Unit, + onOutput: suspend (OutputT) -> Unit + ): Nothing { + onRendering(RenderingAndSnapshot("foo" as RenderingT, Snapshot.EMPTY)) + rendered = true + hang() + } + } - TODO() + val renderings = runWorkflowImpl( + scope, + loop, + workflow, + emptyFlow(), + initialSnapshot = null, + initialState = null + ) { renderings, _ -> renderings } + + assertTrue(rendered) + runBlocking { + assertEquals("foo", renderings.first().rendering) + } } - // TODO move to new tests -// @Test fun `outputs flow does not replay to new collectors`() { -// val trigger = CompletableDeferred() -// val host = RealWorkflowHost(Unconfined) { _, onOutput -> -// onOutput("one") -// trigger.await() -// onOutput("two") -// } -// host.start() -// -// val outputs = GlobalScope.async(Unconfined) { host.outputs.toList() } -// trigger.complete(Unit) -// assertEquals(listOf("two"), runBlocking { outputs.await() }) -// } + @Test fun `outputs flow does not replay to new collectors`() { + var rendered = false + val loop = object : WorkflowLoop { + override suspend fun runWorkflowLoop( + workflow: StatefulWorkflow, + inputs: Flow, + initialSnapshot: Snapshot?, + initialState: StateT?, + onRendering: suspend (RenderingAndSnapshot) -> Unit, + onOutput: suspend (OutputT) -> Unit + ): Nothing { + onOutput("foo" as OutputT) + rendered = true + hang() + } + } - // TODO move to new tests -// @Test fun `renderings flow is multicasted`() { -// val host = RealWorkflowHost(Unconfined) { onRendering, _ -> -// onRendering(RenderingAndSnapshot("one", Snapshot.EMPTY)) -// onRendering(RenderingAndSnapshot("two", Snapshot.EMPTY)) -// } -// val renderings1 = GlobalScope.async(Unconfined) { -// host.renderingsAndSnapshots.map { it.rendering } -// .toList() -// } -// val renderings2 = GlobalScope.async(Unconfined) { -// host.renderingsAndSnapshots.map { it.rendering } -// .toList() -// } -// host.start() -// -// assertEquals(listOf("one", "two"), runBlocking { renderings1.await() }) -// assertEquals(listOf("one", "two"), runBlocking { renderings2.await() }) -// } + val outputs = runWorkflowImpl( + scope, + loop, + workflow, + emptyFlow(), + initialSnapshot = null, + initialState = null + ) { _, outputs -> outputs } - // TODO move to new tests -// @Test fun `outputs flow is multicasted`() { -// val host = RealWorkflowHost(Unconfined) { _, onOutput -> -// onOutput("one") -// onOutput("two") -// } -// val outputs1 = GlobalScope.async(Unconfined) { -// host.outputs.toList() -// } -// val outputs2 = GlobalScope.async(Unconfined) { -// host.outputs.toList() -// } -// host.start() -// -// assertEquals(listOf("one", "two"), runBlocking { outputs1.await() }) -// assertEquals(listOf("one", "two"), runBlocking { outputs2.await() }) -// } + assertTrue(rendered) + runBlocking { + val outputsChannel = outputs.produceIn(this) + yield() + assertNull(outputsChannel.poll()) + + // Let the test finish. + outputsChannel.cancel() + } + } + + @Test fun `renderings flow is multicasted`() { + val loop = object : WorkflowLoop { + override suspend fun runWorkflowLoop( + workflow: StatefulWorkflow, + inputs: Flow, + initialSnapshot: Snapshot?, + initialState: StateT?, + onRendering: suspend (RenderingAndSnapshot) -> Unit, + onOutput: suspend (OutputT) -> Unit + ): Nothing { + onRendering(RenderingAndSnapshot("foo" as RenderingT, Snapshot.EMPTY)) + hang() + } + } + + val renderings = runWorkflowImpl( + scope, + loop, + workflow, + emptyFlow(), + initialSnapshot = null, + initialState = null + ) { renderings, _ -> renderings } + + runBlocking { + assertEquals("foo", renderings.first().rendering) + assertEquals("foo", renderings.first().rendering) + } + } + + @Test fun `outputs flow is multicasted`() { + val loop = object : WorkflowLoop { + override suspend fun runWorkflowLoop( + workflow: StatefulWorkflow, + inputs: Flow, + initialSnapshot: Snapshot?, + initialState: StateT?, + onRendering: suspend (RenderingAndSnapshot) -> Unit, + onOutput: suspend (OutputT) -> Unit + ): Nothing { + onOutput("foo" as OutputT) + hang() + } + } + + val (outputs1, outputs2) = runWorkflowImpl( + scope, + loop, + workflow, + emptyFlow(), + initialSnapshot = null, + initialState = null + ) { _, outputs -> + Pair( + outputs.produceIn(this), + outputs.produceIn(this) + ) + } + + runBlocking { + assertEquals("foo", outputs1.receive()) + assertEquals("foo", outputs2.receive()) + } + } // @Test fun `renderings flow has no backpressure`() { // val host = RealWorkflowHost(Unconfined) { onRendering, _ -> @@ -145,10 +240,6 @@ class RunWorkflowTest { // assertEquals(2, outputs.poll()) // } - @Test fun `exception thrown from configurator cancels flows`() { - TODO() - } - @Test fun `flows complete immediately when base context is already cancelled on start`() { TODO() } @@ -156,4 +247,10 @@ class RunWorkflowTest { @Test fun `cancelling base context cancels host`() { TODO() } + + @Test fun stuff() { + TODO("Ensure all behaviors previously tested by RealWorkflowHostTest are covered here.") + } } + +private suspend fun hang(): Nothing = suspendCancellableCoroutine { }