Allow subflows to generate and pass outputs in the same way as functions can return data #2133

Closed
anna-geller opened this issue Sep 18, 2023 · 5 comments
Labels: enhancement (New feature or request)

Member

anna-geller commented Sep 18, 2023

Feature description

Problem

Passing data between subflows is difficult, which makes decoupled patterns harder to build. Currently, the parent flow needs to know the internals of the child flow, including the task ID and output name. It would be easier if the child flow could expose or return specific data as the output of the flow.

See the example below:

image

image

  1. child flow
id: flow1
namespace: blueprint
tasks:
  - id: returnData
    type: io.kestra.core.tasks.debugs.Return
    format: this is a secret message returned from {{flow.id}}
  2. parent flow
id: passDataBetweenSubflows
namespace: blueprint

tasks:
  - id: flow1
    type: io.kestra.core.tasks.flows.Flow
    namespace: blueprint
    flowId: flow1
    wait: true
    outputs:
      dataFromChildFlow: "{{outputs.returnData.value}}"
  
  - id: log
    type: io.kestra.core.tasks.log.Log
    message: "{{outputs.flow1.outputs.dataFromChildFlow}}"

Proposal

Add a new outputs property at the flow level so that data can be passed by key and referenced by the parent in the format:
{{ outputs.subflow_task_id.output_key }}.

Here is an example of a child flow returning outputs via an arbitrary key name:

child flow:

id: flow1
namespace: dev

tasks:
  - id: returnData
    type: io.kestra.core.tasks.debugs.Return
    format: this is a secret message returned from {{flow.id}}

outputs:
  key_name: "{{ outputs.returnData.value }}"

The parent flow can then access that value by key without having to know the child flow's internals:

id: passDataBetweenSubflows
namespace: blueprint

tasks:
  - id: flow1
    type: io.kestra.core.tasks.flows.Subflow
    namespace: dev
    flowId: flow1
    wait: true
  
  - id: log
    type: io.kestra.core.tasks.log.Log
    message: "{{outputs.flow1.key_name}}"

⚠️ The outputs of the child flow will be propagated to the subflow task by key only if wait: true.
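
To make the intended lookup semantics concrete, here is a minimal Python sketch. It is not Kestra code and not how Kestra implements this; it only models the proposal under two assumptions: flow outputs behave like a dictionary keyed by output name, and nothing is propagated when wait is false. The helper names `subflow_task_outputs` and `resolve` are hypothetical.

```python
from typing import Any


def subflow_task_outputs(child_outputs: dict[str, Any], wait: bool) -> dict[str, Any]:
    """Outputs visible on the subflow task in the parent flow (assumed model)."""
    # Without waiting, the parent moves on before the child has produced anything.
    return dict(child_outputs) if wait else {}


def resolve(context: dict[str, Any], path: str) -> Any:
    """Look up a dotted path such as 'outputs.flow1.key_name' in nested dicts."""
    current: Any = context
    for part in path.split("."):
        current = current[part]
    return current


# The child flow exposes one output under an arbitrary key name.
child = {"key_name": "this is a secret message returned from flow1"}

# The parent only needs the subflow task ID (flow1) and the output key.
context = {"outputs": {"flow1": subflow_task_outputs(child, wait=True)}}
value = resolve(context, "outputs.flow1.key_name")
print(value)
```

The point of the sketch is that the parent's path contains only the subflow task ID and the output key, never the ID of a task inside the child flow.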

Implications

This will be especially important for simplifying the optional "reduce" step of a map-reduce-like operation: the parent flow iterates over a list of values and launches many subflow executions, each of which may optionally return data that can be aggregated, zipped, or otherwise "reduced" at the end of the parent flow using the ForEachItem pattern: #2131
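
A rough Python sketch of that "reduce" step, for illustration only. It assumes each subflow execution exposes its flow outputs as a plain dictionary and that an execution returning nothing yields an empty dictionary; the function name `reduce_outputs` and the sample data are hypothetical, not part of Kestra.

```python
from collections import defaultdict
from typing import Any


def reduce_outputs(executions: list[dict[str, Any]]) -> dict[str, list[Any]]:
    """Group values returned by many subflow executions under each output key."""
    merged: dict[str, list[Any]] = defaultdict(list)
    for outputs in executions:
        # An execution that returned nothing simply contributes no values.
        for key, value in outputs.items():
            merged[key].append(value)
    return dict(merged)


# Hypothetical outputs from three subflow executions; the second returned nothing.
results = reduce_outputs([{"row_count": 10}, {}, {"row_count": 5}])
total = sum(results["row_count"])
print(results, total)
```

Because every child reports under the same output key, the parent can aggregate without knowing which task inside each child produced the value.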

UI-side changes

The flow execution outputs will be stored in internal storage. This will require adding an outputs section to the Executions overview tab:

image

anna-geller added the enhancement (New feature or request) label Sep 18, 2023
Member

loicmathieu commented Sep 18, 2023

There are already too many things in the output.
Maybe a function that can access the parent execution and gather the output would do the trick? This would avoid copying the parent outputs inside the child output.

  - id: log
    type: io.kestra.core.tasks.log.Log
    message: "{{parentOutput('key_name')}}"

Member Author

anna-geller commented Sep 18, 2023

Thanks. It seems that this function would still require the outputs to be specified on the child flow before they can be retrieved by key from a parent.

The intention here is that the flow outputs would not be displayed in the Outputs tab, only in the Overview, in the same way as FILE-type inputs are passed to an Execution. It seems consistent: a flow can have inputs and outputs, and they both can be accessed in the Overview tab (I am not sure how feasible it is to populate that outputs table only after the flow completes, though)

We also hope to use this to add a unit/integration test feature: if a flow can have inputs and outputs, a unit test can assert that, given specific test inputs, the flow generates specific outputs.

Another way of looking at it is to treat the flow outputs as data contracts. The outputs define something that should ideally never change because they define data that might be used by other downstream consumers (other flows). The definition of how that output is generated may change, e.g., instead of using the Download or HTTP Request task, your flow may now use a Script task to output specific data but it will still be passed using the same output key to downstream processes.
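
The testing and data-contract ideas can be sketched in plain Python. This is only an illustration under stated assumptions: `run_flow` is a hypothetical stand-in for executing a flow, and `EXPECTED_OUTPUT_KEYS` is a made-up contract; neither is part of Kestra.

```python
from typing import Any

# Hypothetical contract: the output keys downstream consumers rely on.
EXPECTED_OUTPUT_KEYS = {"key_name"}


def run_flow(inputs: dict[str, Any]) -> dict[str, Any]:
    """Stand-in for a flow execution: maps inputs to flow-level outputs.

    How the value is produced may change over time (download, HTTP request,
    script); only the output key is part of the contract.
    """
    message = f"this is a secret message returned from {inputs['flow_id']}"
    return {"key_name": message}


def check_contract(outputs: dict[str, Any]) -> bool:
    """Return True when every key promised by the contract is present."""
    return EXPECTED_OUTPUT_KEYS.issubset(outputs)


# Given specific test inputs, assert on the outputs the flow generates.
outputs = run_flow({"flow_id": "flow1"})
contract_ok = check_contract(outputs)
print(contract_ok, outputs["key_name"])
```

A test written this way keeps passing when the internals of `run_flow` change, and fails only when a promised output key disappears.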

anna-geller modified the milestones: v0.14.0, v0.15.0 Dec 4, 2023
anna-geller changed the title from "Allow (sub)flows to generate and pass outputs in the same way as functions can return data" to "Allow subflows to generate and pass outputs in the same way as functions can return data" Jan 2, 2024
@anna-geller
Member Author

Outputs will consist of:

  • keys which are strings
  • values which are serialized objects (e.g. it can be an array passed downstream, serialized as a JSON string)

A more complex object passed as output - discussed with Ludo yesterday:

  1. child flow
id: flow1
namespace: dev

tasks:
  - id: returnData
    type: io.kestra.core.tasks.debugs.Return
    format: this is a secret message returned from {{flow.id}}

  - id: complexOne
    type: io.kestra.core.tasks.debugs.Return
    format: "{{ range(0, 10) }}"

outputs:
  simple: "{{ outputs.returnData.value }}"
  complex: "{{ outputs.complexOne.value }}" # JSON string on the consumer side, but array [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] on the producer side 
  2. the parent flow can access output values by keys without having to know the child flow's internals:
id: passDataBetweenSubflows
namespace: blueprint

tasks:
  - id: flow1
    type: io.kestra.core.tasks.flows.Subflow
    namespace: dev
    flowId: flow1
    wait: true

  - id: log
    type: io.kestra.core.tasks.log.Log
    message: "{{ outputs.flow1.simple }}"
#    message: "{{outputs.flow1.vars.simple}}" # worst case with vars

  - id: log2
    type: io.kestra.core.tasks.log.Log
    message: "{{ outputs.flow1.complex }}" # on the consumer side, use "{{ json(outputs.flow1.complex)[0] }}" to get specific values from a JSON string containing an array
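
As a rough illustration of the producer/consumer difference noted in the comments above, this Python sketch (not Kestra code) mimics an array being serialized to a JSON string and then parsed back on the consumer side, which is the role json() plays in the expression above. The variable names are hypothetical.

```python
import json

# Producer side: the task output is an array of ten integers.
produced = list(range(0, 10))

# The flow output value is passed downstream as a JSON string.
serialized = json.dumps(produced)

# Consumer side: the raw value is a string, so indexing it yields a character.
first_char = serialized[0]

# Parsing first recovers the array and its elements.
first_item = json.loads(serialized)[0]

print(repr(first_char), first_item)
```

Indexing the unparsed string returns the opening bracket rather than the first element, which is why the consumer has to parse the value before picking items out of it.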

Member Author

anna-geller commented Jan 18, 2024

As part of this issue, we will mark the outputs property of the subflow task as deprecated and point to the docs on how to use flow outputs instead:

id: passDataBetweenSubflows
namespace: blueprint

tasks:
  - id: flow1
    type: io.kestra.core.tasks.flows.Flow
    namespace: blueprint
    flowId: flow1
    wait: true
    outputs: # when specified, warning in the UI that it's deprecated
      dataFromChildFlow: "{{outputs.returnData.value}}"
  
  - id: log
    type: io.kestra.core.tasks.log.Log
    message: "{{outputs.flow1.outputs.dataFromChildFlow}}"

Member Author

anna-geller commented Feb 8, 2024

@fhussonnois as discussed, closing the issue as done.
