component build support for both python2 and python3 (kubeflow#730)
* component build support for both python2 and python3

* add sample test

* remove the annotations for python2 component build

* add pathlib for python2 component build

* fix component build unit test

* fix bug in the dockerfile generator

* remove exist_ok in path.mkdir to make python2 compatible

* adjust unit test

* remove pathlib dependency for python2 component build

* remove the pathlib code from the python3 component build and use the python2 code path instead; add a TODO to create a new sample
gaoning777 authored and k8s-ci-robot committed Feb 25, 2019

1 parent 7a5cee5 commit c4e7271
Showing 4 changed files with 268 additions and 38 deletions.
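For reference, a minimal sketch of the API surface this commit adds (the toy component function, bucket path, and image names are placeholders, not from the commit):

    import kfp.dsl as dsl
    from kfp import compiler

    # build_python_component expects the function to carry the SDK's
    # python_component decorator (see the docstring change in the diff below).
    @dsl.python_component(name='add', description='adds two numbers')
    def add(a, b):
      return a + b

    # python_version defaults to 'python3'; any value other than 'python2'
    # or 'python3' raises ValueError.
    add_op = compiler.build_python_component(
        component_func=add,
        staging_gcs_path='gs://your-bucket/staging',  # placeholder
        base_image='python:2.7',                      # placeholder
        target_image='gcr.io/your-project/add:dev',   # placeholder
        python_version='python2')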
137 changes: 133 additions & 4 deletions samples/notebooks/KubeFlow Pipeline Using TFX OSS Components.ipynb
@@ -38,6 +38,7 @@
"PROJECT_NAME = 'Your-Gcp-Project-Name'\n",
"BASE_IMAGE='gcr.io/%s/pusherbase:dev' % PROJECT_NAME\n",
"TARGET_IMAGE='gcr.io/%s/pusher:dev' % PROJECT_NAME\n",
"TARGET_IMAGE_TWO='gcr.io/%s/pusher_two:dev' % PROJECT_NAME\n",
"KFP_PACKAGE = 'https://storage.googleapis.com/ml-pipeline/release/0.1.6/kfp.tar.gz'\n",
"TRAIN_DATA = 'gs://ml-pipeline-playground/tfx/taxi-cab-classification/train.csv'\n",
"EVAL_DATA = 'gs://ml-pipeline-playground/tfx/taxi-cab-classification/eval.csv'\n",
@@ -52,7 +53,8 @@
"KUBEFLOW_DEPLOYER_IMAGE = 'gcr.io/ml-pipeline/ml-pipeline-kubeflow-deployer:5df2cdc1ed145320204e8bc73b59cdbd7b3da28f'\n",
"DEPLOYER_MODEL = 'notebook_tfx_taxi'\n",
"DEPLOYER_VERSION_DEV = 'dev'\n",
"DEPLOYER_VERSION_PROD = 'prod'"
"DEPLOYER_VERSION_PROD = 'prod'\n",
"DEPLOYER_VERSION_PROD_TWO = 'prodtwo'"
]
},
{
@@ -563,7 +565,7 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"# Submit a new job"
"### Submit a new job"
]
},
{
@@ -587,7 +589,133 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"# Clean up"
"## Customize a step in Python2\n",
"Let's reuse the deploy_model function defined above. However, this time we will use python2 instead of the default python3."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from kfp import compiler\n",
"\n",
"# The return value \"DeployerOp\" represents a step that can be used directly in a pipeline function\n",
"#TODO: demonstrate the python2 support in another sample.\n",
"DeployerOp = compiler.build_python_component(\n",
" component_func=deploy_model,\n",
" staging_gcs_path=OUTPUT_DIR,\n",
" dependency=[kfp.compiler.VersionedDependency(name='google-api-python-client', version='1.7.0')],\n",
" base_image='tensorflow/tensorflow:1.12.0',\n",
" target_image=TARGET_IMAGE_TWO,\n",
" python_version='python2')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Modify the pipeline with the new deployer"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# My New Pipeline. It's almost the same as the original one with the last step deployer replaced.\n",
"@dsl.pipeline(\n",
" name='TFX Taxi Cab Classification Pipeline Example',\n",
" description='Example pipeline that does classification with model analysis based on a public BigQuery dataset.'\n",
")\n",
"def my_taxi_cab_classification(\n",
" output,\n",
" project,\n",
" model,\n",
" version,\n",
" column_names=dsl.PipelineParam(\n",
" name='column-names',\n",
" value='gs://ml-pipeline-playground/tfx/taxi-cab-classification/column-names.json'),\n",
" key_columns=dsl.PipelineParam(name='key-columns', value='trip_start_timestamp'),\n",
" train=dsl.PipelineParam(\n",
" name='train',\n",
" value=TRAIN_DATA),\n",
" evaluation=dsl.PipelineParam(\n",
" name='evaluation',\n",
" value=EVAL_DATA),\n",
" validation_mode=dsl.PipelineParam(name='validation-mode', value='local'),\n",
" preprocess_mode=dsl.PipelineParam(name='preprocess-mode', value='local'),\n",
" preprocess_module: dsl.PipelineParam=dsl.PipelineParam(\n",
" name='preprocess-module',\n",
" value='gs://ml-pipeline-playground/tfx/taxi-cab-classification/preprocessing.py'),\n",
" target=dsl.PipelineParam(name='target', value='tips'),\n",
" learning_rate=dsl.PipelineParam(name='learning-rate', value=0.1),\n",
" hidden_layer_size=dsl.PipelineParam(name='hidden-layer-size', value=HIDDEN_LAYER_SIZE),\n",
" steps=dsl.PipelineParam(name='steps', value=STEPS),\n",
" predict_mode=dsl.PipelineParam(name='predict-mode', value='local'),\n",
" analyze_mode=dsl.PipelineParam(name='analyze-mode', value='local'),\n",
" analyze_slice_column=dsl.PipelineParam(name='analyze-slice-column', value='trip_start_hour')):\n",
" \n",
" \n",
" validation_output = '%s/{{workflow.name}}/validation' % output\n",
" transform_output = '%s/{{workflow.name}}/transformed' % output\n",
" training_output = '%s/{{workflow.name}}/train' % output\n",
" analysis_output = '%s/{{workflow.name}}/analysis' % output\n",
" prediction_output = '%s/{{workflow.name}}/predict' % output\n",
"\n",
" validation = dataflow_tf_data_validation_op(\n",
" train, evaluation, column_names, key_columns, project,\n",
" validation_mode, validation_output).apply(gcp.use_gcp_secret('user-gcp-sa'))\n",
" preprocess = dataflow_tf_transform_op(\n",
" train, evaluation, validation.outputs['schema'], project, preprocess_mode,\n",
" preprocess_module, transform_output).apply(gcp.use_gcp_secret('user-gcp-sa'))\n",
" training = tf_train_op(\n",
" preprocess.output, validation.outputs['schema'], learning_rate, hidden_layer_size,\n",
" steps, target, preprocess_module, training_output).apply(gcp.use_gcp_secret('user-gcp-sa'))\n",
" analysis = dataflow_tf_model_analyze_op(\n",
" training.output, evaluation, validation.outputs['schema'], project,\n",
" analyze_mode, analyze_slice_column, analysis_output).apply(gcp.use_gcp_secret('user-gcp-sa'))\n",
" prediction = dataflow_tf_predict_op(\n",
" evaluation, validation.outputs['schema'], target, training.output,\n",
" predict_mode, project, prediction_output).apply(gcp.use_gcp_secret('user-gcp-sa'))\n",
" \n",
" # The new deployer. Note that the DeployerOp interface is similar to the function \"deploy_model\".\n",
" deploy = DeployerOp(\n",
" gcp_project=project, model_name=model, version_name=version, runtime='1.9',\n",
" model_path=training.output).apply(gcp.use_gcp_secret('user-gcp-sa'))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Submit a new job"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"compiler.Compiler().compile(my_taxi_cab_classification, 'my-tfx-two.tar.gz')\n",
"\n",
"run = client.run_pipeline(exp.id, 'my-tfx-two', 'my-tfx-two.tar.gz',\n",
" params={'output': OUTPUT_DIR,\n",
" 'project': PROJECT_NAME,\n",
" 'model': DEPLOYER_MODEL,\n",
" 'version': DEPLOYER_VERSION_PROD_TWO})\n",
"\n",
"result = client.wait_for_run_completion(run.id, timeout=600)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Clean up"
]
},
{
@@ -612,6 +740,7 @@
"\n",
"\n",
"!gcloud ml-engine versions delete $DEPLOYER_VERSION_PROD --model $DEPLOYER_MODEL -q\n",
"!gcloud ml-engine versions delete $DEPLOYER_VERSION_PROD_TWO --model $DEPLOYER_MODEL -q\n",
"!gcloud ml-engine versions delete $DEPLOYER_VERSION_DEV --model $DEPLOYER_MODEL -q\n",
"!gcloud ml-engine models delete $DEPLOYER_MODEL -q"
]
@@ -633,7 +762,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.4"
"version": "3.5.4rc1"
}
},
"nbformat": 4,
82 changes: 63 additions & 19 deletions sdk/python/kfp/compiler/_component_builder.py
@@ -145,16 +145,34 @@ def _generate_pip_requirement(self, dependency, requirement_filepath):
       dependency_helper.add_python_package(version)
     dependency_helper.generate_pip_requirements(requirement_filepath)
 
-  def _generate_dockerfile_with_py(self, target_file, base_image, python_filepath, has_requirement_file):
-    """ _generate_docker_file generates a simple dockerfile with the python path """
+  def _generate_dockerfile_with_py(self, target_file, base_image, python_filepath, has_requirement_file, python_version):
+    """ _generate_docker_file generates a simple dockerfile with the python path
+    args:
+      target_file (str): target file name for the dockerfile.
+      base_image (str): the base image name.
+      python_filepath (str): the path of the python file that is copied to the docker image.
+      has_requirement_file (bool): whether it has a requirement file or not.
+      python_version (str): choose python2 or python3
+    """
+    if python_version not in ['python2', 'python3']:
+      raise ValueError('python_version has to be either python2 or python3')
     with open(target_file, 'w') as f:
       f.write('FROM ' + base_image + '\n')
-      f.write('RUN apt-get update -y && apt-get install --no-install-recommends -y -q python3 python3-pip python3-setuptools\n')
+      if python_version == 'python3':
+        f.write('RUN apt-get update -y && apt-get install --no-install-recommends -y -q python3 python3-pip python3-setuptools\n')
+      else:
+        f.write('RUN apt-get update -y && apt-get install --no-install-recommends -y -q python python-pip python-setuptools\n')
       if has_requirement_file:
         f.write('ADD ' + self._ARC_REQUIREMENT_FILE + ' /ml/\n')
-        f.write('RUN pip3 install -r /ml/' + self._ARC_REQUIREMENT_FILE + '\n')
+        if python_version == 'python3':
+          f.write('RUN pip3 install -r /ml/' + self._ARC_REQUIREMENT_FILE + '\n')
+        else:
+          f.write('RUN pip install -r /ml/' + self._ARC_REQUIREMENT_FILE + '\n')
       f.write('ADD ' + python_filepath + " /ml/" + '\n')
-      f.write('ENTRYPOINT ["python3", "/ml/' + python_filepath + '"]')
+      if python_version == 'python3':
+        f.write('ENTRYPOINT ["python3", "/ml/' + python_filepath + '"]')
+      else:
+        f.write('ENTRYPOINT ["python", "/ml/' + python_filepath + '"]')
 
   def _wrap_files_in_tarball(self, tarball_path, files={}):
     """ _wrap_files_in_tarball creates a tarball for all the input files
@@ -165,16 +183,21 @@ def _wrap_files_in_tarball(self, tarball_path, files={}):
       for key, value in files.items():
         tarball.add(value, arcname=key)
 
-  def prepare_docker_tarball_with_py(self, arc_python_filename, python_filepath, base_image, local_tarball_path, dependency=None):
-    """ prepare_docker_tarball is the API to generate dockerfile and prepare the tarball with python scripts """
+  def prepare_docker_tarball_with_py(self, arc_python_filename, python_filepath, base_image, local_tarball_path, python_version, dependency=None):
+    """ prepare_docker_tarball is the API to generate dockerfile and prepare the tarball with python scripts
+    args:
+      python_version (str): choose python2 or python3
+    """
+    if python_version not in ['python2', 'python3']:
+      raise ValueError('python_version has to be either python2 or python3')
     with tempfile.TemporaryDirectory() as local_build_dir:
       has_requirement_file = False
       local_requirement_path = os.path.join(local_build_dir, self._ARC_REQUIREMENT_FILE)
       if dependency is not None and len(dependency) != 0:
         self._generate_pip_requirement(dependency, local_requirement_path)
         has_requirement_file = True
       local_dockerfile_path = os.path.join(local_build_dir, self._arc_dockerfile_name)
-      self._generate_dockerfile_with_py(local_dockerfile_path, base_image, arc_python_filename, has_requirement_file)
+      self._generate_dockerfile_with_py(local_dockerfile_path, base_image, arc_python_filename, has_requirement_file, python_version)
       file_lists = {self._arc_dockerfile_name:local_dockerfile_path,
                     arc_python_filename:python_filepath}
       if has_requirement_file:
@@ -272,7 +295,14 @@ def _generate_kaniko_spec(self, namespace, arc_dockerfile_name, gcs_path, target
     return content
 
   #TODO: currently it supports single output, future support for multiple return values
-  def _generate_entrypoint(self, component_func):
+  def _generate_entrypoint(self, component_func, python_version='python3'):
+    '''
+    args:
+      python_version (str): choose python2 or python3, default is python3
+    '''
+    if python_version not in ['python2', 'python3']:
+      raise ValueError('python_version has to be either python2 or python3')
+
     fullargspec = inspect.getfullargspec(component_func)
     annotations = fullargspec[6]
     input_args = fullargspec[0]
@@ -312,9 +342,11 @@ def _generate_entrypoint(self, component_func):
     codegen.writeline(call_component_func)
 
     # Serialize output
-    codegen.writeline('from pathlib import Path')
-    codegen.writeline('Path(_output_file).parent.mkdir(parents=True, exist_ok=True)')
-    codegen.writeline('Path(_output_file).write_text(str(output))')
+    codegen.writeline('import os')
+    codegen.writeline('os.makedirs(os.path.dirname(_output_file))')
+    codegen.writeline('with open(_output_file, "w") as data:')
+    codegen.indent()
+    codegen.writeline('data.write(str(output))')
     wrapper_code = codegen.end()
 
     # CLI codes
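Derived from the writelines above (illustration only): for an output path _output_file, the serialization block emitted into the component entrypoint now reads

    import os
    os.makedirs(os.path.dirname(_output_file))
    with open(_output_file, "w") as data:
      data.write(str(output))

which runs under both Python 2 and Python 3, whereas the pathlib version it replaces is Python-3-only. Note that os.makedirs raises OSError when the directory already exists; exist_ok was dropped deliberately because Python 2's os.makedirs does not accept it (see the commit message above).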
@@ -337,6 +369,8 @@ def _generate_entrypoint(self, component_func):
       if line.startswith('def '):
         break
       start_line_num += 1
+    if python_version == 'python2':
+      src_lines[start_line_num] = 'def ' + component_func.__name__ + '(' + ', '.join((inspect.getfullargspec(component_func).args)) + '):'
     dedecorated_component_src = '\n'.join(src_lines[start_line_num:])
 
     complete_component_code = dedecorated_component_src + '\n' + wrapper_code + '\n' + codegen.end()
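For example (hypothetical function, not from the commit): with python_version='python2', a source line such as

    def add(a: float, b: float) -> float:

is rebuilt from the argspec as

    def add(a, b):

since Python 2 does not support function annotations. Because only the argument names are joined, default values in the original def line would be dropped as well.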
@@ -358,13 +392,18 @@ def _build_image_from_tarball(self, local_tarball_path, namespace, timeout):
     # Clean up
     GCSHelper.remove_gcs_blob(self._gcs_path)
 
-  def build_image_from_func(self, component_func, namespace, base_image, timeout, dependency):
-    """ build_image builds an image for the given python function"""
+  def build_image_from_func(self, component_func, namespace, base_image, timeout, dependency, python_version='python3'):
+    """ build_image builds an image for the given python function
+    args:
+      python_version (str): choose python2 or python3, default is python3
+    """
+    if python_version not in ['python2', 'python3']:
+      raise ValueError('python_version has to be either python2 or python3')
     with tempfile.TemporaryDirectory() as local_build_dir:
       # Generate entrypoint and serialization python codes
       local_python_filepath = os.path.join(local_build_dir, self._arc_python_filepath)
       logging.info('Generate entrypoint and serialization codes.')
-      complete_component_code = self._generate_entrypoint(component_func)
+      complete_component_code = self._generate_entrypoint(component_func, python_version)
       with open(local_python_filepath, 'w') as f:
         f.write(complete_component_code)
 
@@ -376,6 +415,7 @@ def build_image_from_func(self, component_func, namespace, base_image, timeout,
           arc_python_filename=self._arc_python_filepath,
           base_image=base_image,
           local_tarball_path=local_tarball_path,
+          python_version=python_version,
           dependency=dependency)
       self._build_image_from_tarball(local_tarball_path, namespace, timeout)
 
@@ -445,7 +485,7 @@ def _generate_pythonop(component_func, target_image, target_component_file=None)

   return _create_task_factory_from_component_spec(component_spec)
 
-def build_python_component(component_func, target_image, base_image=None, dependency=[], staging_gcs_path=None, build_image=True, timeout=600, namespace='kubeflow', target_component_file=None):
+def build_python_component(component_func, target_image, base_image=None, dependency=[], staging_gcs_path=None, build_image=True, timeout=600, namespace='kubeflow', target_component_file=None, python_version='python3'):
   """ build_component automatically builds a container image for the component_func
   based on the base_image and pushes to the target_image.
@@ -459,9 +499,9 @@ def build_python_component(component_func, target_image, base_image=None, depend
     timeout (int): the timeout for the image build(in secs), default is 600 seconds
     namespace (str): the namespace within which to run the kubernetes kaniko job, default is "kubeflow"
     dependency (list): a list of VersionedDependency, which includes the package name and versions, default is empty
+    python_version (str): choose python2 or python3, default is python3
   Raises:
-    ValueError: The function is not decorated with python_component decorator
+    ValueError: The function is not decorated with python_component decorator or the python_version is neither python2 nor python3
   """
 
   _configure_logger(logging.getLogger())
@@ -471,6 +511,9 @@ def build_python_component(component_func, target_image, base_image=None, depend
   if target_image is None:
     raise ValueError('target_image must not be None')
 
+  if python_version not in ['python2', 'python3']:
+    raise ValueError('python_version has to be either python2 or python3')
+
   if build_image:
     if staging_gcs_path is None:
       raise ValueError('staging_gcs_path must not be None')
@@ -486,7 +529,8 @@ def build_python_component(component_func, target_image, base_image=None, depend
                target_image)
   builder = ImageBuilder(gcs_base=staging_gcs_path, target_image=target_image)
   builder.build_image_from_func(component_func, namespace=namespace,
-                                base_image=base_image, timeout=timeout, dependency=dependency)
+                                base_image=base_image, timeout=timeout,
+                                python_version=python_version, dependency=dependency)
   logging.info('Build component complete.')
   return _generate_pythonop(component_func, target_image, target_component_file)
 