Skip to content

Commit

Permalink
Replace WorkflowHost with the runWorkflow function.
Browse files Browse the repository at this point in the history
Closes #439.
  • Loading branch information
zach-klippenstein committed Jul 8, 2019
1 parent 6bd1eb2 commit b590f56
Show file tree
Hide file tree
Showing 11 changed files with 443 additions and 726 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,18 @@ import com.googlecode.lanterna.screen.TerminalScreen
import com.googlecode.lanterna.terminal.DefaultTerminalFactory
import com.squareup.workflow.Worker
import com.squareup.workflow.Workflow
import com.squareup.workflow.WorkflowHost
import com.squareup.workflow.asWorker
import com.squareup.workflow.runWorkflowIn
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.Job
import kotlinx.coroutines.ObsoleteCoroutinesApi
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.BroadcastChannel
import kotlinx.coroutines.channels.Channel.Factory.CONFLATED
import kotlinx.coroutines.channels.ConflatedBroadcastChannel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.first
Expand All @@ -42,21 +42,17 @@ import kotlinx.coroutines.launch
import kotlinx.coroutines.plus
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.selects.selectUnbiased
import kotlin.coroutines.EmptyCoroutineContext

/**
* Hosts [Workflow]s that:
* - gets information about the terminal configuration as input
* - renders the text to display on the terminal
* - finishes by emitting an exit code that should be passed to [kotlin.system.exitProcess].
*
* @param hostFactory Used to create the actual [WorkflowHost] that hosts workflows. Any dispatcher
* configured on the host will be ignored, to ensure that key events stay in sync with renderings.
* @param ioDispatcher Defaults to [Dispatchers.IO] and is used to listen for key events using
* blocking APIs.
*/
class TerminalWorkflowRunner(
private val hostFactory: WorkflowHost.Factory = WorkflowHost.Factory(EmptyCoroutineContext),
private val ioDispatcher: CoroutineDispatcher = Dispatchers.IO
) {

Expand All @@ -74,8 +70,6 @@ class TerminalWorkflowRunner(
// when invoking from coroutines. This entire function is blocking however, so we don't care.
@Suppress("BlockingMethodInNonBlockingContext")
fun run(workflow: TerminalWorkflow): ExitCode = runBlocking {
val configs = BroadcastChannel<TerminalInput>(CONFLATED)
val host = hostFactory.run(workflow, configs.asFlow(), context = coroutineContext)
val keyStrokesChannel = screen.listenForKeyStrokesOn(this + ioDispatcher)
val keyStrokesWorker = keyStrokesChannel.asWorker()
val resizes = screen.terminal.listenForResizesOn(this)
Expand All @@ -86,7 +80,7 @@ class TerminalWorkflowRunner(
try {
screen.startScreen()
try {
runTerminalWorkflow(screen, configs, host, keyStrokesWorker, resizes)
return@runBlocking runTerminalWorkflow(workflow, screen, keyStrokesWorker, resizes)
} finally {
screen.stopScreen()
}
Expand All @@ -102,67 +96,72 @@ class TerminalWorkflowRunner(
@Suppress("BlockingMethodInNonBlockingContext")
@UseExperimental(FlowPreview::class, ExperimentalCoroutinesApi::class)
private suspend fun runTerminalWorkflow(
workflow: TerminalWorkflow,
screen: TerminalScreen,
inputs: SendChannel<TerminalInput>,
host: WorkflowHost<ExitCode, TerminalRendering>,
keyStrokes: Worker<KeyStroke>,
resizes: ReceiveChannel<TerminalSize>
): ExitCode = coroutineScope {
var input = TerminalInput(screen.terminalSize.toSize(), keyStrokes)
val inputs = ConflatedBroadcastChannel(input)

// Use the result as the parent Job of the runtime coroutine so it gets cancelled automatically
// if there's an error.
val result =
runWorkflowIn(this, workflow, inputs.asFlow()) { renderingsAndSnapshots, outputs ->
val renderings = renderingsAndSnapshots.map { it.rendering }
.produceIn(this)

launch {
while (true) {
val rendering = selectUnbiased<TerminalRendering> {
resizes.onReceive {
screen.doResizeIfNecessary()
?.let {
// If the terminal was resized since the last iteration, we need to notify the
// workflow.
input = input.copy(size = it.toSize())
}

// Publish config changes to the workflow.
inputs.send(input)

// Sending that new input invalidated the lastRendering, so we don't want to
// re-iterate until we have a new rendering with a fresh event handler. It also
// triggered a render pass, so we can just retrieve that immediately.
return@onReceive renderings.receive()
}

renderings.onReceive { it }
}

// Need to send an initial input for the workflow to start running.
inputs.offer(input)

// Launch the render loop in a new coroutine, so this coroutine can just sit around and wait
// for the workflow to emit an output.
val renderJob = launch {
val renderings = host.renderingsAndSnapshots.map { it.rendering }
.produceIn(this)

while (true) {
val rendering = selectUnbiased<TerminalRendering> {
resizes.onReceive {
screen.doResizeIfNecessary()
?.let {
// If the terminal was resized since the last iteration, we need to notify the
// workflow.
input = input.copy(size = it.toSize())
screen.clear()
screen.newTextGraphics()
.apply {
foregroundColor = rendering.textColor.toTextColor()
backgroundColor = rendering.backgroundColor.toTextColor()
rendering.text.lineSequence()
.forEachIndexed { index, line ->
putString(TOP_LEFT_CORNER.withRelativeRow(index), line)
}
}

// Publish config changes to the workflow.
inputs.send(input)

// Sending that new input invalidated the lastRendering, so we don't want to
// re-iterate until we have a new rendering with a fresh event handler. It also
// triggered a render pass, so we can just retrieve that immediately.
return@onReceive renderings.receive()
screen.refresh(COMPLETE)
}

renderings.onReceive { it }
}

screen.clear()
screen.newTextGraphics()
.apply {
foregroundColor = rendering.textColor.toTextColor()
backgroundColor = rendering.backgroundColor.toTextColor()
rendering.text.lineSequence()
.forEachIndexed { index, line ->
putString(TOP_LEFT_CORNER.withRelativeRow(index), line)
}
}

screen.refresh(COMPLETE)
// Stop the runtime and return the exit code as soon as the workflow emits one.
val workflowJob = coroutineContext[Job]!!
return@runWorkflowIn async {
val exitCode = outputs.first()
// If we don't cancel the workflow runtime explicitly, coroutineScope will hang waiting for
// it to finish.
workflowJob.cancel(
CancellationException("TerminalWorkflowRunner completed with exit code $exitCode")
)
// TODO This won't work, this job will already have been cancelled.
return@async exitCode
}
}
}

// Start collecting from outputs before starting the workflow host, in case it emits immediately.
val exitCodeDeferred = async { host.outputs.first() }
val workflowJob = host.start()

// Stop the runner and return the exit code as soon as the workflow emits one.
val exitCode = exitCodeDeferred.await()
workflowJob.cancel()
renderJob.cancel()
return@coroutineScope exitCode
return@coroutineScope result.await()
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package com.squareup.workflow

/**
* Emitted from [WorkflowHost.renderingsAndSnapshots] after every render pass.
* Tuple of rendering and snapshot used by [runWorkflowIn].
*/
data class RenderingAndSnapshot<out RenderingT>(
val rendering: RenderingT,
Expand Down
Loading

0 comments on commit b590f56

Please sign in to comment.