From 3996e17ffbae1d3d7016430f21e638e04fd2e4dc Mon Sep 17 00:00:00 2001
From: Alexey Volkov
Date: Thu, 30 Jan 2020 16:57:04 -0800
Subject: [PATCH 1/6] Samples - Add and change preloaded samples

Add the "Data passing in python components" tutorial.
Combined the "Basic" samples into the "DSL - Control structures" tutorial.
---
 .../src/apiserver/config/sample_config.json   |  26 +--
 .../DSL - Control structures.py               | 106 +++++++++++++
 ...ta passing in python components - Files.py | 149 ++++++++++++++++++
 3 files changed, 263 insertions(+), 18 deletions(-)
 create mode 100644 samples/tutorials/DSL - Control structures/DSL - Control structures.py
 create mode 100644 samples/tutorials/Data passing in python components/Data passing in python components - Files.py

diff --git a/backend/src/apiserver/config/sample_config.json b/backend/src/apiserver/config/sample_config.json
index c2b1ebd1207..dd394cfb678 100644
--- a/backend/src/apiserver/config/sample_config.json
+++ b/backend/src/apiserver/config/sample_config.json
@@ -1,32 +1,22 @@
 [
   {
-    "name": "[Sample] ML - XGBoost - Training with Confusion Matrix",
+    "name": "[Demo] ML - XGBoost - Training with Confusion Matrix",
     "description": "[GCP Permission requirements](https://github.com/kubeflow/pipelines/blob/master/samples/core/xgboost_training_cm#requirements). [source code](https://github.com/kubeflow/pipelines/blob/master/samples/core/xgboost_training_cm). A trainer that does end-to-end distributed training for XGBoost models.",
     "file": "/samples/core/xgboost_training_cm/xgboost_training_cm.py.yaml"
   },
   {
-    "name": "[Sample] Unified DSL - Taxi Tip Prediction Model Trainer",
+    "name": "[Demo] Unified DSL - Taxi Tip Prediction Model Trainer",
     "description": "[GCP Permission requirements](https://github.com/kubeflow/pipelines/blob/master/samples/contrib/parameterized_tfx_oss#permission). [source code](https://console.cloud.google.com/mlengine/notebooks/deploy-notebook?q=download_url%3Dhttps%253A%252F%252Fraw.githubusercontent.com%252Fkubeflow%252Fpipelines%252F0.1.40%252Fsamples%252Fcore%252Fparameterized_tfx_oss%252Ftaxi_pipeline_notebook.ipynb). Example pipeline that does classification with model analysis based on a public taxi cab BigQuery dataset.",
    "file": "/samples/core/parameterized_tfx_oss/parameterized_tfx_oss.py.yaml"
   },
   {
-    "name": "[Sample] Basic - Sequential execution",
-    "description": "[source code](https://github.com/kubeflow/pipelines/blob/master/samples/core/sequential/sequential.py) A pipeline with two sequential steps.",
-    "file": "/samples/core/sequential/sequential.py.yaml"
+    "name": "[Tutorial] Data passing in python components",
+    "description": "[source code](https://github.com/kubeflow/pipelines/tree/master/samples/tutorials/Data%20passing%20in%20python%20components) Shows how to pass data between python components.",
+    "file": "/samples/tutorials/Data passing in python components/Data passing in python components - Files.py.yaml"
   },
   {
-    "name": "[Sample] Basic - Parallel execution",
-    "description": "[source code](https://github.com/kubeflow/pipelines/blob/master/samples/core/parallel_join/parallel_join.py) A pipeline that downloads two messages in parallel and prints the concatenated result.",
-    "file": "/samples/core/parallel_join/parallel_join.py.yaml"
-  },
-  {
-    "name": "[Sample] Basic - Conditional execution",
-    "description": "[source code](https://github.com/kubeflow/pipelines/blob/master/samples/core/condition/condition.py) A pipeline shows how to use dsl.Condition.",
-    "file": "/samples/core/condition/condition.py.yaml"
-  },
-  {
-    "name": "[Sample] Basic - Exit Handler",
-    "description": "[source code](https://github.com/kubeflow/pipelines/blob/master/samples/core/exit_handler/exit_handler.py) A pipeline that downloads a message and prints it out. Exit Handler will run at the end.",
-    "file": "/samples/core/exit_handler/exit_handler.py.yaml"
+    "name": "[Tutorial] DSL - Control structures",
+    "description": "[source code](https://github.com/kubeflow/pipelines/tree/master/samples/tutorials/DSL%20-%20Control%20structures) Shows how to use conditional execution and exit handlers.",
+    "file": "/samples/tutorials/DSL - Control structures/DSL - Control structures.py.yaml"
   }
 ]
diff --git a/samples/tutorials/DSL - Control structures/DSL - Control structures.py b/samples/tutorials/DSL - Control structures/DSL - Control structures.py
new file mode 100644
index 00000000000..58e54465512
--- /dev/null
+++ b/samples/tutorials/DSL - Control structures/DSL - Control structures.py
@@ -0,0 +1,106 @@
+# %% [markdown]
+# # DSL control structures tutorial
+# Shows how to use conditional execution and exit handlers.
+
+# %%
+from typing import NamedTuple
+
+import kfp
+from kfp import dsl
+from kfp.components import func_to_container_op, InputPath, OutputPath
+
+# %% [markdown]
+# ## Conditional execution
+# You can use the `with dsl.Condition(task1.outputs["output_name"] == "value"):` context to execute parts of the pipeline conditionally.
+
+# %%
+
+@func_to_container_op
+def get_random_int_op(min: int, max: int) -> int:
+    """Generate a random number between min and max (inclusive)."""
+    import random
+    result = random.randint(min, max)
+    print(result)
+    return result
+
+
+@func_to_container_op
+def flip_coin_op() -> str:
+    """Flip a coin and output heads or tails randomly."""
+    import random
+    result = random.choice(['heads', 'tails'])
+    print(result)
+    return result
+
+
+@func_to_container_op
+def print_op(message: str):
+    """Print a message."""
+    print(message)
+
+
+@dsl.pipeline(
+    name='Conditional execution pipeline',
+    description='Shows how to use dsl.Condition().'
+)
+def flipcoin_pipeline():
+    flip = flip_coin_op()
+    with dsl.Condition(flip.output == 'heads'):
+        random_num_head = get_random_int_op(0, 9)
+        with dsl.Condition(random_num_head.output > 5):
+            print_op('heads and %s > 5!' % random_num_head.output)
+        with dsl.Condition(random_num_head.output <= 5):
+            print_op('heads and %s <= 5!' % random_num_head.output)
+
+    with dsl.Condition(flip.output == 'tails'):
+        random_num_tail = get_random_int_op(10, 19)
+        with dsl.Condition(random_num_tail.output > 15):
+            print_op('tails and %s > 15!' % random_num_tail.output)
+        with dsl.Condition(random_num_tail.output <= 15):
+            print_op('tails and %s <= 15!' % random_num_tail.output)
+
+
+# Submit the pipeline for execution:
+#kfp.Client(host=kfp_endpoint).create_run_from_pipeline_func(flipcoin_pipeline, arguments={})
+
+# %% [markdown]
+# ## Exit handlers
+# You can use the `with dsl.ExitHandler(exit_task):` context to execute a task when the rest of the pipeline finishes (succeeds or fails).
+
+
+@func_to_container_op
+def fail_op():
+    """Fails."""
+    import sys
+    sys.exit(1)
+
+
+@dsl.pipeline(
+    name='Conditional execution pipeline with exit handler',
+    description='Shows how to use dsl.Condition() and dsl.ExitHandler().'
+)
+def flipcoin_exit_pipeline():
+    exit_task = print_op('Exit handler has worked!')
+    with dsl.ExitHandler(exit_task):
+        flip = flip_coin_op()
+        with dsl.Condition(flip.output == 'heads'):
+            random_num_head = get_random_int_op(0, 9)
+            with dsl.Condition(random_num_head.output > 5):
+                print_op('heads and %s > 5!' % random_num_head.output)
+            with dsl.Condition(random_num_head.output <= 5):
+                print_op('heads and %s <= 5!' % random_num_head.output)
+
+        with dsl.Condition(flip.output == 'tails'):
+            random_num_tail = get_random_int_op(10, 19)
+            with dsl.Condition(random_num_tail.output > 15):
+                print_op('tails and %s > 15!' % random_num_tail.output)
+            with dsl.Condition(random_num_tail.output <= 15):
+                print_op('tails and %s <= 15!' % random_num_tail.output)
+
+        with dsl.Condition(flip.output == 'tails'):
+            fail_op()
+
+
+if __name__ == '__main__':
+    # Compiling the pipeline
+    kfp.compiler.Compiler().compile(flipcoin_exit_pipeline, __file__ + '.yaml')
diff --git a/samples/tutorials/Data passing in python components/Data passing in python components - Files.py b/samples/tutorials/Data passing in python components/Data passing in python components - Files.py
new file mode 100644
index 00000000000..f524e023c5a
--- /dev/null
+++ b/samples/tutorials/Data passing in python components/Data passing in python components - Files.py
@@ -0,0 +1,149 @@
+# %% [markdown]
+# # Data passing tutorial
+# Data passing is the most important aspect of Pipelines.
+#
+# In Kubeflow Pipelines, pipeline authors compose pipelines by creating component instances (tasks) and connecting them together.
+#
+# Components have inputs and outputs. They can consume and produce arbitrary data.
+#
+# Pipeline authors establish connections between component tasks by connecting their data inputs and outputs - by passing the output of one task as an argument to another task's input.
+#
+# The system takes care of storing the data produced by components and later passing that data to other components for consumption as instructed by the pipeline.
+#
+# This tutorial shows how to create python components that produce, consume and transform data.
+# It shows how to create data passing pipelines by instantiating components and connecting them together.
+
+# %%
+from typing import NamedTuple
+
+import kfp
+from kfp.components import func_to_container_op, InputPath, OutputPath
+
+# %% [markdown]
+# ## Small data
+#
+# Small data is the data that you'll be comfortable passing as a program's command-line argument. The size of small data should not exceed a few kilobytes.
+#
+# Some examples of typical types of small data are: numbers, URLs, small strings (e.g. column names).
+#
+# Small lists, dictionaries and JSON structures are fine, but keep an eye on the size and consider switching to file-based data passing methods that are more suitable for bigger data (more than several kilobytes) or binary data.
+#
+# All small data outputs will be at some point serialized to strings and all small data input values will be at some point deserialized from strings (passed as command-line arguments). There are built-in serializers and deserializers for several common types (e.g. `str`, `int`, `float`, `bool`, `list`, `dict`). All other types of data need to be serialized manually before returning the data. Make sure to properly specify type annotations; otherwise there will be no automatic deserialization and the component function will receive strings instead of deserialized objects.
+
+# %% [markdown]
+# ## Bigger data (files)
+#
+# Bigger data should be read from files and written to files.
+#
+# The paths for the input and output files are chosen by the system and are passed into the function (as strings).
+#
+# Use the `InputPath` parameter annotation to tell the system that the function wants to consume the corresponding input data as a file. The system will download the data, write it to a local file and then pass the **path** of that file to the function.
+#
+# Use the `OutputPath` parameter annotation to tell the system that the function wants to produce the corresponding output data as a file. The system will prepare and pass the **path** of a file where the function should write the output data. After the function exits, the system will upload the data to the storage system so that it can be passed to downstream components.
+#
+# You can specify the type of the consumed/produced data by specifying the type argument to `InputPath` and `OutputPath`. The type can be a python type or an arbitrary type name string. `OutputPath('TFModel')` means that the function states that the data it has written to a file has type 'TFModel'. `InputPath('TFModel')` means that the function states that it expects the data it reads from a file to have type 'TFModel'. When the pipeline author connects inputs to outputs, the system checks whether the types match.
+#
+# Note on input/output names: When the function is converted to a component, the input and output names generally follow the parameter names, but the "\_path" and "\_file" suffixes are stripped from file/path inputs and outputs. E.g. the `number_file_path: InputPath(int)` parameter becomes the `number: int` input. This makes the argument passing look more natural: `number=42` instead of `number_file_path=42`.
+
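+# %% [markdown]
+# As a minimal sketch of these annotations and the name-stripping rule (the `double_number` component and `double_number_pipeline` function are hypothetical, added here only for illustration), the `number_file_path` parameter is exposed as the `number` input and `result_path` as the `result` output:
+
+# %%
+@func_to_container_op
+def double_number(number_file_path: InputPath(int), result_path: OutputPath(int)):
+    '''Read a number from a file, double it and write it to another file.'''
+    with open(number_file_path, 'r') as reader:
+        number = int(reader.read())
+    with open(result_path, 'w') as writer:
+        writer.write(str(number * 2))
+
+def double_number_pipeline():
+    double_task = double_number(number=42)  # note: number=..., not number_file_path=...
+    # double_task.outputs['result'] refers to the produced file and can be
+    # passed to a downstream component's input.
+
+# Submit the pipeline for execution:
+#kfp.Client(host=kfp_endpoint).create_run_from_pipeline_func(double_number_pipeline, arguments={})
+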
+# %% [markdown]
+#
+# ### Writing and reading bigger data
+
+# %%
+# Writing bigger data
+@func_to_container_op
+def repeat_line(line: str, output_text_path: OutputPath(str), count: int = 10):
+    '''Repeat the line the specified number of times'''
+    with open(output_text_path, 'w') as writer:
+        for i in range(count):
+            writer.write(line + '\n')
+
+
+# Reading bigger data
+@func_to_container_op
+def print_text(text_path: InputPath()): # The "text" input is untyped so that any data can be printed
+    '''Print text'''
+    with open(text_path, 'r') as reader:
+        for line in reader:
+            print(line, end='')
+
+def print_repeating_lines_pipeline():
+    repeat_lines_task = repeat_line(line='Hello', count=5000)
+    print_text(repeat_lines_task.output) # Don't forget .output !
+
+# Submit the pipeline for execution:
+#kfp.Client(host=kfp_endpoint).create_run_from_pipeline_func(print_repeating_lines_pipeline, arguments={})
+
+# %% [markdown]
+# ### Processing bigger data
+
+# %%
+@func_to_container_op
+def split_text_lines(source_path: InputPath(str), odd_lines_path: OutputPath(str), even_lines_path: OutputPath(str)):
+    with open(source_path, 'r') as reader:
+        with open(odd_lines_path, 'w') as odd_writer:
+            with open(even_lines_path, 'w') as even_writer:
+                while True:
+                    line = reader.readline()
+                    if line == "":
+                        break
+                    odd_writer.write(line)
+                    line = reader.readline()
+                    if line == "":
+                        break
+                    even_writer.write(line)
+
+def text_splitting_pipeline():
+    text = '\n'.join(['one', 'two', 'three', 'four', 'five', 'six', 'seven', 'eight', 'nine', 'ten'])
+    split_text_task = split_text_lines(text)
+    print_text(split_text_task.outputs['odd_lines'])
+    print_text(split_text_task.outputs['even_lines'])
+
+# Submit the pipeline for execution:
+#kfp.Client(host=kfp_endpoint).create_run_from_pipeline_func(text_splitting_pipeline, arguments={})
+
+
+# %% [markdown]
+# ### Example: Pipeline that generates then sums many numbers
+
+# %%
+# Writing many numbers
+@func_to_container_op
+def write_numbers(numbers_path: OutputPath(str), start: int = 0, count: int = 10):
+    with open(numbers_path, 'w') as writer:
+        for i in range(start, start + count):
+            writer.write(str(i) + '\n')
+
+
+# Reading and summing many numbers
+@func_to_container_op
+def sum_numbers(numbers_path: InputPath(str)) -> int:
+    total = 0  # avoid shadowing the built-in sum()
+    with open(numbers_path, 'r') as reader:
+        for line in reader:
+            total = total + int(line)
+    return total
+
+
+# Pipeline to sum 100000 numbers
+def sum_pipeline(count: int = 100000):
+    numbers_task = write_numbers(count=count)
+    print_text(numbers_task.output)
+
+    sum_task = sum_numbers(numbers_task.outputs['numbers'])
+    print_text(sum_task.output)
+
+
+# Submit the pipeline for execution:
+#kfp.Client(host=kfp_endpoint).create_run_from_pipeline_func(sum_pipeline, arguments={})
+
+# Combining all pipelines together in a single pipeline
+def file_passing_pipelines():
+    print_repeating_lines_pipeline()
+    text_splitting_pipeline()
+    sum_pipeline()
+
+
+if __name__ == '__main__':
+    # Compiling the pipeline
+    kfp.compiler.Compiler().compile(file_passing_pipelines, __file__ + '.yaml')

From 4869f07c4271deb8be211317981a355f5c3d2e18 Mon Sep 17 00:00:00 2001
From: Alexey Volkov
Date: Thu, 30 Jan 2020 18:51:16 -0800
Subject: [PATCH 2/6] Updated the Dockerfile to build the tutorials

---
 backend/Dockerfile | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/backend/Dockerfile b/backend/Dockerfile
index 754e7b9d0ac..98a925c99bb 100644
--- a/backend/Dockerfile
+++ b/backend/Dockerfile
@@ -45,7 +45,7 @@ COPY ./samples .
 #I think it's better to just use a shell loop though.
 #RUN for pipeline in $(find . -maxdepth 2 -name '*.py' -type f); do dsl-compile --py "$pipeline" --output "$pipeline.tar.gz"; done
 #The "for" loop breaks on all whitespace, so we either need to override IFS or use the "read" command instead.
-RUN set -e; find core -maxdepth 2 -name '*.py' -type f | while read pipeline; do python3 $pipeline; done
+RUN set -e; find core tutorials -maxdepth 2 -name '*.py' -type f | while read pipeline; do python3 $pipeline; done
 
 FROM debian:stretch
 
@@ -66,7 +66,7 @@ COPY --from=compiler /samples/ /samples/
 RUN apt-get update && apt-get install -y ca-certificates
 
 # Pin sample doc links to the commit that built the backend image
-RUN sed "s#/blob/master/#/blob/${COMMIT_SHA}/#g" -i /config/sample_config.json
+RUN sed -E "s#/(blob|tree)/master/#/\1/${COMMIT_SHA}/#g" -i /config/sample_config.json
 
 # Expose apiserver port
 EXPOSE 8888

From baff1bb11abb6239c983b5a13385038438b5eb89 Mon Sep 17 00:00:00 2001
From: Alexey Volkov
Date: Thu, 30 Jan 2020 19:52:54 -0800
Subject: [PATCH 3/6] Defensive shell programming

---
 backend/Dockerfile | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/backend/Dockerfile b/backend/Dockerfile
index 98a925c99bb..9ae994f88fc 100644
--- a/backend/Dockerfile
+++ b/backend/Dockerfile
@@ -45,7 +45,7 @@ COPY ./samples .
 #I think it's better to just use a shell loop though.
 #RUN for pipeline in $(find . -maxdepth 2 -name '*.py' -type f); do dsl-compile --py "$pipeline" --output "$pipeline.tar.gz"; done
 #The "for" loop breaks on all whitespace, so we either need to override IFS or use the "read" command instead.
-RUN set -e; find core tutorials -maxdepth 2 -name '*.py' -type f | while read pipeline; do python3 $pipeline; done
+RUN set -e; find core tutorials -maxdepth 2 -name '*.py' -type f | while read pipeline; do python3 "$pipeline"; done
 
 FROM debian:stretch
 

From 00d444eaea62ee9f772ec46bea912e20434fcafe Mon Sep 17 00:00:00 2001
From: Alexey Volkov
Date: Fri, 31 Jan 2020 12:35:13 -0800
Subject: [PATCH 4/6] Stop shadowing min and max

---
 .../DSL - Control structures/DSL - Control structures.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/samples/tutorials/DSL - Control structures/DSL - Control structures.py b/samples/tutorials/DSL - Control structures/DSL - Control structures.py
index 58e54465512..6e060f34ed7 100644
--- a/samples/tutorials/DSL - Control structures/DSL - Control structures.py
+++ b/samples/tutorials/DSL - Control structures/DSL - Control structures.py
@@ -16,10 +16,10 @@
 # %%
 
 @func_to_container_op
-def get_random_int_op(min: int, max: int) -> int:
-    """Generate a random number between min and max (inclusive)."""
+def get_random_int_op(minimum: int, maximum: int) -> int:
+    """Generate a random number between minimum and maximum (inclusive)."""
     import random
-    result = random.randint(min, max)
+    result = random.randint(minimum, maximum)
     print(result)
     return result
 

From 04102d61309af39f2b953a285db20a7f7f922142 Mon Sep 17 00:00:00 2001
From: Alexey Volkov
Date: Fri, 31 Jan 2020 12:36:09 -0800
Subject: [PATCH 5/6] Added missing magic

---
 .../DSL - Control structures/DSL - Control structures.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/samples/tutorials/DSL - Control structures/DSL - Control structures.py b/samples/tutorials/DSL - Control structures/DSL - Control structures.py
index 6e060f34ed7..3842c568e50 100644
--- a/samples/tutorials/DSL - Control structures/DSL - Control structures.py
+++ b/samples/tutorials/DSL - Control structures/DSL - Control structures.py
@@ -67,7 +67,7 @@ def flipcoin_pipeline():
 # ## Exit handlers
 # You can use the `with dsl.ExitHandler(exit_task):` context to execute a task when the rest of the pipeline finishes (succeeds or fails).
 
-
+# %%
 @func_to_container_op
 def fail_op():
     """Fails."""

From d2d8a12adf5e1d4e15e46b9187370f833e9f7a16 Mon Sep 17 00:00:00 2001
From: Alexey Volkov
Date: Fri, 31 Jan 2020 12:47:36 -0800
Subject: [PATCH 6/6] Added copyright headers

---
 .../DSL - Control structures.py                  | 16 ++++++++++++++++
 .../Data passing in python components - Files.py | 15 +++++++++++++++
 2 files changed, 31 insertions(+)

diff --git a/samples/tutorials/DSL - Control structures/DSL - Control structures.py b/samples/tutorials/DSL - Control structures/DSL - Control structures.py
index 3842c568e50..0c69a27234f 100644
--- a/samples/tutorials/DSL - Control structures/DSL - Control structures.py
+++ b/samples/tutorials/DSL - Control structures/DSL - Control structures.py
@@ -1,3 +1,19 @@
+#!/usr/bin/env python3
+# Copyright 2020 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
 # %% [markdown]
 # # DSL control structures tutorial
 # Shows how to use conditional execution and exit handlers.
diff --git a/samples/tutorials/Data passing in python components/Data passing in python components - Files.py b/samples/tutorials/Data passing in python components/Data passing in python components - Files.py
index f524e023c5a..c023c827558 100644
--- a/samples/tutorials/Data passing in python components/Data passing in python components - Files.py
+++ b/samples/tutorials/Data passing in python components/Data passing in python components - Files.py
@@ -1,3 +1,18 @@
+#!/usr/bin/env python3
+# Copyright 2020 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
 # %% [markdown]
 # # Data passing tutorial
 # Data passing is the most important aspect of Pipelines.