DSL refactor #619

Merged · 14 commits · Jan 9, 2019

Changes from all commits
3 changes: 2 additions & 1 deletion sdk/python/kfp/_client.py
@@ -22,6 +22,7 @@
from datetime import datetime

from .compiler import compiler
from .compiler import _k8s_helper


class Client(object):
@@ -171,7 +172,7 @@ def run_pipeline(self, experiment_id, job_name, pipeline_package_path, params={}

pipeline_obj = self._extract_pipeline_yaml(pipeline_package_path)
pipeline_json_string = json.dumps(pipeline_obj)
- api_params = [kfp_run.ApiParameter(name=compiler.Compiler()._sanitize_name(k), value=str(v))
+ api_params = [kfp_run.ApiParameter(name=_k8s_helper.K8sHelper.sanitize_k8s_name(k), value=str(v))
for k,v in params.items()]
key = kfp_run.models.ApiResourceKey(id=experiment_id,
type=kfp_run.models.ApiResourceType.EXPERIMENT)
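As a side note on this change: run_pipeline now sanitizes user-supplied parameter names the same way the compiler does, so a params dict keyed by human-readable names lines up with the sanitized names in the compiled workflow. A minimal sketch of that mapping (illustrative values, not code from this PR):

```python
from kfp.compiler import _k8s_helper

# Hypothetical user-supplied run parameters.
params = {'learning rate': 0.1, 'Num_Steps': 1000}

# run_pipeline now builds ApiParameter names from the sanitized keys,
# matching the parameter names produced by the compiler.
sanitized = {k: _k8s_helper.K8sHelper.sanitize_k8s_name(k) for k in params}
print(sanitized)  # {'learning rate': 'learning-rate', 'Num_Steps': 'num-steps'}
```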
9 changes: 8 additions & 1 deletion sdk/python/kfp/compiler/_k8s_helper.py
@@ -17,7 +17,7 @@
from kubernetes import config
import time
import logging

import re

class K8sHelper(object):
""" Kubernetes Helper """
@@ -119,6 +119,13 @@ def run_job(self, yaml_spec, timeout=600):
self._delete_k8s_job(pod_name, yaml_spec)
return succ

@staticmethod
def sanitize_k8s_name(name):
"""From _make_kubernetes_name
sanitize_k8s_name cleans and converts the names in the workflow.
"""
return re.sub('-+', '-', re.sub('[^-0-9a-z]+', '-', name.lower())).lstrip('-').rstrip('-')

@staticmethod
def convert_k8s_obj_to_json(k8s_obj):
"""
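For reference, a standalone sketch of what sanitize_k8s_name produces for a few inputs (example names are made up):

```python
import re

def sanitize_k8s_name(name):
    # Same logic as the static method above: lowercase, replace every run of
    # characters outside [-0-9a-z] with '-', collapse repeated dashes, and
    # strip leading/trailing dashes.
    return re.sub('-+', '-', re.sub('[^-0-9a-z]+', '-', name.lower())).lstrip('-').rstrip('-')

assert sanitize_k8s_name('My Pipeline_Step 1!') == 'my-pipeline-step-1'
assert sanitize_k8s_name('Train Model') == 'train-model'
assert sanitize_k8s_name('--already-clean--') == 'already-clean'
```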
274 changes: 151 additions & 123 deletions sdk/python/kfp/compiler/compiler.py
@@ -38,132 +38,16 @@ def my_pipeline(a: dsl.PipelineParam, b: dsl.PipelineParam):
```
"""

def _sanitize_name(self, name):
"""From _make_kubernetes_name
_sanitize_name cleans and converts the names in the workflow.
"""
return re.sub('-+', '-', re.sub('[^-0-9a-z]+', '-', name.lower())).lstrip('-').rstrip('-')

def _pipelineparam_full_name(self, param):
"""_pipelineparam_full_name
"""_pipelineparam_full_name converts the names of pipeline parameters
to unique names in the argo yaml

Args:
param(PipelineParam): pipeline parameter
"""
if param.op_name:
return param.op_name + '-' + param.name
return self._sanitize_name(param.name)

def _build_conventional_artifact(self, name):
return {
'name': name,
'path': '/' + name + '.json',
's3': {
# TODO: parameterize namespace for minio service
'endpoint': 'minio-service.kubeflow:9000',
'bucket': 'mlpipeline',
'key': 'runs/{{workflow.uid}}/{{pod.name}}/' + name + '.tgz',
'insecure': True,
'accessKeySecret': {
'name': 'mlpipeline-minio-artifact',
'key': 'accesskey',
},
'secretKeySecret': {
'name': 'mlpipeline-minio-artifact',
'key': 'secretkey'
}
},
}

def _process_args(self, raw_args, argument_inputs):
if not raw_args:
return []

processed_args = list(map(str, raw_args))
for i, _ in enumerate(processed_args):
for param in argument_inputs:
full_name = self._pipelineparam_full_name(param)
processed_args[i] = re.sub(
str(param), '{{inputs.parameters.%s}}' % full_name, str(processed_args[i]))

return processed_args

def _op_to_template(self, op):
"""Generate template given an operator inherited from dsl.ContainerOp."""

input_parameters = []
for param in op.inputs:
one_parameter = {'name': self._pipelineparam_full_name(param)}
if param.value:
one_parameter['value'] = str(param.value)
input_parameters.append(one_parameter)
# Sort to make the results deterministic.
input_parameters.sort(key=lambda x: x['name'])

output_parameters = []
for param in op.outputs.values():
output_parameters.append({
'name': self._pipelineparam_full_name(param),
'valueFrom': {'path': op.file_outputs[param.name]}
})
output_parameters.sort(key=lambda x: x['name'])

template = {
'name': op.name,
'container': {
'image': op.image,
}
}
processed_arguments = self._process_args(op.arguments, op.argument_inputs)
processed_command = self._process_args(op.command, op.argument_inputs)
if processed_arguments:
template['container']['args'] = processed_arguments
if processed_command:
template['container']['command'] = processed_command
if input_parameters:
template['inputs'] = {'parameters': input_parameters}

template['outputs'] = {}
if output_parameters:
template['outputs'] = {'parameters': output_parameters}

# Generate artifact for metadata output
# The motivation of appending the minio info in the yaml
# is to specify a unique path for the metadata.
# TODO: after argo addresses the issue that configures a unique path
# for the artifact output when default artifact repository is configured,
# this part needs to be updated to use the default artifact repository.
output_artifacts = []
output_artifacts.append(self._build_conventional_artifact('mlpipeline-ui-metadata'))
output_artifacts.append(self._build_conventional_artifact('mlpipeline-metrics'))
template['outputs']['artifacts'] = output_artifacts


# Set resources.
if op.resource_limits or op.resource_requests:
template['container']['resources'] = {}
if op.resource_limits:
template['container']['resources']['limits'] = op.resource_limits
if op.resource_requests:
template['container']['resources']['requests'] = op.resource_requests

# Set nodeSelector.
if op.node_selector:
template['nodeSelector'] = op.node_selector

if op.env_variables:
template['container']['env'] = list(map(K8sHelper.convert_k8s_obj_to_json, op.env_variables))
if op.volume_mounts:
template['container']['volumeMounts'] = list(map(K8sHelper.convert_k8s_obj_to_json, op.volume_mounts))

if op.pod_annotations or op.pod_labels:
template['metadata'] = {}
if op.pod_annotations:
template['metadata']['annotations'] = op.pod_annotations
if op.pod_labels:
template['metadata']['labels'] = op.pod_labels

return template
return param.name

def _get_groups_for_ops(self, root_group):
"""Helper function to get belonging groups for each op.
@@ -230,7 +114,7 @@ def _get_inputs_outputs(self, pipeline, root_group, op_groups):
# op's inputs and all params used in conditions for that op are both considered.
for param in op.inputs + list(condition_params[op.name]):
# if the value is already provided (immediate value), then no need to expose
# it as input for its parent groups.
if param.value:
continue

@@ -327,6 +211,126 @@ def _resolve_value_or_reference(self, value_or_reference, potential_references):
else:
return str(value_or_reference)

def _process_args(self, raw_args, argument_inputs):
if not raw_args:
return []
processed_args = list(map(str, raw_args))
for i, _ in enumerate(processed_args):
# unsanitized_argument_inputs stores a dict: string of sanitized param -> string of unsanitized param
matches = []
match = re.findall(r'{{pipelineparam:op=([\w\s\_-]*);name=([\w\s\_-]+);value=(.*?)}}', str(processed_args[i]))
matches += match
unsanitized_argument_inputs = {}
for x in list(set(matches)):
sanitized_str = str(dsl.PipelineParam(K8sHelper.sanitize_k8s_name(x[1]), K8sHelper.sanitize_k8s_name(x[0]), x[2]))
unsanitized_argument_inputs[sanitized_str] = str(dsl.PipelineParam(x[1], x[0], x[2]))

if argument_inputs:
for param in argument_inputs:
if str(param) in unsanitized_argument_inputs:
full_name = self._pipelineparam_full_name(param)
processed_args[i] = re.sub(unsanitized_argument_inputs[str(param)], '{{inputs.parameters.%s}}' % full_name,
processed_args[i])
return processed_args

def _op_to_template(self, op):
"""Generate template given an operator inherited from dsl.ContainerOp."""

def _build_conventional_artifact(name):
return {
'name': name,
'path': '/' + name + '.json',
's3': {
# TODO: parameterize namespace for minio service
'endpoint': 'minio-service.kubeflow:9000',
'bucket': 'mlpipeline',
'key': 'runs/{{workflow.uid}}/{{pod.name}}/' + name + '.tgz',
'insecure': True,
'accessKeySecret': {
'name': 'mlpipeline-minio-artifact',
'key': 'accesskey',
},
'secretKeySecret': {
'name': 'mlpipeline-minio-artifact',
'key': 'secretkey'
}
},
}

processed_arguments = self._process_args(op.arguments, op.argument_inputs)
processed_command = self._process_args(op.command, op.argument_inputs)

input_parameters = []
for param in op.inputs:
one_parameter = {'name': self._pipelineparam_full_name(param)}
if param.value:
one_parameter['value'] = str(param.value)
input_parameters.append(one_parameter)
# Sort to make the results deterministic.
input_parameters.sort(key=lambda x: x['name'])

output_parameters = []
for param in op.outputs.values():
output_parameters.append({
'name': self._pipelineparam_full_name(param),
'valueFrom': {'path': op.file_outputs[param.name]}
})
output_parameters.sort(key=lambda x: x['name'])

template = {
'name': op.name,
'container': {
'image': op.image,
}
}
if processed_arguments:
template['container']['args'] = processed_arguments
if processed_command:
template['container']['command'] = processed_command
if input_parameters:
template['inputs'] = {'parameters': input_parameters}

template['outputs'] = {}
if output_parameters:
template['outputs'] = {'parameters': output_parameters}

# Generate artifact for metadata output
# The motivation of appending the minio info in the yaml
# is to specify a unique path for the metadata.
# TODO: after argo addresses the issue that configures a unique path
# for the artifact output when default artifact repository is configured,
# this part needs to be updated to use the default artifact repository.
output_artifacts = []
output_artifacts.append(_build_conventional_artifact('mlpipeline-ui-metadata'))
output_artifacts.append(_build_conventional_artifact('mlpipeline-metrics'))
template['outputs']['artifacts'] = output_artifacts

# Set resources.
if op.resource_limits or op.resource_requests:
template['container']['resources'] = {}
if op.resource_limits:
template['container']['resources']['limits'] = op.resource_limits
if op.resource_requests:
template['container']['resources']['requests'] = op.resource_requests

# Set nodeSelector.
if op.node_selector:
template['nodeSelector'] = op.node_selector

if op.env_variables:
template['container']['env'] = list(map(K8sHelper.convert_k8s_obj_to_json, op.env_variables))
if op.volume_mounts:
template['container']['volumeMounts'] = list(map(K8sHelper.convert_k8s_obj_to_json, op.volume_mounts))

if op.pod_annotations or op.pod_labels:
template['metadata'] = {}
if op.pod_annotations:
template['metadata']['annotations'] = op.pod_annotations
if op.pod_labels:
template['metadata']['labels'] = op.pod_labels

return template

def _group_to_template(self, group, inputs, outputs, dependencies):
"""Generate template given an OpsGroup.

@@ -505,10 +509,10 @@ def _compile(self, pipeline_func):
raise ValueError('Please use a function with @dsl.pipeline decorator.')

pipeline_name, _ = dsl.Pipeline.get_pipeline_functions()[pipeline_func]
- pipeline_name = self._sanitize_name(pipeline_name)
+ pipeline_name = K8sHelper.sanitize_k8s_name(pipeline_name)

# Create the arg list with no default values and call pipeline function.
- args_list = [dsl.PipelineParam(self._sanitize_name(arg_name))
+ args_list = [dsl.PipelineParam(K8sHelper.sanitize_k8s_name(arg_name))
for arg_name in argspec.args]
with dsl.Pipeline(pipeline_name) as p:
pipeline_func(*args_list)
@@ -517,12 +521,36 @@ def _compile(self, pipeline_func):
self._validate_exit_handler(p)

# Fill in the default values.
- args_list_with_defaults = [dsl.PipelineParam(self._sanitize_name(arg_name))
+ args_list_with_defaults = [dsl.PipelineParam(K8sHelper.sanitize_k8s_name(arg_name))
for arg_name in argspec.args]
if argspec.defaults:
for arg, default in zip(reversed(args_list_with_defaults), reversed(argspec.defaults)):
arg.value = default.value if isinstance(default, dsl.PipelineParam) else default

# Sanitize operator names and param names
sanitized_ops = {}
for op in p.ops.values():
sanitized_name = K8sHelper.sanitize_k8s_name(op.name)
[Review thread on this line]

Ark-kun (Contributor) — Jan 5, 2019:
AFAIK op.name is already sanitized and made unique when it's being added to the pipeline.

Contributor:
Oh. You've removed that sanitization. I'm not sure this is a good move. It's easier to fix the name before all the output references have been generated and passed around.

Contributor (Author):
The motivation was to move the k8s-related functions from the DSL to the compilers so that implementations of other compilers in the future will not depend on K8s. This PR moves the sanitization into the compiler.

Contributor:
I agree with both those ideas. What's debatable is whether the Pipeline class belongs to the compiler or the DSL.

op.name = sanitized_name
for param in op.inputs + op.argument_inputs:
param.name = K8sHelper.sanitize_k8s_name(param.name)
if param.op_name:
param.op_name = K8sHelper.sanitize_k8s_name(param.op_name)
for param in op.outputs.values():
param.name = K8sHelper.sanitize_k8s_name(param.name)
[Review thread on this line]

Contributor:
Are you sure this will work at this stage? The argument placeholders are probably already embedded into op.command and op.args as strings.

Contributor (Author):
Good point. The op.args are handled in the _op_to_template function in this PR. However, I have not handled op.command. Do we already support parameterized commands?

Contributor:
Yes. We did. See 875efea

Contributor (Author):
I'll update this PR for the command as well.

Contributor (Author):
Done

if param.op_name:
param.op_name = K8sHelper.sanitize_k8s_name(param.op_name)
if op.output is not None:
op.output.name = K8sHelper.sanitize_k8s_name(op.output.name)
op.output.op_name = K8sHelper.sanitize_k8s_name(op.output.op_name)
if op.file_outputs is not None:
sanitized_file_outputs = {}
for key in op.file_outputs.keys():
sanitized_file_outputs[K8sHelper.sanitize_k8s_name(key)] = op.file_outputs[key]
op.file_outputs = sanitized_file_outputs
sanitized_ops[sanitized_name] = op
p.ops = sanitized_ops

workflow = self._create_pipeline_workflow(args_list_with_defaults, p)
return workflow

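The new _process_args path is what compensates for the later sanitization: by the time the compiler runs, PipelineParam references inside op.command and op.arguments have already been serialized as {{pipelineparam:op=...;name=...;value=...}} placeholder strings (see the review thread above), so the compiler rebuilds the unsanitized placeholder for each argument input and rewrites it to an Argo input reference. A self-contained sketch of that rewrite, with hand-written placeholder strings standing in for real dsl.PipelineParam serializations:

```python
import re

# The placeholder pattern used to find serialized PipelineParam references.
PARAM_RE = r'{{pipelineparam:op=([\w\s_-]*);name=([\w\s_-]+);value=(.*?)}}'

def sanitize(name):
    # K8sHelper.sanitize_k8s_name, inlined to keep the example self-contained.
    return re.sub('-+', '-', re.sub('[^-0-9a-z]+', '-', name.lower())).strip('-')

def rewrite(arg):
    # For each placeholder found in the argument string, compute a sanitized
    # "full name" (op name plus param name, mirroring _pipelineparam_full_name)
    # and substitute an Argo input reference.
    for op_name, name, value in set(re.findall(PARAM_RE, arg)):
        full_name = (sanitize(op_name) + '-' + sanitize(name)) if op_name else sanitize(name)
        placeholder = '{{pipelineparam:op=%s;name=%s;value=%s}}' % (op_name, name, value)
        arg = arg.replace(placeholder, '{{inputs.parameters.%s}}' % full_name)
    return arg

print(rewrite('--steps {{pipelineparam:op=;name=Num Steps;value=100}}'))
# --steps {{inputs.parameters.num-steps}}
print(rewrite('--model {{pipelineparam:op=Train Op;name=model_path;value=}}'))
# --model {{inputs.parameters.train-op-model-path}}
```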
9 changes: 7 additions & 2 deletions sdk/python/kfp/dsl/_container_op.py
@@ -27,7 +27,8 @@ def __init__(self, name: str, image: str, command: str=None, arguments: str=None
"""Create a new instance of ContainerOp.

Args:
- name: the name of the op. Has to be unique within a pipeline.
+ name: the name of the op. It does not have to be unique within a pipeline
+   because the pipeline will generate a unique new name in case of conflicts.
image: the container image name, such as 'python:3.5-jessie'
command: the command to run in the container.
If None, uses default CMD in defined in container.
@@ -43,6 +44,10 @@ def __init__(self, name: str, image: str, command: str=None, arguments: str=None
if not _pipeline.Pipeline.get_default_pipeline():
raise ValueError('Default pipeline not defined.')

valid_name_regex = r'^[A-Za-z][A-Za-z0-9\s_-]*$'
if not re.match(valid_name_regex, name):
raise ValueError('Only letters, numbers, spaces, "_", and "-" are allowed in name. Must begin with letter: %s' % (name))

self.human_name = name
self.name = _pipeline.Pipeline.get_default_pipeline().add_op(self, is_exit_handler)
self.image = image
@@ -60,7 +65,7 @@ def __init__(self, name: str, image: str, command: str=None, arguments: str=None

matches = []
for arg in (command or []) + (arguments or []):
- match = re.findall(r'{{pipelineparam:op=([\w-]*);name=([\w-]+);value=(.*?)}}', str(arg))
+ match = re.findall(r'{{pipelineparam:op=([\w\s_-]*);name=([\w\s_-]+);value=(.*?)}}', str(arg))
matches += match

self.argument_inputs = [_pipeline_param.PipelineParam(x[1], x[0], x[2])
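A quick illustration of the name validation added to ContainerOp.__init__ above (example names are made up; names that fail the check raise the new ValueError):

```python
import re

valid_name_regex = r'^[A-Za-z][A-Za-z0-9\s_-]*$'

for name in ['Train Model', 'preprocess_step-1', '1st-step', 'eval/op']:
    ok = bool(re.match(valid_name_regex, name))
    print('%-20s -> %s' % (name, 'accepted' if ok else 'rejected'))
# 'Train Model' and 'preprocess_step-1' pass; '1st-step' (leading digit) and
# 'eval/op' (contains '/') fail the check and would raise ValueError.
```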