diff --git a/sdk/python/kfp/_client.py b/sdk/python/kfp/_client.py
index 37dd10fb47c..a1111a29d3f 100644
--- a/sdk/python/kfp/_client.py
+++ b/sdk/python/kfp/_client.py
@@ -22,6 +22,7 @@
 from datetime import datetime
 
 from .compiler import compiler
+from .compiler import _k8s_helper
 
 
 class Client(object):
@@ -171,7 +172,7 @@ def run_pipeline(self, experiment_id, job_name, pipeline_package_path, params={}
     pipeline_obj = self._extract_pipeline_yaml(pipeline_package_path)
     pipeline_json_string = json.dumps(pipeline_obj)
-    api_params = [kfp_run.ApiParameter(name=compiler.Compiler()._sanitize_name(k), value=str(v))
+    api_params = [kfp_run.ApiParameter(name=_k8s_helper.K8sHelper.sanitize_k8s_name(k), value=str(v))
                   for k,v in params.items()]
     key = kfp_run.models.ApiResourceKey(id=experiment_id, type=kfp_run.models.ApiResourceType.EXPERIMENT)
diff --git a/sdk/python/kfp/compiler/_k8s_helper.py b/sdk/python/kfp/compiler/_k8s_helper.py
index 000ebb55703..bc54c307b0c 100644
--- a/sdk/python/kfp/compiler/_k8s_helper.py
+++ b/sdk/python/kfp/compiler/_k8s_helper.py
@@ -17,7 +17,7 @@
 from kubernetes import config
 import time
 import logging
-
+import re
 
 class K8sHelper(object):
   """ Kubernetes Helper """
@@ -119,6 +119,13 @@ def run_job(self, yaml_spec, timeout=600):
       self._delete_k8s_job(pod_name, yaml_spec)
     return succ
 
+  @staticmethod
+  def sanitize_k8s_name(name):
+    """Cleans and converts a name for use in the workflow: lowercases it,
+    replaces each run of invalid characters with a single '-', and trims
+    leading/trailing dashes. (Moved from _pipeline._make_kubernetes_name.)
+    """
+    return re.sub('-+', '-', re.sub('[^-0-9a-z]+', '-', name.lower())).lstrip('-').rstrip('-')
+
   @staticmethod
   def convert_k8s_obj_to_json(k8s_obj):
     """
diff --git a/sdk/python/kfp/compiler/compiler.py b/sdk/python/kfp/compiler/compiler.py
index 08de44511ae..7c0dface55b 100644
--- a/sdk/python/kfp/compiler/compiler.py
+++ b/sdk/python/kfp/compiler/compiler.py
@@ -38,132 +38,16 @@ def my_pipeline(a: dsl.PipelineParam, b: dsl.PipelineParam):
   ```
   """
 
-  def _sanitize_name(self, name):
-    """From _make_kubernetes_name
-    _sanitize_name cleans and converts the names in the workflow.
- """ - return re.sub('-+', '-', re.sub('[^-0-9a-z]+', '-', name.lower())).lstrip('-').rstrip('-') - def _pipelineparam_full_name(self, param): - """_pipelineparam_full_name + """_pipelineparam_full_name converts the names of pipeline parameters + to unique names in the argo yaml Args: param(PipelineParam): pipeline parameter """ if param.op_name: return param.op_name + '-' + param.name - return self._sanitize_name(param.name) - - def _build_conventional_artifact(self, name): - return { - 'name': name, - 'path': '/' + name + '.json', - 's3': { - # TODO: parameterize namespace for minio service - 'endpoint': 'minio-service.kubeflow:9000', - 'bucket': 'mlpipeline', - 'key': 'runs/{{workflow.uid}}/{{pod.name}}/' + name + '.tgz', - 'insecure': True, - 'accessKeySecret': { - 'name': 'mlpipeline-minio-artifact', - 'key': 'accesskey', - }, - 'secretKeySecret': { - 'name': 'mlpipeline-minio-artifact', - 'key': 'secretkey' - } - }, - } - - def _process_args(self, raw_args, argument_inputs): - if not raw_args: - return [] - - processed_args = list(map(str, raw_args)) - for i, _ in enumerate(processed_args): - for param in argument_inputs: - full_name = self._pipelineparam_full_name(param) - processed_args[i] = re.sub( - str(param), '{{inputs.parameters.%s}}' % full_name, str(processed_args[i])) - - return processed_args - - def _op_to_template(self, op): - """Generate template given an operator inherited from dsl.ContainerOp.""" - - input_parameters = [] - for param in op.inputs: - one_parameter = {'name': self._pipelineparam_full_name(param)} - if param.value: - one_parameter['value'] = str(param.value) - input_parameters.append(one_parameter) - # Sort to make the results deterministic. - input_parameters.sort(key=lambda x: x['name']) - - output_parameters = [] - for param in op.outputs.values(): - output_parameters.append({ - 'name': self._pipelineparam_full_name(param), - 'valueFrom': {'path': op.file_outputs[param.name]} - }) - output_parameters.sort(key=lambda x: x['name']) - - template = { - 'name': op.name, - 'container': { - 'image': op.image, - } - } - processed_arguments = self._process_args(op.arguments, op.argument_inputs) - processed_command = self._process_args(op.command, op.argument_inputs) - if processed_arguments: - template['container']['args'] = processed_arguments - if processed_command: - template['container']['command'] = processed_command - if input_parameters: - template['inputs'] = {'parameters': input_parameters} - - template['outputs'] = {} - if output_parameters: - template['outputs'] = {'parameters': output_parameters} - - # Generate artifact for metadata output - # The motivation of appending the minio info in the yaml - # is to specify a unique path for the metadata. - # TODO: after argo addresses the issue that configures a unique path - # for the artifact output when default artifact repository is configured, - # this part needs to be updated to use the default artifact repository. - output_artifacts = [] - output_artifacts.append(self._build_conventional_artifact('mlpipeline-ui-metadata')) - output_artifacts.append(self._build_conventional_artifact('mlpipeline-metrics')) - template['outputs']['artifacts'] = output_artifacts - - - # Set resources. - if op.resource_limits or op.resource_requests: - template['container']['resources'] = {} - if op.resource_limits: - template['container']['resources']['limits'] = op.resource_limits - if op.resource_requests: - template['container']['resources']['requests'] = op.resource_requests - - # Set nodeSelector. 
-    if op.node_selector:
-      template['nodeSelector'] = op.node_selector
-
-    if op.env_variables:
-      template['container']['env'] = list(map(K8sHelper.convert_k8s_obj_to_json, op.env_variables))
-    if op.volume_mounts:
-      template['container']['volumeMounts'] = list(map(K8sHelper.convert_k8s_obj_to_json, op.volume_mounts))
-
-    if op.pod_annotations or op.pod_labels:
-      template['metadata'] = {}
-      if op.pod_annotations:
-        template['metadata']['annotations'] = op.pod_annotations
-      if op.pod_labels:
-        template['metadata']['labels'] = op.pod_labels
-
-    return template
+    return param.name
 
   def _get_groups_for_ops(self, root_group):
     """Helper function to get belonging groups for each op.
@@ -230,7 +114,7 @@ def _get_inputs_outputs(self, pipeline, root_group, op_groups):
         # op's inputs and all params used in conditions for that op are both considered.
         for param in op.inputs + list(condition_params[op.name]):
           # if the value is already provided (immediate value), then no need to expose
-          # it as input for its parent groups. 
+          # it as input for its parent groups.
           if param.value:
             continue
@@ -327,6 +211,126 @@ def _resolve_value_or_reference(self, value_or_reference, potential_references):
     else:
       return str(value_or_reference)
 
+  def _process_args(self, raw_args, argument_inputs):
+    if not raw_args:
+      return []
+    processed_args = list(map(str, raw_args))
+    for i, _ in enumerate(processed_args):
+      # Find the serialized pipeline params in this argument and build
+      # unsanitized_argument_inputs, a dict mapping the string form of each
+      # sanitized param to the string form of the original (unsanitized) param.
+      matches = re.findall(r'{{pipelineparam:op=([\w\s_-]*);name=([\w\s_-]+);value=(.*?)}}', str(processed_args[i]))
+      unsanitized_argument_inputs = {}
+      for x in list(set(matches)):
+        sanitized_str = str(dsl.PipelineParam(K8sHelper.sanitize_k8s_name(x[1]), K8sHelper.sanitize_k8s_name(x[0]), x[2]))
+        unsanitized_argument_inputs[sanitized_str] = str(dsl.PipelineParam(x[1], x[0], x[2]))
+
+      if argument_inputs:
+        for param in argument_inputs:
+          if str(param) in unsanitized_argument_inputs:
+            full_name = self._pipelineparam_full_name(param)
+            processed_args[i] = re.sub(unsanitized_argument_inputs[str(param)], '{{inputs.parameters.%s}}' % full_name,
+                                       processed_args[i])
+    return processed_args
+
+  def _op_to_template(self, op):
+    """Generate template given an operator inherited from dsl.ContainerOp."""
+
+    def _build_conventional_artifact(name):
+      return {
+        'name': name,
+        'path': '/' + name + '.json',
+        's3': {
+          # TODO: parameterize namespace for minio service
+          'endpoint': 'minio-service.kubeflow:9000',
+          'bucket': 'mlpipeline',
+          'key': 'runs/{{workflow.uid}}/{{pod.name}}/' + name + '.tgz',
+          'insecure': True,
+          'accessKeySecret': {
+            'name': 'mlpipeline-minio-artifact',
+            'key': 'accesskey',
+          },
+          'secretKeySecret': {
+            'name': 'mlpipeline-minio-artifact',
+            'key': 'secretkey'
+          }
+        },
+      }
+
+    processed_arguments = self._process_args(op.arguments, op.argument_inputs)
+    processed_command = self._process_args(op.command, op.argument_inputs)
+
+    input_parameters = []
+    for param in op.inputs:
+      one_parameter = {'name': self._pipelineparam_full_name(param)}
+      if param.value:
+        one_parameter['value'] = str(param.value)
+      input_parameters.append(one_parameter)
+    # Sort to make the results deterministic.
+    input_parameters.sort(key=lambda x: x['name'])
+
+    output_parameters = []
+    for param in op.outputs.values():
+      output_parameters.append({
+        'name': self._pipelineparam_full_name(param),
+        'valueFrom': {'path': op.file_outputs[param.name]}
+      })
+    output_parameters.sort(key=lambda x: x['name'])
+
+    template = {
+      'name': op.name,
+      'container': {
+        'image': op.image,
+      }
+    }
+    if processed_arguments:
+      template['container']['args'] = processed_arguments
+    if processed_command:
+      template['container']['command'] = processed_command
+    if input_parameters:
+      template['inputs'] = {'parameters': input_parameters}
+
+    template['outputs'] = {}
+    if output_parameters:
+      template['outputs'] = {'parameters': output_parameters}
+
+    # Generate artifact for metadata output
+    # The motivation of appending the minio info in the yaml
+    # is to specify a unique path for the metadata.
+    # TODO: after argo addresses the issue that configures a unique path
+    # for the artifact output when default artifact repository is configured,
+    # this part needs to be updated to use the default artifact repository.
+    output_artifacts = []
+    output_artifacts.append(_build_conventional_artifact('mlpipeline-ui-metadata'))
+    output_artifacts.append(_build_conventional_artifact('mlpipeline-metrics'))
+    template['outputs']['artifacts'] = output_artifacts
+
+    # Set resources.
+    if op.resource_limits or op.resource_requests:
+      template['container']['resources'] = {}
+      if op.resource_limits:
+        template['container']['resources']['limits'] = op.resource_limits
+      if op.resource_requests:
+        template['container']['resources']['requests'] = op.resource_requests
+
+    # Set nodeSelector.
+    if op.node_selector:
+      template['nodeSelector'] = op.node_selector
+
+    if op.env_variables:
+      template['container']['env'] = list(map(K8sHelper.convert_k8s_obj_to_json, op.env_variables))
+    if op.volume_mounts:
+      template['container']['volumeMounts'] = list(map(K8sHelper.convert_k8s_obj_to_json, op.volume_mounts))
+
+    if op.pod_annotations or op.pod_labels:
+      template['metadata'] = {}
+      if op.pod_annotations:
+        template['metadata']['annotations'] = op.pod_annotations
+      if op.pod_labels:
+        template['metadata']['labels'] = op.pod_labels
+
+    return template
+
   def _group_to_template(self, group, inputs, outputs, dependencies):
     """Generate template given an OpsGroup.
@@ -505,10 +509,10 @@ def _compile(self, pipeline_func):
       raise ValueError('Please use a function with @dsl.pipeline decorator.')
 
     pipeline_name, _ = dsl.Pipeline.get_pipeline_functions()[pipeline_func]
-    pipeline_name = self._sanitize_name(pipeline_name)
+    pipeline_name = K8sHelper.sanitize_k8s_name(pipeline_name)
 
     # Create the arg list with no default values and call pipeline function.
-    args_list = [dsl.PipelineParam(self._sanitize_name(arg_name))
+    args_list = [dsl.PipelineParam(K8sHelper.sanitize_k8s_name(arg_name))
                  for arg_name in argspec.args]
     with dsl.Pipeline(pipeline_name) as p:
       pipeline_func(*args_list)
@@ -517,12 +521,36 @@ def _compile(self, pipeline_func):
     self._validate_exit_handler(p)
 
     # Fill in the default values.
-    args_list_with_defaults = [dsl.PipelineParam(self._sanitize_name(arg_name))
+    args_list_with_defaults = [dsl.PipelineParam(K8sHelper.sanitize_k8s_name(arg_name))
                                for arg_name in argspec.args]
     if argspec.defaults:
       for arg, default in zip(reversed(args_list_with_defaults), reversed(argspec.defaults)):
         arg.value = default.value if isinstance(default, dsl.PipelineParam) else default
 
+    # Sanitize operator and parameter names.
+    sanitized_ops = {}
+    for op in p.ops.values():
+      sanitized_name = K8sHelper.sanitize_k8s_name(op.name)
+      op.name = sanitized_name
+      for param in op.inputs + op.argument_inputs:
+        param.name = K8sHelper.sanitize_k8s_name(param.name)
+        if param.op_name:
+          param.op_name = K8sHelper.sanitize_k8s_name(param.op_name)
+      for param in op.outputs.values():
+        param.name = K8sHelper.sanitize_k8s_name(param.name)
+        if param.op_name:
+          param.op_name = K8sHelper.sanitize_k8s_name(param.op_name)
+      if op.output is not None:
+        op.output.name = K8sHelper.sanitize_k8s_name(op.output.name)
+        op.output.op_name = K8sHelper.sanitize_k8s_name(op.output.op_name)
+      if op.file_outputs is not None:
+        sanitized_file_outputs = {}
+        for key in op.file_outputs.keys():
+          sanitized_file_outputs[K8sHelper.sanitize_k8s_name(key)] = op.file_outputs[key]
+        op.file_outputs = sanitized_file_outputs
+      sanitized_ops[sanitized_name] = op
+    p.ops = sanitized_ops
+
     workflow = self._create_pipeline_workflow(args_list_with_defaults, p)
     return workflow
diff --git a/sdk/python/kfp/dsl/_container_op.py b/sdk/python/kfp/dsl/_container_op.py
index 01b709ad693..f7c56fd5b89 100644
--- a/sdk/python/kfp/dsl/_container_op.py
+++ b/sdk/python/kfp/dsl/_container_op.py
@@ -27,7 +27,8 @@ def __init__(self, name: str, image: str, command: str=None, arguments: str=None
     """Create a new instance of ContainerOp.
 
     Args:
-      name: the name of the op. Has to be unique within a pipeline.
+      name: the name of the op. It does not have to be unique within a pipeline
+        because the pipeline will generate a unique new name in case of conflicts.
      image: the container image name, such as 'python:3.5-jessie'
      command: the command to run in the container.
        If None, uses default CMD in defined in container.
@@ -43,6 +44,10 @@ def __init__(self, name: str, image: str, command: str=None, arguments: str=None
     if not _pipeline.Pipeline.get_default_pipeline():
       raise ValueError('Default pipeline not defined.')
 
+    valid_name_regex = r'^[A-Za-z][A-Za-z0-9\s_-]*$'
+    if not re.match(valid_name_regex, name):
+      raise ValueError('Only letters, numbers, spaces, "_", and "-" are allowed in name. Must begin with a letter: %s' % (name))
+
     self.human_name = name
     self.name = _pipeline.Pipeline.get_default_pipeline().add_op(self, is_exit_handler)
     self.image = image
@@ -60,7 +65,7 @@ def __init__(self, name: str, image: str, command: str=None, arguments: str=None
 
     matches = []
     for arg in (command or []) + (arguments or []):
-      match = re.findall(r'{{pipelineparam:op=([\w-]*);name=([\w-]+);value=(.*?)}}', str(arg))
+      match = re.findall(r'{{pipelineparam:op=([\w\s_-]*);name=([\w\s_-]+);value=(.*?)}}', str(arg))
       matches += match
 
     self.argument_inputs = [_pipeline_param.PipelineParam(x[1], x[0], x[2])
diff --git a/sdk/python/kfp/dsl/_pipeline.py b/sdk/python/kfp/dsl/_pipeline.py
index 76a0fbb37b1..6412a7f7c41 100644
--- a/sdk/python/kfp/dsl/_pipeline.py
+++ b/sdk/python/kfp/dsl/_pipeline.py
@@ -15,7 +15,6 @@
 
 from . import _container_op
 from . import _ops_group
-import re
 import sys
 
@@ -38,9 +37,6 @@ def _pipeline(func):
   return _pipeline
 
-def _make_kubernetes_name(name):
-  return re.sub('-+', '-', re.sub('[^-0-9a-z]+', '-', name.lower())).lstrip('-').rstrip('-')
-
 class Pipeline():
   """A pipeline contains a list of operators.
@@ -106,22 +102,24 @@ def add_op(self, op: _container_op.ContainerOp, define_only: bool):
 
     Args:
       op: An operator of ContainerOp or its inherited type.
+
+    Returns:
+      op_name: a unique op name.
     """
-    kubernetes_name = _make_kubernetes_name(op.human_name)
-    step_id = kubernetes_name
+    op_name = op.human_name
     #If there is an existing op with this name then generate a new name.
-    if step_id in self.ops:
+    if op_name in self.ops:
       for i in range(2, sys.maxsize**10):
-        step_id = kubernetes_name + '-' + str(i)
-        if step_id not in self.ops:
+        op_name = op.human_name + '-' + str(i)
+        if op_name not in self.ops:
           break
 
-    self.ops[step_id] = op
+    self.ops[op_name] = op
     if not define_only:
       self.groups[-1].ops.append(op)
-    return step_id
+    return op_name
 
   def push_ops_group(self, group: _ops_group.OpsGroup):
     """Push an OpsGroup into the stack.
diff --git a/sdk/python/kfp/dsl/_pipeline_param.py b/sdk/python/kfp/dsl/_pipeline_param.py
index 743941a6cd6..213f9d10d73 100644
--- a/sdk/python/kfp/dsl/_pipeline_param.py
+++ b/sdk/python/kfp/dsl/_pipeline_param.py
@@ -44,12 +44,9 @@ def __init__(self, name: str, op_name: str=None, value: str=None):
       and value are set.
     """
 
-    valid_name_regex = r'^[A-Za-z][A-Za-z0-9-]*$'
+    valid_name_regex = r'^[A-Za-z][A-Za-z0-9\s_-]*$'
     if not re.match(valid_name_regex, name):
-      raise ValueError('Only letters, numbers and "-" allowed in name. Must begin with letter.')
-
-    if op_name and not re.match(valid_name_regex, op_name):
-      raise ValueError('Only letters, numbers and "-" allowed in op_name. Must begin with letter.')
+      raise ValueError('Only letters, numbers, spaces, "_", and "-" are allowed in name. Must begin with a letter: %s' % (name))
 
     if op_name and value:
       raise ValueError('op_name and value cannot be both set.')
diff --git a/sdk/python/tests/dsl/main.py b/sdk/python/tests/dsl/main.py
index 88a459b9e0f..7e194cf386f 100644
--- a/sdk/python/tests/dsl/main.py
+++ b/sdk/python/tests/dsl/main.py
@@ -14,6 +14,7 @@
 
 import unittest
+import sys
 
 import pipeline_tests
 import pipeline_param_tests
diff --git a/sdk/python/tests/dsl/pipeline_param_tests.py b/sdk/python/tests/dsl/pipeline_param_tests.py
index 54d4c3a22ce..7cd2fd7d687 100644
--- a/sdk/python/tests/dsl/pipeline_param_tests.py
+++ b/sdk/python/tests/dsl/pipeline_param_tests.py
@@ -24,9 +24,6 @@ def test_invalid(self):
     with self.assertRaises(ValueError):
       p = PipelineParam(name='123_abc')
 
-    with self.assertRaises(ValueError):
-      p = PipelineParam(name='param1', op_name='a b')
-
  def test_str_repr(self):
    """Test string representation."""
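
A minimal usage sketch (not part of the diff) of the sanitization rule this change centralizes in K8sHelper.sanitize_k8s_name: the two regular expressions are copied verbatim from the method added above, while the example names are hypothetical.

import re

def sanitize_k8s_name(name):
  # Lowercase, replace each run of characters outside [-0-9a-z] with a single '-',
  # collapse repeated dashes, then strip leading/trailing dashes.
  return re.sub('-+', '-', re.sub('[^-0-9a-z]+', '-', name.lower())).lstrip('-').rstrip('-')

# Op and param names may now contain spaces and underscores in the DSL;
# they are converted to Kubernetes-safe names only at compile time:
assert sanitize_k8s_name('My Training_Op') == 'my-training-op'
assert sanitize_k8s_name('GCS Download--2') == 'gcs-download-2'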