apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: map-reduce-
annotations:
workflows.argoproj.io/description: |
      This workflow demonstrates map-reduce using "key-only" artifacts.
      The first task, "split", produces a number of parts, each in the form of a JSON document, and saves them to a bucket.
      Each "map" task then reads one of those documents, performs a map operation, and writes the result out under a new key.
      Finally, "reduce" merges all the mapped documents into a single final document.
workflows.argoproj.io/version: '>= 3.0.0'
spec:
entrypoint: main
arguments:
parameters:
- name: numParts
value: "4"
templates:
- name: main
dag:
tasks:
- name: split
template: split
arguments:
parameters:
- name: numParts
value: "{{workflow.parameters.numParts}}"
- name: map
template: map
arguments:
parameters:
- name: partId
value: '{{item}}'
artifacts:
- name: part
s3:
key: "{{workflow.name}}/parts/{{item}}.json"
depends: "split"
withParam: '{{tasks.split.outputs.result}}'
- name: reduce
template: reduce
depends: "map"
    # The `split` task creates a number of "parts". Each part has a unique ID (its index).
    # This task writes one "part file" for each piece of processing that needs doing, into a single
    # directory which is then saved as an output artifact.
    # Finally, it dumps the list of part IDs to stdout.
- name: split
inputs:
parameters:
- name: numParts
script:
image: python:alpine3.6
command:
- python
source: |
          import json
          import os
          import sys

          os.mkdir("/mnt/out")
          # One string ID per requested part: ["0", "1", ...].
          partIds = [str(x) for x in range({{inputs.parameters.numParts}})]
          # Write one part file per ID; each carries a small payload for `map` to transform.
          for i, partId in enumerate(partIds, start=1):
            with open("/mnt/out/" + partId + ".json", "w") as f:
              json.dump({"foo": i}, f)
          # Print the ID list as JSON so the DAG can fan out over it with `withParam`.
          json.dump(partIds, sys.stdout)
outputs:
artifacts:
- name: parts
path: /mnt/out
archive:
none: { }
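            # `archive: none` stores the files as-is rather than as a gzipped tarball,
            # so each part stays individually addressable by its own S3 key.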
s3:
key: "{{workflow.name}}/parts/"
    # One `map` task is started per part ID. Each one finds its own "part file" at `/mnt/in/part.json`.
    # Each `map` task saves its output artifact under a unique, per-part name in a common "results directory".
- name: map
inputs:
parameters:
- name: partId
artifacts:
- name: part
path: /mnt/in/part.json
script:
image: python:alpine3.6
command:
- python
source: |
          import json
          import os

          os.mkdir("/mnt/out")
          # Read this task's part, apply the map operation (double the value), and write the result.
          with open("/mnt/in/part.json") as f:
            part = json.load(f)
          with open("/mnt/out/part.json", "w") as f:
            json.dump({"bar": part["foo"] * 2}, f)
outputs:
artifacts:
- name: part
path: /mnt/out/part.json
archive:
none: { }
s3:
key: "{{workflow.name}}/results/{{inputs.parameters.partId}}.json"
    # The `reduce` task reads every file in the "results directory" and merges them into a single result.
- name: reduce
inputs:
artifacts:
- name: results
path: /mnt/in
s3:
key: "{{workflow.name}}/results"
script:
image: python:alpine3.6
command:
- python
source: |
          import json
          import os

          os.mkdir("/mnt/out")
          # Sum the mapped value from every part file in the results directory.
          total = 0
          for filename in os.listdir("/mnt/in"):
            with open("/mnt/in/" + filename) as f:
              result = json.load(f)
            total = total + result["bar"]
          with open("/mnt/out/total.json", "w") as f:
            json.dump({"total": total}, f)
outputs:
artifacts:
- name: total
path: /mnt/out/total.json
archive:
none: { }
s3:
key: "{{workflow.name}}/total.json"