Support a "generator" step which fans out an unknown number of sub-workflows #5938
justinmchase started this conversation in Ideas
Related to #5740
In all the docs I've seen, and in my limited experimentation, it seems like Argo can't quite handle a situation I need to support: the ability to dynamically fan out a large number of sub-steps.
For a practical example, imagine a CSV file comes in with a million rows and a size of 1GB. Processing this file takes a non-trivial amount of time.
The current examples I've seen for processing a CSV file involve converting the CSV into a big JSON document, outputting the new file along with the row count, then using that count as a loop variable so that each sub-step opens the document, parses it as JSON, selects the Nth row, and processes it. This will not work on a document this large: reading the entire document in every sub-step would be too slow and use too much memory, so streaming is necessary. Also, completely reading the entire document before the child steps start running would take too long; the child workflows need to start right away, before the first step is done. That is a big difference from what I've seen so far in Argo.
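For reference, the existing pattern looks roughly like the sketch below. This is only an illustration of the approach described above, not code from any particular example: the image names, file paths, and jq-based row selection are all placeholders.

```yaml
# Existing-style fan-out: materialize everything up front, then loop by index.
- name: main
  steps:
    - - name: split
        template: split-csv
    - - name: process-row
        template: process-row
        # Loop N times, where N was written out by the split step.
        withSequence:
          count: "{{steps.split.outputs.parameters.row-count}}"
        arguments:
          parameters:
            - name: index
              value: "{{item}}"
          artifacts:
            - name: rows
              from: "{{steps.split.outputs.artifacts.rows}}"

- name: split-csv
  container:
    image: my-csv-tools          # placeholder image
    command: [sh, -c]
    # Convert the whole CSV to JSON and count the rows before anything else runs.
    args: ["csv2json /data/input.csv > /tmp/rows.json && jq length /tmp/rows.json > /tmp/row-count"]
  outputs:
    parameters:
      - name: row-count
        valueFrom:
          path: /tmp/row-count
    artifacts:
      - name: rows
        path: /tmp/rows.json

- name: process-row
  inputs:
    parameters:
      - name: index
    artifacts:
      - name: rows
        path: /tmp/rows.json
  container:
    image: my-csv-tools          # placeholder image
    command: [sh, -c]
    # Every child re-reads the full 1GB JSON document just to pick one row.
    args: ["jq -c '.[{{inputs.parameters.index}}]' /tmp/rows.json | process-one-row"]
```

Every `process-row` pod has to pull and parse the entire `rows.json` artifact just to extract its one row, and nothing can start until `split-csv` has finished, which is exactly what makes this approach unworkable at this scale.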
Now, I need this to be a workflow because each row has its own complex workflow, and once all of them are completed I need to run subsequent steps. So while you might think I could just throw each row into a Kafka stream and then hook up an event source and sensor, that sadly won't work well either: because I need to run steps once all of the rows are done, the whole thing needs to be contained in a single workflow.
Fortunately, I think this would actually be fairly easy to solve by creating a kind of step which treats each line of output as a result. Think of a "generator" function where each line written to stdout becomes the result parameter input for the next step. The number of next steps depends on the number of lines written to output.
Consider this hypothetical example:
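Something along these lines (a sketch only: the `outputs.generator: true` flag, the `withGenerator` field, and the `.count` variable are the additions being proposed here, not existing Argo syntax, and the step and template names are just illustrative):

```yaml
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: generator-fanout-
spec:
  entrypoint: main
  templates:
    - name: main
      steps:
        - - name: generate
            template: generate-rows
        - - name: process
            template: process-row
            # Proposed: fan out one invocation per line the generator writes,
            # starting as soon as lines appear rather than waiting for the
            # generator to finish.
            withGenerator: "{{steps.generate.outputs.generator}}"
            arguments:
              parameters:
                - name: row
                  value: "{{item}}"
        - - name: all-done
            template: summarize
            arguments:
              parameters:
                - name: count
                  # Proposed: total number of lines the generator produced.
                  value: "{{steps.generate.outputs.generate.count}}"

    - name: generate-rows
      # Proposed flag: treat each line written to stdout as one item.
      outputs:
        generator: true
      container:
        image: alpine:3.13
        command: [sh, -c]
        # Stream rows straight through; nothing is buffered into a giant JSON document.
        args: ["cat /data/input.csv"]

    - name: process-row
      inputs:
        parameters:
          - name: row
      container:
        image: alpine:3.13
        command: [sh, -c]
        args: ["echo 'processing: {{inputs.parameters.row}}'"]

    - name: summarize
      inputs:
        parameters:
          - name: count
      container:
        image: alpine:3.13
        command: [sh, -c]
        args: ["echo 'processed {{inputs.parameters.count}} rows'"]
```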
So in this example I am flagging a template and step as having `outputs.generator=true`, and then the step prints out N lines. The next step sets `withGenerator={{steps.generate.outputs.generator}}` to indicate that it should loop over the generated output of the previous step. Each invocation of that step gets an `{{item}}` variable containing one line of the generated output. This would essentially queue and process each item even while the generator step is still running. Once the generator step and all of its child steps have completed, the workflow continues to the `all-done` step, which has access to the `steps.generate.outputs.generate.count` variable. It would not have access to the outputs of the individual items, nor to the outputs of the individual sub-steps.
Another scenario I've heard people discuss is the processing of video files, where each frame is handed out and processed in parallel, and once that is complete a following step reduces all of the artifacts from the previous steps and re-encodes them into a new video. It's not possible to know the number of frames ahead of time: you have to decode the video to figure that out, the file can be very large, it's impractical to read it multiple times, and seeking to an arbitrary frame is difficult and slow because it requires a lot of decoding.
I also want to point out that this feature is not supported by Airflow or AWS Step Functions, so it would be a competitive advantage to implement it efficiently here. In the past when I had to do this I ended up building my own workflow engine, which I'd really rather not do again. I like Argo quite a bit, and as far as I can tell this is its biggest gap; I'd really like to use it.