Skip to content

Commit

Permalink
fix: aggregate JSON output parameters correctly
Browse files Browse the repository at this point in the history
Fixes argoproj#13510

`outputs.result` of a `withItems`/`withParams` will be conditionally
`json.Unmarshal`ed.

Other output parameters are always json.Unmarshalled, which leads to
inconsitency and a complete inability to pass JSON out of withItems
outputs straight back in as `withItems`

Conditionally unmarshal the outputs.params
* avoiding it if the parameter isn't enclosed in `{` or `[`, primarily
to avoid bare numbers as they're valid JSON but need quoting here
* falling back to old behaviour if this fails

Additional e2e test tests the example case.

Signed-off-by: Alan Clucas <alan@clucas.org>
  • Loading branch information
Joibel committed Aug 27, 2024
1 parent f8f1893 commit 76c5e3e
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 3 deletions.
69 changes: 69 additions & 0 deletions test/e2e/functional/param-aggregation-fromoutputs.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: double-fan-out-using-param-
spec:
entrypoint: main-dag
templates:
- inputs:
parameters:
- name: param1
name: operation
outputs:
parameters:
- name: output1
valueFrom:
path: /tmp/fan_out.json
script:
command:
- python3
image: python:3.11
source: |-
import os
import sys
sys.path.append(os.getcwd())
import json
try: param1 = json.loads(r'''{{inputs.parameters.param1}}''')
except: param1 = r'''{{inputs.parameters.param1}}'''
with open('/tmp/fan_out.json', 'w') as f:
json.dump(param1, f)
print(json.dumps(param1))
- dag:
tasks:
- arguments:
parameters:
- name: param1
value: '{{item}}'
name: task3
template: operation
withParam: '{{inputs.parameters.param1}}'
inputs:
parameters:
- name: param1
name: secondary-dag
- dag:
tasks:
- arguments:
parameters:
- name: param1
value: '[[{"key1": "value1"}, {"key2": "value2"}, {"key3": "value3"}], [{"key4": "value4"}, {"key5": "value5"}]]'
name: task1
template: operation
- arguments:
parameters:
- name: param1
value: '{{item}}'
depends: task1
name: task2
template: operation
withParam: '{{tasks.task1.outputs.parameters.output1}}'
- arguments:
parameters:
- name: param1
value: '{{item}}'
depends: task2
name: task3-dag
template: secondary-dag
withParam: '{{tasks.task2.outputs.parameters.output1}}'
name: main-dag
17 changes: 17 additions & 0 deletions test/e2e/functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,23 @@ func (s *FunctionalSuite) TestParameterAggregation() {
})
}

func (s *FunctionalSuite) TestParameterAggregationFromOutputs() {
s.Given().
Workflow("@functional/param-aggregation-fromoutputs.yaml").
When().
SubmitWorkflow().
WaitForWorkflow(time.Second * 90).
Then().
ExpectWorkflow(func(t *testing.T, _ *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
assert.Equal(t, wfv1.WorkflowSucceeded, status.Phase)
assert.NotNil(t, status.Nodes.FindByDisplayName("task3(0:key1:value1)"))
assert.NotNil(t, status.Nodes.FindByDisplayName("task3(1:key2:value2)"))
assert.NotNil(t, status.Nodes.FindByDisplayName("task3(2:key3:value3)"))
assert.NotNil(t, status.Nodes.FindByDisplayName("task3(0:key4:value4)"))
assert.NotNil(t, status.Nodes.FindByDisplayName("task3(1:key5:value5)"))
})
}

func (s *FunctionalSuite) TestDAGDepends() {
s.Given().
Workflow("@functional/dag-depends.yaml").
Expand Down
35 changes: 32 additions & 3 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3265,9 +3265,38 @@ func (woc *wfOperationCtx) processAggregateNodeOutputs(scope *wfScope, prefix st
// Adding per-output aggregated value placeholders
for outputName, valueList := range outputParamValueLists {
key = fmt.Sprintf("%s.outputs.parameters.%s", prefix, outputName)
valueListJSON, err := json.Marshal(valueList)
if err != nil {
return err
unmarshalSuccess := true
var unmarshalledList []interface{}
for _, value := range valueList {
// Only try to unmarshal things that look like json lists or dicts
// and especially avoid unstringified numbers which are valid JSON
valueTrim := strings.Trim(value, " ")
valueTrimLen := len(valueTrim)
if valueTrimLen > 0 &&
!((valueTrim[0] == '{' && valueTrim[valueTrimLen-1] == '}') ||
(valueTrim[0] == '[' && valueTrim[valueTrimLen-1] == ']')) {
unmarshalSuccess = false
break // This isn't a json list or dict, leave it
}
var unmarshalledValue interface{}
err := json.Unmarshal([]byte(value), &unmarshalledValue)
if err != nil {
unmarshalSuccess = false
break // Unmarshal failed, fall back to strings
}
unmarshalledList = append(unmarshalledList, unmarshalledValue)
}
var valueListJSON []byte
if unmarshalSuccess {
valueListJSON, err = json.Marshal(unmarshalledList)
if err != nil {
return err
}
} else {
valueListJSON, err = json.Marshal(valueList)
if err != nil {
return err
}
}
scope.addParamToScope(key, string(valueListJSON))
}
Expand Down

0 comments on commit 76c5e3e

Please sign in to comment.