Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace WorkflowHost with a static runWorkflow function #439

Closed
rjrjr opened this issue Jul 3, 2019 · 5 comments · Fixed by #447
Closed

Replace WorkflowHost with a static runWorkflow function #439

rjrjr opened this issue Jul 3, 2019 · 5 comments · Fixed by #447
Assignees
Labels
enhancement New feature or request kotlin Affects the Kotlin library.

Comments

@rjrjr
Copy link
Contributor

rjrjr commented Jul 3, 2019

WorkflowHost.Factory isn't useful, and WorkflowHost.start isn't quite idiomatic. Instead:

suspend fun <I, O, R> runWorkflow(
  workflow: Workflow<I, O, R>,
  inputs: Flow<I>,
  block: CoroutineScope.(Flow<RS>, Flow<O>) -> Unit
): Nothing

Note that block does not suspend. It is a setup function. The workflow is not started until block exits. runWorkflow blocks forever, since workflows have no way to end themselves.

@zach-klippenstein
Copy link
Collaborator

I started to implement this, but the API for cancelling the runtime from within the block feels very awkward. It stops the runtime, but causes the runWorkflow function to throw. For the workflow test infrastructure, and probably for other main-function-based workflow runners (e.g. terminal samples), we need to catch that cancellation exception, determine if the block initiated the cancel with the "finish" intent, and if it did then just continue executing the enclosing function normally (if the cancellation exception was thrown because it was cancelled externally, it should be rethrown).

I'm starting to think runWorkflow should have a type parameter T, and return that type instead of Nothing. The receiver on block can be a custom CoroutineScope subtype that includes a finishWorkflow(result: T): Boolean method that cancels the runtime and causes runWorkflow to return result. This method would behave like CompletableDeferred.complete wrt returned boolean and races.

For main-based apps, T would likely be the Int exit code. For the testing infra, it would be the return type of the test block. For WorkflowRunnerViewModel it would still be Nothing.

@zach-klippenstein
Copy link
Collaborator

zach-klippenstein commented Jul 4, 2019

Note to self for implementation:

// Result participates in job hierarchy so it will automatically get cancelled if the runtime 
// is cancelled externally (so that finishWorkflow will return false).
val resultDeferred = CompletableDeferred<T>(parent = coroutineContext[Job])

launch {
  val workflowJob = coroutineScope[Job]!!
  fun finishWorkflow(result: T): Boolean {
    workflowJob.cancel(CancellationException("Workflow finished normally"))
    return resultDeferred.complete(result)
  }
  val workflowScope = WorkflowScope(this, ::finishWork)

  beforeSetup(workflowScope, renderings, outputs)
  
  // Yield to allow any coroutines created by beforeSetup to collect the flows to start.
  // This means that collection will start before the runtime, even if launch(UNDISPATCHED) is not used.
  yield()

  // Run loop
}

return resultDeferred.await()

@zach-klippenstein
Copy link
Collaborator

zach-klippenstein commented Jul 4, 2019

Also, we forgot the snapshot parameter. So the actual function signature we need is:

suspend fun <InputT, OutputT : Any, RenderingT, ResultT> runWorkflow(
  workflow: Workflow<InputT, OutputT, RenderingT>,
  inputs: Flow<InputT>,
  initialSnapshot: Snapshot? = null,
  beforeStart: WorkflowScope<ResultT>.(
    renderingsAndSnapshots: Flow<RenderingAndSnapshot<RenderingT>>,
    outputs: Flow<OutputT>
  ) -> Unit
): ResultT

where WorkflowScope is:

interface WorkflowScope<in ResultT> : CoroutineScope {
  fun finishWorkflow(result: ResultT): Boolean
}

I am also wondering if we have any uses where we're not supplying inputs from a Channel — then something like this might be more convenient:

interface WorkflowScope<in InputT, in ResultT> : CoroutineScope {
  fun setInput(input: InputT)
  fun finishWorkflow(result: ResultT): Boolean
}

suspend fun <InputT, OutputT : Any, RenderingT, ResultT> runWorkflow(
  workflow: Workflow<InputT, OutputT, RenderingT>,
  initialInput: InputT,
  initialSnapshot: Snapshot? = null,
  beforeStart: WorkflowScope<InputT, ResultT>.(
    renderingsAndSnapshots: Flow<RenderingAndSnapshot<RenderingT>>,
    outputs: Flow<OutputT>
  ) -> Unit
): ResultT
  • Forces you to provide the initial input immediately, prevents accidentally using emptyFlow() and wondering why your workflow never runs.
  • No additional boilerplate or lifecycle management for callers who want to update their inputs at some point in the future.

We could even make beforeStart return the initial input instead of passing it in directly, although that might just be more confusing and not add any value. At any rate, definitely not going to make that a thing now. Flow<InputT> is the most flexible and I don't think we have enough use cases to justify narrowing the API that much yet.

@zach-klippenstein
Copy link
Collaborator

Probably useful (internally at least) to have:

inline fun <R> WorkflowScope(
  coroutineScope: CoroutineScope,
  crossinline finishWorkflow: (R) -> Boolean
): WorkflowScope<R> = object : WorkflowScope<R>, CoroutineScope by coroutineScope {
  override fun finishWorkflow(result: R): Boolean = finishWorkflow(result)
}

@zach-klippenstein
Copy link
Collaborator

zach-klippenstein commented Jul 4, 2019

And for #389:

fun <I, O> runWorkflowOutputs(
  workflow: Workflow<I, O, *>,
  inputs: Flow<I>
): Flow<O> = channelFlow {
  runWorkflow(workflow, inputs) { _, outputs ->
    outputs.onEach(::send)
      .launchIn(this)
  }
}

// Don't have a use case for this without renderings, but it's even simpler.
suspend fun <I, O> runWorkflowUntilFirstOutput(
  workflow: Workflow<I, O, *>,
  inputs: Flow<I>
): O = runWorkflow(workflow, inputs) { _, outputs ->
  outputs.onEach { finishWorkflow(it) }
    .launchIn(this)
}

Note there's no initial snapshot because there's no way to read snapshots.

We could also do one for renderings:

fun <I, R> runWorkflowRenderings(
  workflow: Workflow<I, *, R>,
  inputs: Flow<I>,
  initialSnapshot: suspend () -> Snapshot?
): Flow<RenderingAndSnapshot<R>> = channelFlow {
  runWorkflow(workflow, inputs, initialSnapshot()) { renderings, _ ->
    renderings.onEach(::send)
      .launchIn(this)
  }
}

Note that initialSnapshot is a function, because a new initial snapshot will be needed every time the returned Flow is collected. And the function is suspending in case the snapshot needs to read with IO.

@zach-klippenstein zach-klippenstein added the kotlin Affects the Kotlin library. label Mar 3, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request kotlin Affects the Kotlin library.
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants