Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WithParams #2044

Merged
merged 20 commits into from
Sep 17, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions sdk/python/kfp/compiler/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@
import tarfile
import zipfile
from typing import Callable, Set, List, Text, Dict, Tuple, Any, Union, Optional
from typing import Any, Set, List, Text, Dict

import yaml
from kfp.dsl import _container_op, _for_loop
from kfp.dsl import _for_loop

from .. import dsl
from ._k8s_helper import K8sHelper
Expand Down Expand Up @@ -627,7 +626,10 @@ def _create_pipeline_workflow(self, args, pipeline, op_transformers=None):
for arg in args:
param = {'name': arg.name}
if arg.value is not None:
param['value'] = json.dumps(arg.value)
if isinstance(arg.value, (list, tuple)):
param['value'] = json.dumps(arg.value)
else:
param['value'] = str(arg.value)
input_params.append(param)

# Templates
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/kfp/dsl/_for_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def __getattr__(self, item):
return LoopArgumentVariable(self.name, item)

def to_list_for_task_yaml(self):
if isinstance(self.items_or_pipeline_param, list):
if isinstance(self.items_or_pipeline_param, (list, tuple)):
return self.items_or_pipeline_param
else:
raise ValueError("You should only call this method on loop args which have list items, "
Expand Down
1 change: 0 additions & 1 deletion sdk/python/kfp/dsl/_ops_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,6 @@ def __init__(self, loop_args: Union[_for_loop.ItemList, _pipeline_param.Pipeline
group_name = 'for-loop-{}'.format(code)
super().__init__(self.TYPE_NAME, name=group_name)

self.items_is_pipeline_param = isinstance(loop_args, _pipeline_param.PipelineParam)
if self.items_is_pipeline_param:
loop_args = _for_loop.LoopArguments.from_pipeline_param(loop_args)
elif not self.items_is_pipeline_param and not isinstance(loop_args, _for_loop.LoopArguments):
Expand Down
3 changes: 1 addition & 2 deletions sdk/python/tests/compiler/compiler_withparams_test_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ def pipeline():
name="my-out-cop0",
image='python:alpine3.6',
command=["sh", "-c"],
kevinbache marked this conversation as resolved.
Show resolved Hide resolved
# arguments=['python -c "import json; import sys; json.dump([i for i in range(20, 31)], open(\'/tmp/out.json\', \'w\'))"'],
arguments=['python -c "import json; import sys; json.dump([{\'a\': 1, \'b\': 2}, {\'a\': 10, \'b\': 20}], open(\'/tmp/out.json\', \'w\'))"'],
arguments=['python -c "import json; import sys; json.dump([{\'a\': 1, \'b\': 2}, {\'a\': 10, \'b\': 20}], open(\'/tmp/out.json\', \'w\'))"'],
kevinbache marked this conversation as resolved.
Show resolved Hide resolved
kevinbache marked this conversation as resolved.
Show resolved Hide resolved
file_outputs={'out': '/tmp/out.json'},
)

Expand Down
13 changes: 13 additions & 0 deletions sdk/python/tests/compiler/testdata/withitem_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,16 @@ def pipeline(my_pipe_param=10):
if __name__ == '__main__':
from kfp import compiler
print(compiler.Compiler().compile(pipeline, package_path=None))

import kfp
client = kfp.Client(host='127.0.0.1:8080/pipeline')
kevinbache marked this conversation as resolved.
Show resolved Hide resolved

pkg_path = '/tmp/witest_pkg.tar.gz'
compiler.Compiler().compile(pipeline, package_path=pkg_path)
kevinbache marked this conversation as resolved.
Show resolved Hide resolved
exp = client.create_experiment('withparams_exp')
client.run_pipeline(
experiment_id=exp.id,
job_name='withitem_basic',
pipeline_package_path=pkg_path,
params={},
)
4 changes: 2 additions & 2 deletions sdk/python/tests/compiler/testdata/withitem_basic.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
annotations:
pipelines.kubeflow.org/pipeline_spec: '{"inputs": [{"default":
10, "name": "my_pipe_param"}], "name": "my-pipeline"}'
pipelines.kubeflow.org/pipeline_spec: '{"inputs": [{"default": 10, "name": "my_pipe_param"}],
"name": "my-pipeline"}'
generateName: my-pipeline-
spec:
arguments:
Expand Down
18 changes: 18 additions & 0 deletions sdk/python/tests/compiler/testdata/withitem_nested.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,21 @@ def pipeline(my_pipe_param=10):
command=["sh", "-c"],
arguments=["echo %s" % my_pipe_param],
)


if __name__ == '__main__':
from kfp import compiler
import kfp
import time
client = kfp.Client(host='127.0.0.1:8080/pipeline')
print(compiler.Compiler().compile(pipeline, package_path=None))

