Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

docs: Add map-reduce example. Closes #4165 #4175

Merged
merged 13 commits into from
Oct 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions docs/fields.md
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ Workflow is the definition of a workflow resource

- [`loops.yaml`](https://github.com/argoproj/argo/blob/master/examples/loops.yaml)

- [`map-reduce.yaml`](https://github.com/argoproj/argo/blob/master/examples/map-reduce.yaml)

- [`memoize-simple.yaml`](https://github.com/argoproj/argo/blob/master/examples/memoize-simple.yaml)

- [`nested-workflow.yaml`](https://github.com/argoproj/argo/blob/master/examples/nested-workflow.yaml)
Expand Down Expand Up @@ -485,6 +487,8 @@ WorkflowSpec is the specification of a Workflow.

- [`loops.yaml`](https://github.com/argoproj/argo/blob/master/examples/loops.yaml)

- [`map-reduce.yaml`](https://github.com/argoproj/argo/blob/master/examples/map-reduce.yaml)

- [`memoize-simple.yaml`](https://github.com/argoproj/argo/blob/master/examples/memoize-simple.yaml)

- [`nested-workflow.yaml`](https://github.com/argoproj/argo/blob/master/examples/nested-workflow.yaml)
Expand Down Expand Up @@ -832,6 +836,8 @@ CronWorkflowSpec is the specification of a CronWorkflow

- [`loops.yaml`](https://github.com/argoproj/argo/blob/master/examples/loops.yaml)

- [`map-reduce.yaml`](https://github.com/argoproj/argo/blob/master/examples/map-reduce.yaml)

- [`memoize-simple.yaml`](https://github.com/argoproj/argo/blob/master/examples/memoize-simple.yaml)

- [`nested-workflow.yaml`](https://github.com/argoproj/argo/blob/master/examples/nested-workflow.yaml)
Expand Down Expand Up @@ -1139,6 +1145,8 @@ WorkflowTemplateSpec is a spec of WorkflowTemplate.

- [`loops.yaml`](https://github.com/argoproj/argo/blob/master/examples/loops.yaml)

- [`map-reduce.yaml`](https://github.com/argoproj/argo/blob/master/examples/map-reduce.yaml)

- [`memoize-simple.yaml`](https://github.com/argoproj/argo/blob/master/examples/memoize-simple.yaml)

- [`nested-workflow.yaml`](https://github.com/argoproj/argo/blob/master/examples/nested-workflow.yaml)
Expand Down Expand Up @@ -1396,6 +1404,8 @@ Arguments to a template

- [`loops.yaml`](https://github.com/argoproj/argo/blob/master/examples/loops.yaml)

- [`map-reduce.yaml`](https://github.com/argoproj/argo/blob/master/examples/map-reduce.yaml)

- [`memoize-simple.yaml`](https://github.com/argoproj/argo/blob/master/examples/memoize-simple.yaml)

- [`nested-workflow.yaml`](https://github.com/argoproj/argo/blob/master/examples/nested-workflow.yaml)
Expand Down Expand Up @@ -1736,6 +1746,8 @@ Template is a reusable and composable unit of execution in a workflow

- [`loops.yaml`](https://github.com/argoproj/argo/blob/master/examples/loops.yaml)

- [`map-reduce.yaml`](https://github.com/argoproj/argo/blob/master/examples/map-reduce.yaml)

- [`memoize-simple.yaml`](https://github.com/argoproj/argo/blob/master/examples/memoize-simple.yaml)

- [`nested-workflow.yaml`](https://github.com/argoproj/argo/blob/master/examples/nested-workflow.yaml)
Expand Down Expand Up @@ -2033,6 +2045,8 @@ Outputs hold parameters, artifacts, and results from a step

- [`k8s-wait-wf.yaml`](https://github.com/argoproj/argo/blob/master/examples/k8s-wait-wf.yaml)

- [`map-reduce.yaml`](https://github.com/argoproj/argo/blob/master/examples/map-reduce.yaml)

- [`memoize-simple.yaml`](https://github.com/argoproj/argo/blob/master/examples/memoize-simple.yaml)

- [`nested-workflow.yaml`](https://github.com/argoproj/argo/blob/master/examples/nested-workflow.yaml)
Expand Down Expand Up @@ -2133,6 +2147,8 @@ Artifact indicates an artifact to place at a specified path

- [`input-artifact-s3.yaml`](https://github.com/argoproj/argo/blob/master/examples/input-artifact-s3.yaml)

- [`map-reduce.yaml`](https://github.com/argoproj/argo/blob/master/examples/map-reduce.yaml)

- [`nested-workflow.yaml`](https://github.com/argoproj/argo/blob/master/examples/nested-workflow.yaml)

- [`output-artifact-gcs.yaml`](https://github.com/argoproj/argo/blob/master/examples/output-artifact-gcs.yaml)
Expand Down Expand Up @@ -2246,6 +2262,8 @@ Parameter indicate a passed string parameter to a service template with an optio

- [`loops.yaml`](https://github.com/argoproj/argo/blob/master/examples/loops.yaml)

- [`map-reduce.yaml`](https://github.com/argoproj/argo/blob/master/examples/map-reduce.yaml)

- [`memoize-simple.yaml`](https://github.com/argoproj/argo/blob/master/examples/memoize-simple.yaml)

- [`nested-workflow.yaml`](https://github.com/argoproj/argo/blob/master/examples/nested-workflow.yaml)
Expand Down Expand Up @@ -2460,6 +2478,8 @@ DAGTemplate is a template subtype for directed acyclic graph templates

- [`loops-dag.yaml`](https://github.com/argoproj/argo/blob/master/examples/loops-dag.yaml)

- [`map-reduce.yaml`](https://github.com/argoproj/argo/blob/master/examples/map-reduce.yaml)

- [`parallelism-nested-dag.yaml`](https://github.com/argoproj/argo/blob/master/examples/parallelism-nested-dag.yaml)

- [`parameter-aggregation-dag.yaml`](https://github.com/argoproj/argo/blob/master/examples/parameter-aggregation-dag.yaml)
Expand Down Expand Up @@ -2612,6 +2632,8 @@ Inputs are the mechanism for passing parameters, artifacts, volumes from one tem

- [`loops.yaml`](https://github.com/argoproj/argo/blob/master/examples/loops.yaml)

- [`map-reduce.yaml`](https://github.com/argoproj/argo/blob/master/examples/map-reduce.yaml)

- [`memoize-simple.yaml`](https://github.com/argoproj/argo/blob/master/examples/memoize-simple.yaml)

- [`nested-workflow.yaml`](https://github.com/argoproj/argo/blob/master/examples/nested-workflow.yaml)
Expand Down Expand Up @@ -2841,6 +2863,8 @@ Pod metdata

- [`loops.yaml`](https://github.com/argoproj/argo/blob/master/examples/loops.yaml)

- [`map-reduce.yaml`](https://github.com/argoproj/argo/blob/master/examples/map-reduce.yaml)

- [`memoize-simple.yaml`](https://github.com/argoproj/argo/blob/master/examples/memoize-simple.yaml)

- [`nested-workflow.yaml`](https://github.com/argoproj/argo/blob/master/examples/nested-workflow.yaml)
Expand Down Expand Up @@ -3032,6 +3056,8 @@ ScriptTemplate is a template subtype to enable scripting through code steps

- [`loops-param-result.yaml`](https://github.com/argoproj/argo/blob/master/examples/loops-param-result.yaml)

- [`map-reduce.yaml`](https://github.com/argoproj/argo/blob/master/examples/map-reduce.yaml)

- [`parameter-aggregation-dag.yaml`](https://github.com/argoproj/argo/blob/master/examples/parameter-aggregation-dag.yaml)

- [`parameter-aggregation-script.yaml`](https://github.com/argoproj/argo/blob/master/examples/parameter-aggregation-script.yaml)
Expand Down Expand Up @@ -3359,6 +3385,8 @@ ArchiveStrategy describes how to archive files/directory when saving artifacts

- [`artifact-passing-subpath.yaml`](https://github.com/argoproj/argo/blob/master/examples/artifact-passing-subpath.yaml)

- [`map-reduce.yaml`](https://github.com/argoproj/argo/blob/master/examples/map-reduce.yaml)

- [`output-artifact-s3.yaml`](https://github.com/argoproj/argo/blob/master/examples/output-artifact-s3.yaml)
</details>

Expand Down Expand Up @@ -3574,6 +3602,8 @@ ValueFrom describes a location in which to obtain the value to a parameter

- [`k8s-wait-wf.yaml`](https://github.com/argoproj/argo/blob/master/examples/k8s-wait-wf.yaml)

- [`map-reduce.yaml`](https://github.com/argoproj/argo/blob/master/examples/map-reduce.yaml)

- [`memoize-simple.yaml`](https://github.com/argoproj/argo/blob/master/examples/memoize-simple.yaml)

- [`nested-workflow.yaml`](https://github.com/argoproj/argo/blob/master/examples/nested-workflow.yaml)
Expand Down Expand Up @@ -3717,6 +3747,8 @@ DAGTask represents a node in the graph during DAG execution

- [`loops-dag.yaml`](https://github.com/argoproj/argo/blob/master/examples/loops-dag.yaml)

- [`map-reduce.yaml`](https://github.com/argoproj/argo/blob/master/examples/map-reduce.yaml)

- [`parallelism-nested-dag.yaml`](https://github.com/argoproj/argo/blob/master/examples/parallelism-nested-dag.yaml)

- [`parameter-aggregation-dag.yaml`](https://github.com/argoproj/argo/blob/master/examples/parameter-aggregation-dag.yaml)
Expand Down Expand Up @@ -3836,6 +3868,8 @@ Sequence expands a workflow step into numeric range

- [`loops-sequence.yaml`](https://github.com/argoproj/argo/blob/master/examples/loops-sequence.yaml)

- [`map-reduce.yaml`](https://github.com/argoproj/argo/blob/master/examples/map-reduce.yaml)

- [`work-avoidance.yaml`](https://github.com/argoproj/argo/blob/master/examples/work-avoidance.yaml)
</details>

Expand Down Expand Up @@ -3879,6 +3913,8 @@ NoneStrategy indicates to skip tar process and upload the files or directory tre

- [`artifact-passing-subpath.yaml`](https://github.com/argoproj/argo/blob/master/examples/artifact-passing-subpath.yaml)

- [`map-reduce.yaml`](https://github.com/argoproj/argo/blob/master/examples/map-reduce.yaml)

- [`output-artifact-s3.yaml`](https://github.com/argoproj/argo/blob/master/examples/output-artifact-s3.yaml)
</details>

Expand Down Expand Up @@ -4091,6 +4127,8 @@ ObjectMeta is metadata that all persisted resources must have, which includes al

- [`loops.yaml`](https://github.com/argoproj/argo/blob/master/examples/loops.yaml)

- [`map-reduce.yaml`](https://github.com/argoproj/argo/blob/master/examples/map-reduce.yaml)

- [`memoize-simple.yaml`](https://github.com/argoproj/argo/blob/master/examples/memoize-simple.yaml)

- [`nested-workflow.yaml`](https://github.com/argoproj/argo/blob/master/examples/nested-workflow.yaml)
Expand Down Expand Up @@ -5271,6 +5309,8 @@ PersistentVolumeClaimSpec describes the common attributes of storage devices and

- [`loops.yaml`](https://github.com/argoproj/argo/blob/master/examples/loops.yaml)

- [`map-reduce.yaml`](https://github.com/argoproj/argo/blob/master/examples/map-reduce.yaml)

- [`memoize-simple.yaml`](https://github.com/argoproj/argo/blob/master/examples/memoize-simple.yaml)

- [`nested-workflow.yaml`](https://github.com/argoproj/argo/blob/master/examples/nested-workflow.yaml)
Expand Down Expand Up @@ -5817,6 +5857,8 @@ EnvVarSource represents a source for the value of an EnvVar.

- [`k8s-wait-wf.yaml`](https://github.com/argoproj/argo/blob/master/examples/k8s-wait-wf.yaml)

- [`map-reduce.yaml`](https://github.com/argoproj/argo/blob/master/examples/map-reduce.yaml)

- [`memoize-simple.yaml`](https://github.com/argoproj/argo/blob/master/examples/memoize-simple.yaml)

- [`nested-workflow.yaml`](https://github.com/argoproj/argo/blob/master/examples/nested-workflow.yaml)
Expand Down Expand Up @@ -6286,6 +6328,8 @@ ListMeta describes metadata that synthetic resources must have, including lists

- [`loops.yaml`](https://github.com/argoproj/argo/blob/master/examples/loops.yaml)

- [`map-reduce.yaml`](https://github.com/argoproj/argo/blob/master/examples/map-reduce.yaml)

- [`memoize-simple.yaml`](https://github.com/argoproj/argo/blob/master/examples/memoize-simple.yaml)

- [`nested-workflow.yaml`](https://github.com/argoproj/argo/blob/master/examples/nested-workflow.yaml)
Expand Down
162 changes: 162 additions & 0 deletions examples/map-reduce.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
# This workflow demonstrates a basic map-reduce.
# This requires you have a artifact repository configured.
#
# Notes:
# - You'll need to have an user namespaced artifact repository set-up to save intermediate results for this workflow.
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: map-reduce-
spec:
entrypoint: main
arguments:
parameters:
- name: numParts
value: "4"
- name: numGroups
value: "2"
templates:
- name: main
dag:
tasks:
- name: split
template: split
arguments:
parameters:
- name: numParts
value: "{{workflow.parameters.numParts}}"
- name: map
template: map
arguments:
parameters:
- name: partId
value: '{{item}}'
- name: numGroups
value: '{{workflow.parameters.numGroups}}'
artifacts:
- name: parts
from: '{{tasks.split.outputs.artifacts.parts}}'
dependencies:
- split
withParam: '{{tasks.split.outputs.result}}'
- name: reduce
template: reduce
arguments:
parameters:
- name: group
value: '{{item}}'
dependencies:
- map
withSequence:
count: "{{workflow.parameters.numGroups}}"
# The `split` task creates a number of "parts". Each part has a unique ID (e.g. part-0, part-1).
# This task writes the part IDs to stdout (so that the `map` task can be expanded to have one task per part).
# And, it writes one "part file" for each of pieces of processing that needs doing, into to single directory
# which is then saved a output artifact.
- name: split
inputs:
parameters:
- name: numParts
script:
image: python:alpine3.6
command:
- python
source: |
import json
import os
import sys
os.mkdir("/tmp/parts")
partIds = list(map(lambda x: "part-" + str(x), range({{inputs.parameters.numParts}})))
for i, partId in enumerate(partIds, start=1):
with open("/tmp/parts/" + partId + ".json", "w") as out:
json.dump({"foo": i}, out)
json.dump(partIds, sys.stdout)
outputs:
artifacts:
- name: parts
path: /tmp/parts
# One `map` per part ID is started. Finds its own "part file" under `/tmp/parts/${partId}`.
# Each `map` task has an output artifact saved with a unique name for the part into to a common "results directory".
- name: map
inputs:
parameters:
- name: partId
- name: numGroups
artifacts:
- name: parts
path: /tmp/parts
script:
image: python:alpine3.6
command:
- python
source: |
import json
import os
import sys
partId = "{{inputs.parameters.partId}}"
numGroups = {{inputs.parameters.numGroups}}
os.mkdir("/tmp/results")
with open("/tmp/parts/" + partId + ".json") as f:
part = json.load(f)
with open("/tmp/results/" + partId + ".json", "w") as out:
json.dump({"bar": part["foo"] * 2, "group": part["foo"] % numGroups}, out)
outputs:
artifacts:
- name: result
path: /tmp/results/{{inputs.parameters.partId}}.json
archive:
none: { }
s3:
bucket: my-bucket
endpoint: minio:9000
insecure: true
accessKeySecret:
name: my-minio-cred
key: accesskey
secretKeySecret:
name: my-minio-cred
key: secretkey
key: "{{workflow.name}}/results/{{inputs.parameters.partId}}.json"
# The `reduce` task takes the "results directory" and returns a single result.
- name: reduce
inputs:
parameters:
- name: group
artifacts:
- name: result
path: /tmp/results
s3:
bucket: my-bucket
endpoint: minio:9000
insecure: true
accessKeySecret:
name: my-minio-cred
key: accesskey
secretKeySecret:
name: my-minio-cred
key: secretkey
key: "{{workflow.name}}/results"
script:
image: python:alpine3.6
command:
- python
source: |
import json
import os
import sys
total = 0
group = "{{inputs.parameters.group}}"
os.mkdir("/tmp/totals/")
for f in list(map(lambda x: open("/tmp/results/" + x), os.listdir("/tmp/results"))):
result = json.load(f)
if result["group"] == group:
total = total + result["bar"]
with open("/tmp/totals/" + group, "w") as f:
f.write(str(total))
f.close()
outputs:
parameters:
- name: total-{{inputs.parameters.group}}
globalName: total-{{inputs.parameters.group}}
valueFrom:
path: /tmp/totals/{{inputs.parameters.group}}