Skip to content

Commit

Permalink
Add support to generate custom task with custom task cr (kubeflow#593)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tomcli authored May 22, 2021
1 parent 95557c6 commit 73d5445
Show file tree
Hide file tree
Showing 8 changed files with 166 additions and 6 deletions.
6 changes: 3 additions & 3 deletions scripts/check_diff.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.

# This script checks whether any of the files related to the backend docker
# build (files listed in files_to_check) have been modified from the
# origin/master branch. This is done by running a diff on each file between the
# This script checks whether any of the files related to the backend docker
# build (files listed in files_to_check) have been modified from the
# origin/master branch. This is done by running a diff on each file between the
# most recent commit on origin/master and HEAD.
#
# Execute from top-level directory i.e. kfp-tekton/
Expand Down
3 changes: 2 additions & 1 deletion sdk/python/kfp_tekton/compiler/_data_passing_rewriter.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,8 @@ def mark_upstream_ios_of_output(template_output, marked_inputs,

# Remove pipeline task parameters unless they're used downstream
for task in pipeline_tasks:
if 'condition-' not in task['name']:
# Don't process condition and custom task parameters
if 'condition-' not in task['name'] and not task.get('taskRef', ''):
task['params'] = [
parameter_argument
for parameter_argument in task.get('params', [])
Expand Down
33 changes: 31 additions & 2 deletions sdk/python/kfp_tekton/compiler/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import yaml
import os
import uuid
import ast

from typing import Callable, List, Text, Dict, Any
from os import environ as env
Expand Down Expand Up @@ -125,6 +126,7 @@ def __init__(self, **kwargs):
self.artifact_items = {}
self.loops_pipeline = {}
self.recursive_tasks = []
self.custom_task_crs = []
self.uuid = self._get_unique_id_code()
self._group_names = []
self.pipeline_labels = {}
Expand Down Expand Up @@ -736,7 +738,7 @@ def is_custom_task_output(operand) -> bool:
for index, item in enumerate(container_args):
if item.startswith('--'):
custom_task_args[item[2:]] = container_args[index + 1]
non_param_keys = ['name', 'apiVersion', 'kind']
non_param_keys = ['name', 'apiVersion', 'kind', 'taskSpec']
task_params = []
for key, value in custom_task_args.items():
if key not in non_param_keys:
Expand All @@ -752,6 +754,26 @@ def is_custom_task_output(operand) -> bool:
'kind': custom_task_args['kind']
}
}
if custom_task_args.get('taskSpec', ''):
try:
if custom_task_args['taskSpec']:
custom_task_cr = {
'apiVersion': custom_task_args['apiVersion'],
'kind': custom_task_args['kind'],
'metadata': {
'name': custom_task_args['name']
},
'spec': ast.literal_eval(custom_task_args['taskSpec'])
}
for existing_cr in self.custom_task_crs:
if existing_cr == custom_task_cr:
# Skip duplicated CR resource
custom_task_cr = {}
break
if custom_task_cr:
self.custom_task_crs.append(custom_task_cr)
except ValueError:
raise("Custom task spec %s is not a valid Python Dictionary" % custom_task_args['taskSpec'])
# Pop custom task artifacts since we have no control of how
# custom task controller is handling the container/task execution.
self.artifact_items.pop(template['metadata']['name'], None)
Expand Down Expand Up @@ -1232,7 +1254,10 @@ def _write_workflow(workflow: Dict[Text, Any],
'Please create a new issue at https://github.com/kubeflow/kfp-tekton/issues '
'attaching the pipeline DSL code and the pipeline YAML.')

yaml_text = dump_yaml(_handle_tekton_pipeline_variables(yaml.load(yaml_text, Loader=yaml.FullLoader)))
pipeline_run = yaml.load(yaml_text, Loader=yaml.FullLoader)
if pipeline_run.get("spec", {}) and pipeline_run["spec"].get("pipelineSpec", {}) and \
pipeline_run["spec"]["pipelineSpec"].get("tasks", []):
yaml_text = dump_yaml(_handle_tekton_pipeline_variables(pipeline_run))

if package_path is None:
return yaml_text
Expand Down Expand Up @@ -1282,6 +1307,10 @@ def _create_and_write_workflow(self,
package_path=os.path.splitext(package_path)[0] + "_pipelineloop_cr" + str(i + 1) + '.yaml')
else:
TektonCompiler._write_workflow(workflow=workflow, package_path=package_path) # Tekton change
# Separate custom task CR from the main workflow
for i in range(len(self.custom_task_crs)):
TektonCompiler._write_workflow(workflow=self.custom_task_crs[i],
package_path=os.path.splitext(package_path)[0] + "_customtask_cr" + str(i + 1) + '.yaml')
_validate_workflow(workflow)


Expand Down
7 changes: 7 additions & 0 deletions sdk/python/tests/compiler/compiler_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,13 @@ def test_tekton_custom_task_workflow(self):
from .testdata.tekton_custom_task import custom_task_pipeline
self._test_pipeline_workflow(custom_task_pipeline, 'tekton_custom_task.yaml')

def test_custom_task_spec_workflow(self):
"""
Test Tekton custom task with custom spec workflow.
"""
from .testdata.custom_task_spec import custom_task_pipeline
self._test_pipeline_workflow(custom_task_pipeline, 'custom_task_spec.yaml')

def test_long_param_name_workflow(self):
"""
Test long parameter name workflow.
Expand Down
51 changes: 51 additions & 0 deletions sdk/python/tests/compiler/testdata/custom_task_spec.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# Copyright 2021 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from kfp import dsl

MY_CUSTOM_TASK_IMAGE_NAME = "veryunique/image:latest"
from kfp_tekton.tekton import TEKTON_CUSTOM_TASK_IMAGES
TEKTON_CUSTOM_TASK_IMAGES = TEKTON_CUSTOM_TASK_IMAGES.append(MY_CUSTOM_TASK_IMAGE_NAME)


def getCustomOp():
CustomOp = dsl.ContainerOp(
name="any-name",
image=MY_CUSTOM_TASK_IMAGE_NAME,
command=["any", "command"],
arguments=["--apiVersion", "custom_task_api_version",
"--kind", "custom_task_kind",
"--name", "custom_task_name",
"--taskSpec", {"raw": "raw"},
"--other_custom_task_argument_keys", "args"],
file_outputs={"other_custom_task_argument_keys": '/anypath'}
)
# Annotation to tell the Argo controller that this CustomOp is for specific Tekton runtime only.
CustomOp.add_pod_annotation("valid_container", "false")
return CustomOp


@dsl.pipeline(
name='Tekton custom task on Kubeflow Pipeline',
description='Shows how to use Tekton custom task with custom spec on KFP'
)
def custom_task_pipeline():
test = getCustomOp()
test2 = getCustomOp().after(test)


if __name__ == '__main__':
from kfp_tekton.compiler import TektonCompiler
TektonCompiler().compile(custom_task_pipeline, __file__.replace('.py', '.yaml'))

51 changes: 51 additions & 0 deletions sdk/python/tests/compiler/testdata/custom_task_spec.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# Copyright 2021 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: tekton.dev/v1beta1
kind: PipelineRun
metadata:
annotations:
pipelines.kubeflow.org/pipeline_spec: '{"description": "Shows how to use Tekton
custom task with custom spec on KFP", "name": "Tekton custom task on Kubeflow
Pipeline"}'
sidecar.istio.io/inject: 'false'
tekton.dev/artifact_bucket: mlpipeline
tekton.dev/artifact_endpoint: minio-service.kubeflow:9000
tekton.dev/artifact_endpoint_scheme: http://
tekton.dev/artifact_items: '{}'
tekton.dev/input_artifacts: '{}'
tekton.dev/output_artifacts: '{}'
name: tekton-custom-task-on-kubeflow-pipeline
spec:
pipelineSpec:
tasks:
- name: any-name
params:
- name: other_custom_task_argument_keys
value: args
taskRef:
apiVersion: custom_task_api_version
kind: custom_task_kind
name: custom_task_name
- name: any-name-2
params:
- name: other_custom_task_argument_keys
value: args
runAfter:
- any-name
taskRef:
apiVersion: custom_task_api_version
kind: custom_task_kind
name: custom_task_name
timeout: 0s
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Copyright 2021 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: custom_task_api_version
kind: custom_task_kind
metadata:
name: custom_task_name
spec:
raw: raw
1 change: 1 addition & 0 deletions sdk/python/tests/e2e_test_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,4 @@ ignored_test_files:
- "long_param_name.yaml" # unit test only, not using real container image
- "recur_cond.yaml" # unit test only, not using real container image
- "cond_recur.yaml" # unit test for recursion under condition group, not a valid running example
- "custom_task_spec.yaml" # unit test for generating custom task with custom spec, not a valid running example

0 comments on commit 73d5445

Please sign in to comment.