pkg_path = '/tmp/witest_pkg.tar.gz'
compiler.Compiler().compile(pipeline, package_path=pkg_path)
exp = client.create_experiment('withparams_exp')
client.run_pipeline(
experiment_id=exp.id,
job_name='withitem_nested_{}'.format(time.time()),
pipeline_package_path=pkg_path,
params={},
)
4 changes: 2 additions & 2 deletions sdk/python/tests/compiler/testdata/withitem_nested.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
annotations:
pipelines.kubeflow.org/pipeline_spec: '{"inputs": [{"default":
10, "name": "my_pipe_param"}], "name": "my-pipeline"}'
pipelines.kubeflow.org/pipeline_spec: '{"inputs": [{"default": 10, "name": "my_pipe_param"}],
"name": "my-pipeline"}'
generateName: my-pipeline-
spec:
arguments:
Expand Down
13 changes: 13 additions & 0 deletions sdk/python/tests/compiler/testdata/withparam_global.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,17 @@ def pipeline(loopidy_doop=[3, 5, 7, 9]):

if __name__ == '__main__':
from kfp import compiler
import kfp
import time
client = kfp.Client(host='127.0.0.1:8080/pipeline')
print(compiler.Compiler().compile(pipeline, package_path=None))

pkg_path = '/tmp/witest_pkg.tar.gz'
compiler.Compiler().compile(pipeline, package_path=pkg_path)
exp = client.create_experiment('withparams_exp')
client.run_pipeline(
experiment_id=exp.id,
job_name='withparam_global_{}'.format(time.time()),
pipeline_package_path=pkg_path,
params={},
)
23 changes: 0 additions & 23 deletions sdk/python/tests/compiler/testdata/withparam_global.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,6 @@ spec:
parameters:
- name: loopidy-doop
name: my-in-cop1
outputs:
artifacts:
- name: mlpipeline-ui-metadata
optional: true
path: /mlpipeline-ui-metadata.json
- name: mlpipeline-metrics
optional: true
path: /mlpipeline-metrics.json
- container:
args:
- python -c "import json; import sys; json.dump([i for i in range(20, 31)],
Expand All @@ -54,13 +46,6 @@ spec:
image: python:alpine3.6
name: my-out-cop0
outputs:
artifacts:
- name: mlpipeline-ui-metadata
optional: true
path: /mlpipeline-ui-metadata.json
- name: mlpipeline-metrics
optional: true
path: /mlpipeline-metrics.json
parameters:
- name: my-out-cop0-out
valueFrom:
Expand All @@ -76,14 +61,6 @@ spec:
parameters:
- name: my-out-cop0-out
name: my-out-cop2
outputs:
artifacts:
- name: mlpipeline-ui-metadata
optional: true
path: /mlpipeline-ui-metadata.json
- name: mlpipeline-metrics
optional: true
path: /mlpipeline-metrics.json
- dag:
tasks:
- arguments:
Expand Down
15 changes: 14 additions & 1 deletion sdk/python/tests/compiler/testdata/withparam_global_dict.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def pipeline(loopidy_doop=[{'a': 1, 'b': 2}, {'a': 10, 'b': 20}]):
name="my-in-cop1",
image="library/bash:4.4.23",
command=["sh", "-c"],
arguments=["echo no output global op1, item: %s" % item],
arguments=["echo no output global op1, item.a: %s" % item.a],
).after(op0)

