Add notebook sample test: tfx sample #470

Merged · 18 commits · Dec 7, 2018
105 changes: 61 additions & 44 deletions samples/notebooks/KubeFlow Pipeline Using TFX OSS Components.ipynb
@@ -22,6 +22,37 @@
"## Setup"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": [
"parameters"
]
},
"outputs": [],
"source": [
"# Set your output and project. !!!Must Do before you can proceed!!!\n",
"EXPERIMENT_NAME = 'demo'\n",
"OUTPUT_DIR = 'Your-Gcs-Path' # Such as gs://bucket/objact/path\n",
"PROJECT_NAME = 'Your-Gcp-Project-Name'\n",
"BASE_IMAGE='gcr.io/%s/pusherbase:dev' % PROJECT_NAME\n",
"TARGET_IMAGE='gcr.io/%s/pusher:dev' % PROJECT_NAME\n",
"KFP_PACKAGE = 'https://storage.googleapis.com/ml-pipeline/release/0.1.3-rc.2/kfp.tar.gz'\n",
"TRAIN_DATA = 'gs://ml-pipeline-playground/tfx/taxi-cab-classification/train.csv'\n",
"EVAL_DATA = 'gs://ml-pipeline-playground/tfx/taxi-cab-classification/eval.csv'\n",
"HIDDEN_LAYER_SIZE = '1500'\n",
"STEPS = 3000\n",
"DATAFLOW_TFDV_IMAGE = 'gcr.io/ml-pipeline/ml-pipeline-dataflow-tfdv:0.1.3-rc.2'#TODO-release: update the release tag for the next release\n",
Contributor:
Taking the images out of the components does not look great.
Can we do the following: the notebook compiles the pipeline as normal, but then the image buckets and tags are replaced, similarly to how the sample tests are performed.
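One shape that compile-then-replace flow could take, sketched below — the helper name and the image map are illustrative, not part of this PR: rewrite the image strings inside the compiled .tar.gz package after compilation.

```python
import io
import tarfile

def replace_images_in_package(package_path, image_map):
    """Rewrite container image references inside a compiled pipeline package (sketch)."""
    # Read every file out of the .tar.gz package.
    with tarfile.open(package_path, 'r:gz') as tar:
        files = {m.name: tar.extractfile(m).read().decode('utf-8')
                 for m in tar.getmembers() if m.isfile()}
    # Substitute each image reference, e.g. swapping release tags for test tags.
    for name, content in files.items():
        for old_image, new_image in image_map.items():
            content = content.replace(old_image, new_image)
        files[name] = content
    # Write the package back out with the rewritten contents.
    with tarfile.open(package_path, 'w:gz') as tar:
        for name, content in files.items():
            data = content.encode('utf-8')
            info = tarfile.TarInfo(name=name)
            info.size = len(data)
            tar.addfile(info, io.BytesIO(data))

# e.g. replace_images_in_package('my-tfx.tar.gz',
#     {'gcr.io/ml-pipeline/ml-pipeline-dataflow-tfdv:0.1.3-rc.2':
#      'gcr.io/my-project/ml-pipeline-dataflow-tfdv:test'})
```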

"DATAFLOW_TFT_IMAGE = 'gcr.io/ml-pipeline/ml-pipeline-dataflow-tft:0.1.3-rc.2'#TODO-release: update the release tag for the next release\n",
"DATAFLOW_TFMA_IMAGE = 'gcr.io/ml-pipeline/ml-pipeline-dataflow-tfma:0.1.3-rc.2'#TODO-release: update the release tag for the next release\n",
"DATAFLOW_TF_PREDICT_IMAGE = 'gcr.io/ml-pipeline/ml-pipeline-dataflow-tf-predict:0.1.3-rc.2'#TODO-release: update the release tag for the next release\n",
"KUBEFLOW_TF_TRAINER_IMAGE = 'gcr.io/ml-pipeline/ml-pipeline-kubeflow-tf-trainer:0.1.3-rc.2'#TODO-release: update the release tag for the next release\n",
"KUBEFLOW_DEPLOYER_IMAGE = 'gcr.io/ml-pipeline/ml-pipeline-kubeflow-deployer:0.1.3-rc.2'#TODO-release: update the release tag for the next release\n",
"DEV_DEPLOYER_MODEL = 'notebook-tfx-devtaxi.beta'\n",
"PROD_DEPLOYER_MODEL = 'notebook-tfx-prodtaxi.beta'"
]
},
{
"cell_type": "code",
"execution_count": 1,
@@ -81,26 +112,7 @@
],
"source": [
"# Install Pipeline SDK\n",
"!pip3 install https://storage.googleapis.com/ml-pipeline/release/0.1.3-rc.2/kfp.tar.gz --upgrade"
]
},
{
"cell_type": "code",
"execution_count": 36,
"metadata": {},
"outputs": [],
"source": [
"import kfp\n",
"from kfp import compiler\n",
"import kfp.dsl as dsl\n",
"import kfp.notebook\n",
"\n",
"\n",
"# Set your output and project. !!!Must Do before you can proceed!!!\n",
"OUTPUT_DIR = 'Your-Gcs-Path' # Such as gs://bucket/objact/path\n",
"PROJECT_NAME = 'Your-Gcp-Project-Name'\n",
"BASE_IMAGE='gcr.io/%s/pusherbase:dev' % PROJECT_NAME\n",
"TARGET_IMAGE='gcr.io/%s/pusher:dev' % PROJECT_NAME"
"!pip3 install $KFP_PACKAGE --upgrade"
]
},
{
@@ -133,8 +145,13 @@
"source": [
"# Note that this notebook should be running in JupyterHub in the same cluster as the pipeline system.\n",
"# Otherwise it will fail to talk to the pipeline system.\n",
"import kfp\n",
"from kfp import compiler\n",
"import kfp.dsl as dsl\n",
"import kfp.notebook\n",
"import kfp.gcp as gcp\n",
"client = kfp.Client()\n",
"exp = client.create_experiment(name='demo')"
"exp = client.create_experiment(name=EXPERIMENT_NAME)"
Contributor:
I think we should reuse the experiment if it already exists.

Contributor (author):
Yes, but we can do that in a separate PR. The scope of this PR is to add the sample tests, which requires parameterizing the pipeline notebooks.
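The reuse-if-exists behavior could be a small helper along these lines — a sketch built on the list_experiments / create_experiment calls this PR already uses; matching by name is an assumption about the intended semantics:

```python
import kfp

def get_or_create_experiment(client, name):
    """Return the experiment named `name`, creating it only if it is absent."""
    response = client.list_experiments()
    for experiment in response.experiments or []:
        if experiment.name == name:
            return experiment
    return client.create_experiment(name=name)

# EXPERIMENT_NAME comes from the parameters cell above.
exp = get_or_create_experiment(kfp.Client(), EXPERIMENT_NAME)
```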

]
},
{
@@ -208,7 +225,7 @@
"def dataflow_tf_data_validation_op(inference_data: 'GcsUri', validation_data: 'GcsUri', column_names: 'GcsUri[text/json]', key_columns, project: 'GcpProject', mode, validation_output: 'GcsUri[Directory]', step_name='validation'):\n",
" return dsl.ContainerOp(\n",
" name = step_name,\n",
" image = 'gcr.io/ml-pipeline/ml-pipeline-dataflow-tfdv:0.1.3-rc.2', #TODO-release: update the release tag for the next release\n",
" image = DATAFLOW_TFDV_IMAGE,\n",
" arguments = [\n",
" '--csv-data-for-inference', inference_data,\n",
" '--csv-data-to-validate', validation_data,\n",
@@ -222,12 +239,12 @@
" 'output': '/output.txt',\n",
" 'schema': '/output_schema.json',\n",
" }\n",
" )\n",
" ).apply(gcp.use_gcp_secret('user-gcp-sa'))\n",
"\n",
"def dataflow_tf_transform_op(train_data: 'GcsUri', evaluation_data: 'GcsUri', schema: 'GcsUri[text/json]', project: 'GcpProject', preprocess_mode, preprocess_module: 'GcsUri[text/code/python]', transform_output: 'GcsUri[Directory]', step_name='preprocess'):\n",
" return dsl.ContainerOp(\n",
" name = step_name,\n",
" image = 'gcr.io/ml-pipeline/ml-pipeline-dataflow-tft:0.1.3-rc.2', #TODO-release: update the release tag for the next release\n",
" image = DATAFLOW_TFT_IMAGE,\n",
" arguments = [\n",
" '--train', train_data,\n",
" '--eval', evaluation_data,\n",
@@ -238,13 +255,13 @@
" '--output', transform_output,\n",
" ],\n",
" file_outputs = {'transformed': '/output.txt'}\n",
" )\n",
" ).apply(gcp.use_gcp_secret('user-gcp-sa'))\n",
"\n",
"\n",
"def tf_train_op(transformed_data_dir, schema: 'GcsUri[text/json]', learning_rate: float, hidden_layer_size: int, steps: int, target: str, preprocess_module: 'GcsUri[text/code/python]', training_output: 'GcsUri[Directory]', step_name='training'):\n",
" return dsl.ContainerOp(\n",
" name = step_name,\n",
" image = 'gcr.io/ml-pipeline/ml-pipeline-kubeflow-tf-trainer:0.1.3-rc.2', #TODO-release: update the release tag for the next release\n",
" image = KUBEFLOW_TF_TRAINER_IMAGE,\n",
" arguments = [\n",
" '--transformed-data-dir', transformed_data_dir,\n",
" '--schema', schema,\n",
@@ -256,12 +273,12 @@
" '--job-dir', training_output,\n",
" ],\n",
" file_outputs = {'train': '/output.txt'}\n",
" )\n",
" ).apply(gcp.use_gcp_secret('user-gcp-sa'))\n",
"\n",
"def dataflow_tf_model_analyze_op(model: 'TensorFlow model', evaluation_data: 'GcsUri', schema: 'GcsUri[text/json]', project: 'GcpProject', analyze_mode, analyze_slice_column, analysis_output: 'GcsUri', step_name='analysis'):\n",
" return dsl.ContainerOp(\n",
" name = step_name,\n",
" image = 'gcr.io/ml-pipeline/ml-pipeline-dataflow-tfma:0.1.3-rc.2', #TODO-release: update the release tag for the next release\n",
" image = DATAFLOW_TFMA_IMAGE,\n",
" arguments = [\n",
" '--model', model,\n",
" '--eval', evaluation_data,\n",
@@ -272,13 +289,13 @@
" '--output', analysis_output,\n",
" ],\n",
" file_outputs = {'analysis': '/output.txt'}\n",
" )\n",
" ).apply(gcp.use_gcp_secret('user-gcp-sa'))\n",
"\n",
"\n",
"def dataflow_tf_predict_op(evaluation_data: 'GcsUri', schema: 'GcsUri[text/json]', target: str, model: 'TensorFlow model', predict_mode, project: 'GcpProject', prediction_output: 'GcsUri', step_name='prediction'):\n",
" return dsl.ContainerOp(\n",
" name = step_name,\n",
" image = 'gcr.io/ml-pipeline/ml-pipeline-dataflow-tf-predict:0.1.3-rc.2', #TODO-release: update the release tag for the next release\n",
" image = DATAFLOW_TF_PREDICT_IMAGE,\n",
" arguments = [\n",
" '--data', evaluation_data,\n",
" '--schema', schema,\n",
@@ -289,17 +306,17 @@
" '--output', prediction_output,\n",
" ],\n",
" file_outputs = {'prediction': '/output.txt'}\n",
" )\n",
" ).apply(gcp.use_gcp_secret('user-gcp-sa'))\n",
"\n",
"def kubeflow_deploy_op(model: 'TensorFlow model', tf_server_name, step_name='deploy'):\n",
" return dsl.ContainerOp(\n",
" name = step_name,\n",
" image = 'gcr.io/ml-pipeline/ml-pipeline-kubeflow-deployer:0.1.3-rc.2', #TODO-release: update the release tag for the next release\n",
" image = KUBEFLOW_DEPLOYER_IMAGE,\n",
" arguments = [\n",
" '--model-path', model,\n",
" '--server-name', tf_server_name\n",
" ]\n",
" )\n",
" ).apply(gcp.use_gcp_secret('user-gcp-sa'))\n",
"\n",
"\n",
"# The pipeline definition\n",
Expand All @@ -312,15 +329,15 @@
" project,\n",
" column_names=dsl.PipelineParam(name='column-names', value='gs://ml-pipeline-playground/tfx/taxi-cab-classification/column-names.json'),\n",
" key_columns=dsl.PipelineParam(name='key-columns', value='trip_start_timestamp'),\n",
" train=dsl.PipelineParam(name='train', value='gs://ml-pipeline-playground/tfx/taxi-cab-classification/train.csv'),\n",
" evaluation=dsl.PipelineParam(name='evaluation', value='gs://ml-pipeline-playground/tfx/taxi-cab-classification/eval.csv'),\n",
" train=dsl.PipelineParam(name='train', value=TRAIN_DATA),\n",
" evaluation=dsl.PipelineParam(name='evaluation', value=EVAL_DATA),\n",
" validation_mode=dsl.PipelineParam(name='validation-mode', value='local'),\n",
" preprocess_mode=dsl.PipelineParam(name='preprocess-mode', value='local'),\n",
" preprocess_module: dsl.PipelineParam=dsl.PipelineParam(name='preprocess-module', value='gs://ml-pipeline-playground/tfx/taxi-cab-classification/preprocessing.py'),\n",
" target=dsl.PipelineParam(name='target', value='tips'),\n",
" learning_rate=dsl.PipelineParam(name='learning-rate', value=0.1),\n",
" hidden_layer_size=dsl.PipelineParam(name='hidden-layer-size', value='1500'),\n",
" steps=dsl.PipelineParam(name='steps', value=3000),\n",
" hidden_layer_size=dsl.PipelineParam(name='hidden-layer-size', value=HIDDEN_LAYER_SIZE),\n",
" steps=dsl.PipelineParam(name='steps', value=STEPS),\n",
" predict_mode=dsl.PipelineParam(name='predict-mode', value='local'),\n",
" analyze_mode=dsl.PipelineParam(name='analyze-mode', value='local'),\n",
" analyze_slice_column=dsl.PipelineParam(name='analyze-slice-column', value='trip_start_hour')):\n",
@@ -480,7 +497,7 @@
"source": [
"# Test the function and make sure it works.\n",
"path = 'gs://ml-pipeline-playground/sampledata/taxi/train'\n",
"deploy_model('taxidev.beta', path, PROJECT_NAME, '1.9')"
"deploy_model(DEV_DEPLOYER_MODEL, path, PROJECT_NAME, '1.9')"
]
},
{
@@ -699,19 +716,19 @@
" key_columns=dsl.PipelineParam(name='key-columns', value='trip_start_timestamp'),\n",
" train=dsl.PipelineParam(\n",
" name='train',\n",
" value='gs://ml-pipeline-playground/tfx/taxi-cab-classification/train.csv'),\n",
" value=TRAIN_DATA),\n",
" evaluation=dsl.PipelineParam(\n",
" name='evaluation',\n",
" value='gs://ml-pipeline-playground/tfx/taxi-cab-classification/eval.csv'),\n",
" value=EVAL_DATA),\n",
" validation_mode=dsl.PipelineParam(name='validation-mode', value='local'),\n",
" preprocess_mode=dsl.PipelineParam(name='preprocess-mode', value='local'),\n",
" preprocess_module: dsl.PipelineParam=dsl.PipelineParam(\n",
" name='preprocess-module',\n",
" value='gs://ml-pipeline-playground/tfx/taxi-cab-classification/preprocessing.py'),\n",
" target=dsl.PipelineParam(name='target', value='tips'),\n",
" learning_rate=dsl.PipelineParam(name='learning-rate', value=0.1),\n",
" hidden_layer_size=dsl.PipelineParam(name='hidden-layer-size', value='1500'),\n",
" steps=dsl.PipelineParam(name='steps', value=3000),\n",
" hidden_layer_size=dsl.PipelineParam(name='hidden-layer-size', value=HIDDEN_LAYER_SIZE),\n",
" steps=dsl.PipelineParam(name='steps', value=STEPS),\n",
" predict_mode=dsl.PipelineParam(name='predict-mode', value='local'),\n",
" analyze_mode=dsl.PipelineParam(name='analyze-mode', value='local'),\n",
" analyze_slice_column=dsl.PipelineParam(name='analyze-slice-column', value='trip_start_hour')):\n",
@@ -732,7 +749,7 @@
" prediction = dataflow_tf_predict_op(evaluation, schema, target, training.output, predict_mode, project, prediction_output)\n",
" \n",
" # The new deployer. Note that the DeployerOp interface is similar to the function \"deploy_model\".\n",
" deploy = DeployerOp(gcp_project=project, model_dot_version=model, runtime='1.9', model_path=training.output)"
" deploy = DeployerOp(gcp_project=project, model_dot_version=model, runtime='1.9', model_path=training.output).apply(gcp.use_gcp_secret('user-gcp-sa'))"
]
},
{
@@ -766,7 +783,7 @@
"run = client.run_pipeline(exp.id, 'my-tfx', 'my-tfx.tar.gz',\n",
" params={'output': OUTPUT_DIR,\n",
" 'project': PROJECT_NAME,\n",
" 'model': 'mytaxi.beta'})"
" 'model': PROD_DEPLOYER_MODEL})"
]
},
{
26 changes: 17 additions & 9 deletions samples/notebooks/Lightweight Python components - basics.ipynb
@@ -19,14 +19,28 @@
"* To build a component with multiple output values, use the typing.NamedTuple type hint syntax: ```NamedTuple('MyFunctionOutputs', [('output_name_1', type), ('output_name_2', float)])```"
]
},
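For reference, the NamedTuple pattern that bullet describes looks roughly like this in a lightweight component — a minimal sketch; wiring the function into a pipeline (e.g. via kfp.components.func_to_container_op) is assumed, not shown in this diff:

```python
from typing import NamedTuple

def sum_and_product(a: float, b: float) -> NamedTuple(
        'SumAndProductOutputs', [('sum', float), ('product', float)]):
    """Lightweight component returning two named output values."""
    return (a + b, a * b)

# Once converted to an op, each named field is separately addressable
# downstream, e.g. task.outputs['sum'] and task.outputs['product'].
```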
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": [
"parameters"
]
},
"outputs": [],
"source": [
"EXPERIMENT_NAME = 'lightweight python components'\n",
"KFP_PACKAGE = 'https://storage.googleapis.com/ml-pipeline/release/0.1.3-rc.2/kfp.tar.gz'"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#Install the SDK\n",
"!pip3 install https://storage.googleapis.com/ml-pipeline/release/0.1.3-rc.2/kfp.tar.gz --upgrade\n"
"# Install the SDK\n",
"!pip3 install $KFP_PACKAGE --upgrade\n"
]
},
{
@@ -236,13 +250,7 @@
"#Get or create an experiment and submit a pipeline run\n",
"import kfp\n",
"client = kfp.Client()\n",
"list_experiments_response = client.list_experiments()\n",
Contributor:
Why is this code being removed?

Contributor (author):
The test needs to know the experiment name in order to check the status for test results.
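A sketch of that rationale: because the test injects a known EXPERIMENT_NAME, it can find the experiment afterwards and inspect its runs. The client calls follow those used elsewhere in this PR; the attribute names are assumptions:

```python
import kfp

client = kfp.Client()
EXPERIMENT_NAME = 'lightweight python components'  # the value the test injects

# Locate the experiment the notebook run was expected to create.
experiments = client.list_experiments().experiments or []
experiment = next((e for e in experiments if e.name == EXPERIMENT_NAME), None)
assert experiment is not None, 'notebook run did not create the experiment'
# experiment.id can then drive run-status checks (see the list_runs change below).
```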

"experiments = list_experiments_response.experiments\n",
"if not experiments:\n",
" #The user does not have any experiments available. Creating a new one\n",
" experiment = client.create_experiment(pipeline_func.__name__ + ' experiment')\n",
"else:\n",
" experiment = experiments[-1] #Using the last experiment\n",
"experiment = client.create_experiment(EXPERIMENT_NAME)\n",
"\n",
"#Submit a pipeline run\n",
"run_name = pipeline_func.__name__ + ' run'\n",
8 changes: 6 additions & 2 deletions sdk/python/kfp/_client.py
Expand Up @@ -158,16 +158,20 @@ def run_pipeline(self, experiment_id, job_name, pipeline_package_path, params={}
IPython.display.display(IPython.display.HTML(html))
return response.run

def list_runs(self, page_token='', page_size=10, sort_by=''):
def list_runs(self, page_token='', page_size=10, sort_by='', resource_reference_key_type=None, resource_reference_key_id=None):
"""List runs.
Args:
page_token: token for starting of the page.
page_size: size of the page.
sort_by: one of 'field_name', 'field_name des'. For example, 'name des'.
resource_reference_key_type: type of the resource reference to filter runs by (for example, an experiment).
resource_reference_key_id: id of the resource reference to filter runs by.
Returns:
A response object including a list of runs and next page token.
"""
response = self._run_api.list_runs(page_token=page_token, page_size=page_size, sort_by=sort_by)
if resource_reference_key_type is not None and resource_reference_key_id is not None:
response = self._run_api.list_runs(page_token=page_token, page_size=page_size, sort_by=sort_by, resource_reference_key_type=resource_reference_key_type, resource_reference_key_id=resource_reference_key_id)
else:
response = self._run_api.list_runs(page_token=page_token, page_size=page_size, sort_by=sort_by)
return response

def get_run(self, run_id):
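A usage sketch for the new filtering parameters, polling the latest run of one experiment to completion. The 'EXPERIMENT' key-type value and the response attribute names are assumptions — the exact constants come from the generated API client:

```python
import time

# List the newest run belonging to one experiment.
runs = client.list_runs(
    page_size=1,
    sort_by='created_at des',
    resource_reference_key_type='EXPERIMENT',
    resource_reference_key_id=experiment.id).runs

# Poll that run until it reaches a terminal state.
status = None
while status not in ('Succeeded', 'Failed', 'Error'):
    time.sleep(30)
    status = client.get_run(runs[0].id).run.status
print('final status:', status)
```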
3 changes: 3 additions & 0 deletions test/sample-test/Dockerfile
Expand Up @@ -14,6 +14,9 @@ RUN pip3 install junit-xml
RUN pip3 install kubernetes
RUN pip3 install minio
RUN pip3 install setuptools==40.5.0
RUN pip3 install papermill==0.16.1
RUN pip3 install ipykernel==5.1.0
RUN pip3 install google-api-python-client==1.7.0

#Needs test/sample-test and the files needed to run sdk/python/build.sh
COPY . /python/src/github.com/kubeflow/pipelines
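papermill is the piece that executes these parameterized notebooks headlessly, overwriting the cell tagged "parameters" with injected values. A sketch of an invocation like the sample test's — the paths and parameter values are illustrative:

```python
import papermill as pm

# Execute the notebook non-interactively; papermill replaces the cell
# tagged "parameters" with the values passed here (papermill==0.16.1,
# as pinned above).
pm.execute_notebook(
    'samples/notebooks/Lightweight Python components - basics.ipynb',
    '/tmp/lightweight-output.ipynb',
    parameters={
        'EXPERIMENT_NAME': 'sample-test-lightweight',
        'KFP_PACKAGE': 'https://storage.googleapis.com/ml-pipeline/release/0.1.3-rc.2/kfp.tar.gz',
    })
```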