Skip to content

Commit

Permalink
Add notebook sample test: tfx sample (kubeflow#470)
Browse files Browse the repository at this point in the history
* add notebook sample tests for tfx

* parameterize component image tag

* parameterize base and target image tags

* install tensorflow package for the notebook tfx sample test

* bug fixes

* start debug mode

* fix bugs

* add namespace arg to check_notebook_results, copy test results to gcs, fix minor bugs
add CMLE model deletion

* install the correct KFP version in the notebook; parameterize deployer model name and version

* fix CMLE model name bug

* add notebook sample test in v2

* add gcp sa in notebook tfx sample and shutdown debug mode

* import kfp.gcp
  • Loading branch information
gaoning777 authored and neuromage committed Dec 13, 2018
1 parent a12b59f commit 2da57bb
Show file tree
Hide file tree
Showing 9 changed files with 342 additions and 57 deletions.
105 changes: 61 additions & 44 deletions samples/notebooks/KubeFlow Pipeline Using TFX OSS Components.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,37 @@
"## Setup"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": [
"parameters"
]
},
"outputs": [],
"source": [
"# Set your output and project. !!!Must Do before you can proceed!!!\n",
"EXPERIMENT_NAME = 'demo'\n",
"OUTPUT_DIR = 'Your-Gcs-Path' # Such as gs://bucket/objact/path\n",
"PROJECT_NAME = 'Your-Gcp-Project-Name'\n",
"BASE_IMAGE='gcr.io/%s/pusherbase:dev' % PROJECT_NAME\n",
"TARGET_IMAGE='gcr.io/%s/pusher:dev' % PROJECT_NAME\n",
"KFP_PACKAGE = 'https://storage.googleapis.com/ml-pipeline/release/0.1.3-rc.2/kfp.tar.gz'\n",
"TRAIN_DATA = 'gs://ml-pipeline-playground/tfx/taxi-cab-classification/train.csv'\n",
"EVAL_DATA = 'gs://ml-pipeline-playground/tfx/taxi-cab-classification/eval.csv'\n",
"HIDDEN_LAYER_SIZE = '1500'\n",
"STEPS = 3000\n",
"DATAFLOW_TFDV_IMAGE = 'gcr.io/ml-pipeline/ml-pipeline-dataflow-tfdv:0.1.3-rc.2'#TODO-release: update the release tag for the next release\n",
"DATAFLOW_TFT_IMAGE = 'gcr.io/ml-pipeline/ml-pipeline-dataflow-tft:0.1.3-rc.2'#TODO-release: update the release tag for the next release\n",
"DATAFLOW_TFMA_IMAGE = 'gcr.io/ml-pipeline/ml-pipeline-dataflow-tfma:0.1.3-rc.2'#TODO-release: update the release tag for the next release\n",
"DATAFLOW_TF_PREDICT_IMAGE = 'gcr.io/ml-pipeline/ml-pipeline-dataflow-tf-predict:0.1.3-rc.2'#TODO-release: update the release tag for the next release\n",
"KUBEFLOW_TF_TRAINER_IMAGE = 'gcr.io/ml-pipeline/ml-pipeline-kubeflow-tf-trainer:0.1.3-rc.2'#TODO-release: update the release tag for the next release\n",
"KUBEFLOW_DEPLOYER_IMAGE = 'gcr.io/ml-pipeline/ml-pipeline-kubeflow-deployer:0.1.3-rc.2'#TODO-release: update the release tag for the next release\n",
"DEV_DEPLOYER_MODEL = 'notebook-tfx-devtaxi.beta'\n",
"PROD_DEPLOYER_MODEL = 'notebook-tfx-prodtaxi.beta'"
]
},
{
"cell_type": "code",
"execution_count": 1,
Expand Down Expand Up @@ -81,26 +112,7 @@
],
"source": [
"# Install Pipeline SDK\n",
"!pip3 install https://storage.googleapis.com/ml-pipeline/release/0.1.3-rc.2/kfp.tar.gz --upgrade"
]
},
{
"cell_type": "code",
"execution_count": 36,
"metadata": {},
"outputs": [],
"source": [
"import kfp\n",
"from kfp import compiler\n",
"import kfp.dsl as dsl\n",
"import kfp.notebook\n",
"\n",
"\n",
"# Set your output and project. !!!Must Do before you can proceed!!!\n",
"OUTPUT_DIR = 'Your-Gcs-Path' # Such as gs://bucket/objact/path\n",
"PROJECT_NAME = 'Your-Gcp-Project-Name'\n",
"BASE_IMAGE='gcr.io/%s/pusherbase:dev' % PROJECT_NAME\n",
"TARGET_IMAGE='gcr.io/%s/pusher:dev' % PROJECT_NAME"
"!pip3 install $KFP_PACKAGE --upgrade"
]
},
{
Expand Down Expand Up @@ -133,8 +145,13 @@
"source": [
"# Note that this notebook should be running in JupyterHub in the same cluster as the pipeline system.\n",
"# Otherwise it will fail to talk to the pipeline system.\n",
"import kfp\n",
"from kfp import compiler\n",
"import kfp.dsl as dsl\n",
"import kfp.notebook\n",
"import kfp.gcp as gcp\n",
"client = kfp.Client()\n",
"exp = client.create_experiment(name='demo')"
"exp = client.create_experiment(name=EXPERIMENT_NAME)"
]
},
{
Expand Down Expand Up @@ -208,7 +225,7 @@
"def dataflow_tf_data_validation_op(inference_data: 'GcsUri', validation_data: 'GcsUri', column_names: 'GcsUri[text/json]', key_columns, project: 'GcpProject', mode, validation_output: 'GcsUri[Directory]', step_name='validation'):\n",
" return dsl.ContainerOp(\n",
" name = step_name,\n",
" image = 'gcr.io/ml-pipeline/ml-pipeline-dataflow-tfdv:0.1.3-rc.2', #TODO-release: update the release tag for the next release\n",
" image = DATAFLOW_TFDV_IMAGE,\n",
" arguments = [\n",
" '--csv-data-for-inference', inference_data,\n",
" '--csv-data-to-validate', validation_data,\n",
Expand All @@ -222,12 +239,12 @@
" 'output': '/output.txt',\n",
" 'schema': '/output_schema.json',\n",
" }\n",
" )\n",
" ).apply(gcp.use_gcp_secret('user-gcp-sa'))\n",
"\n",
"def dataflow_tf_transform_op(train_data: 'GcsUri', evaluation_data: 'GcsUri', schema: 'GcsUri[text/json]', project: 'GcpProject', preprocess_mode, preprocess_module: 'GcsUri[text/code/python]', transform_output: 'GcsUri[Directory]', step_name='preprocess'):\n",
" return dsl.ContainerOp(\n",
" name = step_name,\n",
" image = 'gcr.io/ml-pipeline/ml-pipeline-dataflow-tft:0.1.3-rc.2', #TODO-release: update the release tag for the next release\n",
" image = DATAFLOW_TFT_IMAGE,\n",
" arguments = [\n",
" '--train', train_data,\n",
" '--eval', evaluation_data,\n",
Expand All @@ -238,13 +255,13 @@
" '--output', transform_output,\n",
" ],\n",
" file_outputs = {'transformed': '/output.txt'}\n",
" )\n",
" ).apply(gcp.use_gcp_secret('user-gcp-sa'))\n",
"\n",
"\n",
"def tf_train_op(transformed_data_dir, schema: 'GcsUri[text/json]', learning_rate: float, hidden_layer_size: int, steps: int, target: str, preprocess_module: 'GcsUri[text/code/python]', training_output: 'GcsUri[Directory]', step_name='training'):\n",
" return dsl.ContainerOp(\n",
" name = step_name,\n",
" image = 'gcr.io/ml-pipeline/ml-pipeline-kubeflow-tf-trainer:0.1.3-rc.2', #TODO-release: update the release tag for the next release\n",
" image = KUBEFLOW_TF_TRAINER_IMAGE,\n",
" arguments = [\n",
" '--transformed-data-dir', transformed_data_dir,\n",
" '--schema', schema,\n",
Expand All @@ -256,12 +273,12 @@
" '--job-dir', training_output,\n",
" ],\n",
" file_outputs = {'train': '/output.txt'}\n",
" )\n",
" ).apply(gcp.use_gcp_secret('user-gcp-sa'))\n",
"\n",
"def dataflow_tf_model_analyze_op(model: 'TensorFlow model', evaluation_data: 'GcsUri', schema: 'GcsUri[text/json]', project: 'GcpProject', analyze_mode, analyze_slice_column, analysis_output: 'GcsUri', step_name='analysis'):\n",
" return dsl.ContainerOp(\n",
" name = step_name,\n",
" image = 'gcr.io/ml-pipeline/ml-pipeline-dataflow-tfma:0.1.3-rc.2', #TODO-release: update the release tag for the next release\n",
" image = DATAFLOW_TFMA_IMAGE,\n",
" arguments = [\n",
" '--model', model,\n",
" '--eval', evaluation_data,\n",
Expand All @@ -272,13 +289,13 @@
" '--output', analysis_output,\n",
" ],\n",
" file_outputs = {'analysis': '/output.txt'}\n",
" )\n",
" ).apply(gcp.use_gcp_secret('user-gcp-sa'))\n",
"\n",
"\n",
"def dataflow_tf_predict_op(evaluation_data: 'GcsUri', schema: 'GcsUri[text/json]', target: str, model: 'TensorFlow model', predict_mode, project: 'GcpProject', prediction_output: 'GcsUri', step_name='prediction'):\n",
" return dsl.ContainerOp(\n",
" name = step_name,\n",
" image = 'gcr.io/ml-pipeline/ml-pipeline-dataflow-tf-predict:0.1.3-rc.2', #TODO-release: update the release tag for the next release\n",
" image = DATAFLOW_TF_PREDICT_IMAGE,\n",
" arguments = [\n",
" '--data', evaluation_data,\n",
" '--schema', schema,\n",
Expand All @@ -289,17 +306,17 @@
" '--output', prediction_output,\n",
" ],\n",
" file_outputs = {'prediction': '/output.txt'}\n",
" )\n",
" ).apply(gcp.use_gcp_secret('user-gcp-sa'))\n",
"\n",
"def kubeflow_deploy_op(model: 'TensorFlow model', tf_server_name, step_name='deploy'):\n",
" return dsl.ContainerOp(\n",
" name = step_name,\n",
" image = 'gcr.io/ml-pipeline/ml-pipeline-kubeflow-deployer:0.1.3-rc.2', #TODO-release: update the release tag for the next release\n",
" image = KUBEFLOW_DEPLOYER_IMAGE,\n",
" arguments = [\n",
" '--model-path', model,\n",
" '--server-name', tf_server_name\n",
" ]\n",
" )\n",
" ).apply(gcp.use_gcp_secret('user-gcp-sa'))\n",
"\n",
"\n",
"# The pipeline definition\n",
Expand All @@ -312,15 +329,15 @@
" project,\n",
" column_names=dsl.PipelineParam(name='column-names', value='gs://ml-pipeline-playground/tfx/taxi-cab-classification/column-names.json'),\n",
" key_columns=dsl.PipelineParam(name='key-columns', value='trip_start_timestamp'),\n",
" train=dsl.PipelineParam(name='train', value='gs://ml-pipeline-playground/tfx/taxi-cab-classification/train.csv'),\n",
" evaluation=dsl.PipelineParam(name='evaluation', value='gs://ml-pipeline-playground/tfx/taxi-cab-classification/eval.csv'),\n",
" train=dsl.PipelineParam(name='train', value=TRAIN_DATA),\n",
" evaluation=dsl.PipelineParam(name='evaluation', value=EVAL_DATA),\n",
" validation_mode=dsl.PipelineParam(name='validation-mode', value='local'),\n",
" preprocess_mode=dsl.PipelineParam(name='preprocess-mode', value='local'),\n",
" preprocess_module: dsl.PipelineParam=dsl.PipelineParam(name='preprocess-module', value='gs://ml-pipeline-playground/tfx/taxi-cab-classification/preprocessing.py'),\n",
" target=dsl.PipelineParam(name='target', value='tips'),\n",
" learning_rate=dsl.PipelineParam(name='learning-rate', value=0.1),\n",
" hidden_layer_size=dsl.PipelineParam(name='hidden-layer-size', value='1500'),\n",
" steps=dsl.PipelineParam(name='steps', value=3000),\n",
" hidden_layer_size=dsl.PipelineParam(name='hidden-layer-size', value=HIDDEN_LAYER_SIZE),\n",
" steps=dsl.PipelineParam(name='steps', value=STEPS),\n",
" predict_mode=dsl.PipelineParam(name='predict-mode', value='local'),\n",
" analyze_mode=dsl.PipelineParam(name='analyze-mode', value='local'),\n",
" analyze_slice_column=dsl.PipelineParam(name='analyze-slice-column', value='trip_start_hour')):\n",
Expand Down Expand Up @@ -480,7 +497,7 @@
"source": [
"# Test the function and make sure it works.\n",
"path = 'gs://ml-pipeline-playground/sampledata/taxi/train'\n",
"deploy_model('taxidev.beta', path, PROJECT_NAME, '1.9')"
"deploy_model(DEV_DEPLOYER_MODEL, path, PROJECT_NAME, '1.9')"
]
},
{
Expand Down Expand Up @@ -699,19 +716,19 @@
" key_columns=dsl.PipelineParam(name='key-columns', value='trip_start_timestamp'),\n",
" train=dsl.PipelineParam(\n",
" name='train',\n",
" value='gs://ml-pipeline-playground/tfx/taxi-cab-classification/train.csv'),\n",
" value=TRAIN_DATA),\n",
" evaluation=dsl.PipelineParam(\n",
" name='evaluation',\n",
" value='gs://ml-pipeline-playground/tfx/taxi-cab-classification/eval.csv'),\n",
" value=EVAL_DATA),\n",
" validation_mode=dsl.PipelineParam(name='validation-mode', value='local'),\n",
" preprocess_mode=dsl.PipelineParam(name='preprocess-mode', value='local'),\n",
" preprocess_module: dsl.PipelineParam=dsl.PipelineParam(\n",
" name='preprocess-module',\n",
" value='gs://ml-pipeline-playground/tfx/taxi-cab-classification/preprocessing.py'),\n",
" target=dsl.PipelineParam(name='target', value='tips'),\n",
" learning_rate=dsl.PipelineParam(name='learning-rate', value=0.1),\n",
" hidden_layer_size=dsl.PipelineParam(name='hidden-layer-size', value='1500'),\n",
" steps=dsl.PipelineParam(name='steps', value=3000),\n",
" hidden_layer_size=dsl.PipelineParam(name='hidden-layer-size', value=HIDDEN_LAYER_SIZE),\n",
" steps=dsl.PipelineParam(name='steps', value=STEPS),\n",
" predict_mode=dsl.PipelineParam(name='predict-mode', value='local'),\n",
" analyze_mode=dsl.PipelineParam(name='analyze-mode', value='local'),\n",
" analyze_slice_column=dsl.PipelineParam(name='analyze-slice-column', value='trip_start_hour')):\n",
Expand All @@ -732,7 +749,7 @@
" prediction = dataflow_tf_predict_op(evaluation, schema, target, training.output, predict_mode, project, prediction_output)\n",
" \n",
" # The new deployer. Note that the DeployerOp interface is similar to the function \"deploy_model\".\n",
" deploy = DeployerOp(gcp_project=project, model_dot_version=model, runtime='1.9', model_path=training.output)"
" deploy = DeployerOp(gcp_project=project, model_dot_version=model, runtime='1.9', model_path=training.output).apply(gcp.use_gcp_secret('user-gcp-sa'))"
]
},
{
Expand Down Expand Up @@ -766,7 +783,7 @@
"run = client.run_pipeline(exp.id, 'my-tfx', 'my-tfx.tar.gz',\n",
" params={'output': OUTPUT_DIR,\n",
" 'project': PROJECT_NAME,\n",
" 'model': 'mytaxi.beta'})"
" 'model': PROD_DEPLOYER_MODEL})"
]
},
{
Expand Down
26 changes: 17 additions & 9 deletions samples/notebooks/Lightweight Python components - basics.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,28 @@
"* To build a component with multiple output values, use the typing.NamedTuple type hint syntax: ```NamedTuple('MyFunctionOutputs', [('output_name_1', type), ('output_name_2', float)])```"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": [
"parameters"
]
},
"outputs": [],
"source": [
"EXPERIMENT_NAME = 'lightweight python components'\n",
"KFP_PACKAGE = 'https://storage.googleapis.com/ml-pipeline/release/0.1.3-rc.2/kfp.tar.gz'"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#Install the SDK\n",
"!pip3 install https://storage.googleapis.com/ml-pipeline/release/0.1.3-rc.2/kfp.tar.gz --upgrade\n"
"# Install the SDK\n",
"!pip3 install $KFP_PACKAGE --upgrade\n"
]
},
{
Expand Down Expand Up @@ -236,13 +250,7 @@
"#Get or create an experiment and submit a pipeline run\n",
"import kfp\n",
"client = kfp.Client()\n",
"list_experiments_response = client.list_experiments()\n",
"experiments = list_experiments_response.experiments\n",
"if not experiments:\n",
" #The user does not have any experiments available. Creating a new one\n",
" experiment = client.create_experiment(pipeline_func.__name__ + ' experiment')\n",
"else:\n",
" experiment = experiments[-1] #Using the last experiment\n",
"experiment = client.create_experiment(EXPERIMENT_NAME)\n",
"\n",
"#Submit a pipeline run\n",
"run_name = pipeline_func.__name__ + ' run'\n",
Expand Down
8 changes: 6 additions & 2 deletions sdk/python/kfp/_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,16 +158,20 @@ def run_pipeline(self, experiment_id, job_name, pipeline_package_path, params={}
IPython.display.display(IPython.display.HTML(html))
return response.run

def list_runs(self, page_token='', page_size=10, sort_by=''):
def list_runs(self, page_token='', page_size=10, sort_by='', resource_reference_key_type=None, resource_reference_key_id=None):
"""List runs.
Args:
page_token: token for starting of the page.
page_size: size of the page.
sort_by: one of 'field_name', 'field_name des'. For example, 'name des'.
resource_reference_key: resource filtering key
Returns:
A response object including a list of experiments and next page token.
"""
response = self._run_api.list_runs(page_token=page_token, page_size=page_size, sort_by=sort_by)
if resource_reference_key_type is not None and resource_reference_key_id is not None:
response = self._run_api.list_runs(page_token=page_token, page_size=page_size, sort_by=sort_by, resource_reference_key_type=resource_reference_key_type, resource_reference_key_id=resource_reference_key_id)
else:
response = self._run_api.list_runs(page_token=page_token, page_size=page_size, sort_by=sort_by)
return response

def get_run(self, run_id):
Expand Down
3 changes: 3 additions & 0 deletions test/sample-test/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ RUN pip3 install junit-xml
RUN pip3 install kubernetes
RUN pip3 install minio
RUN pip3 install setuptools==40.5.0
RUN pip3 install papermill==0.16.1
RUN pip3 install ipykernel==5.1.0
RUN pip3 install google-api-python-client==1.7.0

#Needs test/sample-test and the files needed to run sdk/python/build.sh
COPY . /python/src/github.com/kubeflow/pipelines
Expand Down
Loading

0 comments on commit 2da57bb

Please sign in to comment.