
Add ForEachItem core task to split a file from internal storage and create a subflow execution for each item or a batch of items #2131

Closed · Tracked by #2136 · Fixed by #2359
anna-geller opened this issue on Sep 18, 2023 · 2 comments
Labels: enhancement (New feature or request)

anna-geller (Member) commented on Sep 18, 2023:

Feature description

Problem

Users often fetch data from a source system, e.g.:

  • JDBC query outputs an ION file with iterable rows
  • S3/HTTP Download task outputs an iterable list of items from the API payload, typically downloaded as JSON or a CSV file

Then, they want to run a series of steps for each item, ideally by triggering a subflow.

Current solution

The Split task allows splitting an ION file into batches of rows. The user can then iterate over those batches with the EachParallel task, which triggers a subflow via the Flow task.

Subflow that processes one batch (here it simply prints the file contents):

id: file
namespace: dev

inputs:
  - name: file
    type: FILE

tasks:
  - id: file
    type: io.kestra.plugin.scripts.shell.Commands
    runner: PROCESS
    commands:
      - cat {{ inputs.file }}

Parent flow that downloads the data, splits it, and calls the subflow above for each batch:

id: each_item_csv
namespace: dev

tasks:
  - id: extract
    type: io.kestra.plugin.fs.http.Download
    uri: https://raw.githubusercontent.com/kestra-io/datasets/main/csv/orders.csv

  - id: split
    type: io.kestra.core.tasks.storages.Split
    from: "{{ outputs.extract.uri }}"
    rows: 10
  
  - id: each
    type: io.kestra.core.tasks.flows.EachParallel
    value: "{{ outputs.split.uris }}"
    tasks:
      - id: subflow
        type: io.kestra.core.tasks.flows.Flow
        namespace: dev
        flowId: file
        inputs:
          file: "{{ taskrun.value }}"
        wait: true
        transmitFailed: true

Problems with this approach:

  1. It requires too much knowledge about Kestra internals and too many steps (Split, EachParallel, Flow tasks)
  2. The user may unknowingly start an Execution running multiple tasks for millions of inputs within a single Execution, leading to possible performance issues.

Possible implementation

Add a new ForEachItem task to the core library.

This task will split an input file into one of the following:

  1. List of individual items or rows so that one subflow execution can be triggered per item
  2. Batches of items or rows so that one subflow execution can be triggered per batch of items (e.g. one execution for a batch of 1000 rows/items)

Syntax proposal

id: each_item_rows
namespace: dev

tasks:
  - id: extract
    type: io.kestra.plugin.jdbc.duckdb.Query
    sql: |
      INSTALL httpfs;
      LOAD httpfs;
      SELECT *
      FROM read_csv_auto('https://raw.githubusercontent.com/kestra-io/datasets/main/csv/orders.csv', header=True);
    store: true
  
  - id: each
    type: io.kestra.core.tasks.flows.ForEachItem
    items: "{{ outputs.extract.uri }}" # works with API payloads too. Kestra can detect if this output is not a file, 
# and will make it to a file, split into (batches of) items
    maxItemsPerBatch: 10
    maxConcurrency: 5 # max 5 concurrent executions, each processing 10 items
    subflow: 
      flowId: file
      namespace: dev
      inputs:
        file: "{{ taskrun.items }}"
      wait: true # wait by default
      transmitFailed: true # true by default 
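
For the per-item case (option 1 above), the same syntax could presumably be used with a batch size of one. A minimal sketch, assuming the property names from the proposal above:

  - id: each
    type: io.kestra.core.tasks.flows.ForEachItem
    items: "{{ outputs.extract.uri }}"
    maxItemsPerBatch: 1 # one subflow execution per item/row
    subflow:
      flowId: file
      namespace: dev
      inputs:
        file: "{{ taskrun.items }}" # here, a file containing a single row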

UI changes

When it comes to technical implementation, we might leverage an internally generated label to filter executions created from a specific parent execution.
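
For instance, each child execution could carry an internally generated label pointing back at its parent. The label keys below are purely illustrative, not a committed naming scheme:

# Hypothetical labels attached to each subflow execution created by ForEachItem
labels:
  parentExecutionId: "{{ execution.id }}"
  parentTaskId: "each"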

related to #796

loicmathieu (Member) commented:

Some notes:

  • Why is subflow optional? If the task's job is to start a subflow, shouldn't it be mandatory?
  • Under the covers I used the existing Flow task, so I added to subflow some properties that exist on the Flow task
  • Using taskrun.value to pass the item doesn't seem like a good idea: it's used in a lot of places in the UI, and since taskrun.value is stored inside the execution context, it would defeat the purpose of this task. Using the line counter as taskrun.value and passing the item(s) inside taskrun.items would avoid that (see the sketch below).
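
To illustrate that last suggestion (the variable names follow the comment above, not a finalized API), the subflow call inside ForEachItem might look like this:

    subflow:
      flowId: file
      namespace: dev
      inputs:
        # taskrun.items: URI of the internal-storage file holding this batch
        file: "{{ taskrun.items }}"
    # taskrun.value would then hold only the batch counter (e.g. "3"),
    # keeping the execution context small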

anna-geller (Member, Author) commented:

As discussed internally, all these suggestions are just prompts to kickstart a discussion when we get to work on a solution. Implementation is up to you. Let's discuss in a huddle when I'm back after the holidays.
