Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ForEachItem task #2359

Merged
merged 15 commits into from
Nov 3, 2023
Merged

ForEachItem task #2359

merged 15 commits into from
Nov 3, 2023

Conversation

loicmathieu
Copy link
Member

@loicmathieu loicmathieu commented Oct 20, 2023

Fixes #2131

What changes are being made and why?

Add a ForEachItem task that will split a file in batches and will trigger a subflow execution for each batch.

The current implementation will generates as many subflows as there is batches with unlimited concurrency.


How the changes have been QAed?

Parent flow

id: each-item
namespace: dev

tasks:
  - id: extract
    type: io.kestra.plugin.gcp.bigquery.Query
    sql: |
      SELECT DATETIME(datehour) as date, title, views FROM `bigquery-public-data.wikipedia.pageviews_2023` 
      WHERE DATE(datehour) = current_date() and wiki = 'fr' and title not in ('Cookie_(informatique)', 'Wikipédia:Accueil_principal', 'Spécial:Recherche')
      ORDER BY datehour desc, views desc
      LIMIT 10000
    store: true
  
  - id: each
    type: io.kestra.core.tasks.flows.ForEachItem
    items: "{{ outputs.extract.uri }}"
    maxItemsPerBatch: 10
    subflow:
      namespace: dev
      flowId: per-item
      wait: true
      transmitFailed: true

Sub flow:

id: per-item
namespace: dev

tasks:
  - id: per-item
    type: io.kestra.core.tasks.log.Log
    message: "{{ trigger.items }}"

@loicmathieu loicmathieu mentioned this pull request Oct 20, 2023
@loicmathieu loicmathieu force-pushed the feat/ForEachItemv2 branch 4 times, most recently from eab6b53 to c9e3f86 Compare October 23, 2023 13:29
@loicmathieu
Copy link
Member Author

Note that test cannot be added to the MemoryExecutor as it fails with the known error:

2023-10-27 15:51:48,678 ERROR memory-queue_17 .TestThreadUncaughtExceptionHandlers Caught an exception in Thread[memory-queue_17,5,main]. Keep it running.
java.lang.IllegalStateException: Execution state don't exist for 1Xts6TJZEB6IRzqs15ZBtZ, receive WorkerTaskResult(taskRun=TaskRun(tenantId=null, id=7LWGpQxNqbshkyIhWjc6NP, executionId=1Xts6TJZEB6IRzqs15ZBtZ, namespace=io.kestra.tests, flowId=trigger-flow-listener-no-condition, taskId=only-listener, parentTaskRunId=null, value=null, attempts=null, outputs=null, state=State(current=RUNNING, histories=[State.History(state=CREATED, date=2023-10-27T13:51:48.541411428Z), State.History(state=RUNNING, date=2023-10-27T13:51:48.555250723Z)])), dynamicTaskRuns=null)
	at io.kestra.runner.memory.MemoryExecutor.lambda$workerTaskResultQueue$11(MemoryExecutor.java:352)
	at java.base/java.util.concurrent.ConcurrentHashMap.compute(ConcurrentHashMap.java:1916)
	at io.kestra.runner.memory.MemoryExecutor.workerTaskResultQueue(MemoryExecutor.java:350)
	at io.kestra.runner.memory.MemoryQueue.lambda$produce$0(MemoryQueue.java:79)
	at io.micrometer.core.instrument.internal.TimedRunnable.run(TimedRunnable.java:49)

I try to fix this error without success so I decided to only test the task with the JDBC and the Kafak runners.

@loicmathieu loicmathieu marked this pull request as ready for review October 27, 2023 13:52
Copy link
Member

@anna-geller anna-geller left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some minor comments on examples, otherwise awesome work! 🎸

@anna-geller
Copy link
Member

I took it for a spin just now and it seems that the internal storage file for a given batch cannot be accessed by the subflow: https://share.descript.com/view/dLuk3uN2F0L

reproducer:

subflow

id: subflow
namespace: qa

tasks:
  - id: for_each_item
    type: io.kestra.plugin.scripts.shell.Commands
    runner: PROCESS
    commands:
      - cat "{{ trigger.items }}"

parent

id: each_parent
namespace: dev

tasks:
  - id: extract
    type: io.kestra.plugin.jdbc.duckdb.Query
    sql: |
      INSTALL httpfs;
      LOAD httpfs;
      SELECT *
      FROM read_csv_auto('https://raw.githubusercontent.com/kestra-io/datasets/main/csv/orders.csv', header=True);
    store: true
  
  - id: each
    type: io.kestra.core.tasks.flows.ForEachItem
    items: "{{ outputs.extract.uri }}"
    maxItemsPerBatch: 10
    subflow:
      namespace: qa
      flowId: subflow
      wait: true
      transmitFailed: true

in the same way, the preview on the Overview page doesn't work and topology view cannot be expanded for that subflow (but this last one is a UI feature that Yann can help with):

https://share.descript.com/view/EP3CKE2s5RW

Copy link
Member

@anna-geller anna-geller left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see other comment, QA didn't fully pass

@loicmathieu
Copy link
Member Author

@anna-geller are you sure your branch was up-to-date while doing the QA as I fixed this issue on Tuesday


@NotEmpty
@PluginProperty(dynamic = true)
@Schema(title = "The items to be split into batches and processed. Make sure to set it to Kestra's internal storage URI, e.g. output from a previous task in the format `{{ outputs.task_id.uri }}` or an input parameter of FILE type e.g. `{{ inputs.myfile }}`.")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or from the namespace files storage ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the standard comment, but yes, it should be possible. But note that we currently have no way to have the storage URI of a namespace file easily (we only have a Pebble function to read the content).
For now, I prefer to keep the standard comment we use everywhere

@anna-geller
Copy link
Member

anna-geller commented Nov 2, 2023

yes @loicmathieu I deleted my local branch and re-pulled again just to be sure and still exactly as described above, please try the reproducer

the server log I'm getting when trying to preview a batch file:
2023-11-02 10:04:22,096 WARN default-nioEventLoopGroup-1-13 io.kestra.webserver.access [Date: 2023-11-02T10:04:22.071922+01:00] [Duration: 24 ms] [Method: GET] [Url: /api/v1/executions/62ABgYpG6PovQxgDMtl02N/file/preview?executionId=62ABgYpG6PovQxgDMtl02N&path=kestra:%2F%2F%2Fdev%2Feach_parent%2Fexecutions%2F1xHlFEW4x3YoGngbwtEBmp%2Ftasks%2Feach%2F8dVOCngDLvdVqHdb9mEdd%2Fbatch-6.ion&maxRows=100] [Status: 404] [Length: 428] [Ip: 0:0:0:0:0:0:0:1] [Port: 8080]

@anna-geller
Copy link
Member

I can confirm that the issue occurs only when the flow/task have underscores in the name like each_parent, without that, both accessing the internal storage file and the preview work fine

image

@loicmathieu
Copy link
Member Author

@anna-geller this has been fixed

Copy link

sonarcloud bot commented Nov 2, 2023

Kudos, SonarCloud Quality Gate passed!    Quality Gate passed

Bug A 0 Bugs
Vulnerability A 0 Vulnerabilities
Security Hotspot A 0 Security Hotspots
Code Smell A 19 Code Smells

85.9% 85.9% Coverage
0.0% 0.0% Duplication

@tchiotludo tchiotludo merged commit a570cc6 into develop Nov 3, 2023
6 checks passed
@tchiotludo tchiotludo deleted the feat/ForEachItemv2 branch November 3, 2023 08:33
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
4 participants