op_out = dsl.ContainerOp(
Expand All @@ -57,4 +57,17 @@ def pipeline(loopidy_doop=[{'a': 1, 'b': 2}, {'a': 10, 'b': 20}]):

if __name__ == '__main__':
from kfp import compiler
import kfp
import time
client = kfp.Client(host='127.0.0.1:8080/pipeline')
print(compiler.Compiler().compile(pipeline, package_path=None))

pkg_path = '/tmp/witest_pkg.tar.gz'
compiler.Compiler().compile(pipeline, package_path=pkg_path)
exp = client.create_experiment('withparams_exp')
client.run_pipeline(
experiment_id=exp.id,
job_name='withparam_global_dict_{}'.format(time.time()),
pipeline_package_path=pkg_path,
params={},
)
39 changes: 8 additions & 31 deletions sdk/python/tests/compiler/testdata/withparam_global_dict.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,33 +17,25 @@ spec:
tasks:
- arguments:
parameters:
- name: loopidy-doop
value: '{{inputs.parameters.loopidy-doop}}'
- name: loopidy-doop-subvar-a
value: '{{inputs.parameters.loopidy-doop-subvar-a}}'
name: my-in-cop1
template: my-in-cop1
inputs:
parameters:
- name: loopidy-doop
- name: loopidy-doop-subvar-a
name: for-loop-for-loop-00000001-1
- container:
args:
- 'echo no output global op1, item: {{inputs.parameters.loopidy-doop}}'
- 'echo no output global op1, item.a: {{inputs.parameters.loopidy-doop-subvar-a}}'
command:
- sh
- -c
image: library/bash:4.4.23
inputs:
parameters:
- name: loopidy-doop
- name: loopidy-doop-subvar-a
name: my-in-cop1
outputs:
artifacts:
- name: mlpipeline-ui-metadata
optional: true
path: /mlpipeline-ui-metadata.json
- name: mlpipeline-metrics
optional: true
path: /mlpipeline-metrics.json
- container:
args:
- python -c "import json; import sys; json.dump([i for i in range(20, 31)],
Expand All @@ -54,13 +46,6 @@ spec:
image: python:alpine3.6
name: my-out-cop0
outputs:
artifacts:
- name: mlpipeline-ui-metadata
optional: true
path: /mlpipeline-ui-metadata.json
- name: mlpipeline-metrics
optional: true
path: /mlpipeline-metrics.json
parameters:
- name: my-out-cop0-out
valueFrom:
Expand All @@ -76,20 +61,12 @@ spec:
parameters:
- name: my-out-cop0-out
name: my-out-cop2
outputs:
artifacts:
- name: mlpipeline-ui-metadata
optional: true
path: /mlpipeline-ui-metadata.json
- name: mlpipeline-metrics
optional: true
path: /mlpipeline-metrics.json
- dag:
tasks:
- arguments:
parameters:
- name: loopidy-doop
value: '{{item}}'
- name: loopidy-doop-subvar-a
value: '{{item.a}}'
dependencies:
- my-out-cop0
name: for-loop-for-loop-00000001-1
Expand All @@ -105,4 +82,4 @@ spec:
- my-out-cop0
name: my-out-cop2
template: my-out-cop2
name: my-pipeline
name: my-pipeline
13 changes: 13 additions & 0 deletions sdk/python/tests/compiler/testdata/withparam_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,17 @@ def pipeline():

if __name__ == '__main__':
from kfp import compiler
import kfp
import time
client = kfp.Client(host='127.0.0.1:8080/pipeline')
print(compiler.Compiler().compile(pipeline, package_path=None))

pkg_path = '/tmp/witest_pkg.tar.gz'
compiler.Compiler().compile(pipeline, package_path=pkg_path)
exp = client.create_experiment('withparams_exp')
client.run_pipeline(
experiment_id=exp.id,
job_name='withparam_output_{}'.format(time.time()),
pipeline_package_path=pkg_path,
params={},
)
23 changes: 0 additions & 23 deletions sdk/python/tests/compiler/testdata/withparam_output.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,6 @@ spec:
parameters:
- name: my-out-cop0-out
name: my-in-cop1
outputs:
artifacts:
- name: mlpipeline-ui-metadata
optional: true
path: /mlpipeline-ui-metadata.json
- name: mlpipeline-metrics
optional: true
path: /mlpipeline-metrics.json
- container:
args:
- python -c "import json; import sys; json.dump([i for i in range(20, 31)],
Expand All @@ -51,13 +43,6 @@ spec:
image: python:alpine3.6
name: my-out-cop0
outputs:
artifacts:
- name: mlpipeline-ui-metadata
optional: true
path: /mlpipeline-ui-metadata.json
- name: mlpipeline-metrics
optional: true
path: /mlpipeline-metrics.json
parameters:
- name: my-out-cop0-out
valueFrom:
Expand All @@ -73,14 +58,6 @@ spec:
parameters:
- name: my-out-cop0-out
name: my-out-cop2
outputs:
artifacts:
- name: mlpipeline-ui-metadata
optional: true
path: /mlpipeline-ui-metadata.json
- name: mlpipeline-metrics
optional: true
path: /mlpipeline-metrics.json
- dag:
tasks:
- arguments:
Expand Down
15 changes: 14 additions & 1 deletion sdk/python/tests/compiler/testdata/withparam_output_dict.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def pipeline():
name="my-in-cop1",
image="library/bash:4.4.23",
command=["sh", "-c"],
arguments=["echo do output op1 item: %s" % item],
arguments=["echo do output op1 item.a: %s" % item.a],
)

op_out = dsl.ContainerOp(
Expand All @@ -56,4 +56,17 @@ def pipeline():

if __name__ == '__main__':
from kfp import compiler
import kfp
import time
client = kfp.Client(host='127.0.0.1:8080/pipeline')
print(compiler.Compiler().compile(pipeline, package_path=None))

pkg_path = '/tmp/witest_pkg.tar.gz'
compiler.Compiler().compile(pipeline, package_path=pkg_path)
exp = client.create_experiment('withparams_exp')
client.run_pipeline(
experiment_id=exp.id,
job_name='withparam_output_dict_{}'.format(time.time()),
pipeline_package_path=pkg_path,
params={},
)
Loading