From e86d246bdda04e0bc5f142f109d4fe729608034c Mon Sep 17 00:00:00 2001
From: Jin Chi He
Date: Wed, 30 Jan 2019 01:44:53 +0800
Subject: [PATCH] Enhance pipeline TFX taxi sample to support on-prem cluster

---
 samples/tfx/README.md                       | 29 +++++++-
 .../tfx/taxi-cab-classification-pipeline.py | 74 +++++++++++++++----
 2 files changed, 85 insertions(+), 18 deletions(-)

diff --git a/samples/tfx/README.md b/samples/tfx/README.md
index 5ec149476cc9..b7f65a492cd6 100644
--- a/samples/tfx/README.md
+++ b/samples/tfx/README.md
@@ -27,9 +27,18 @@ When run with the `cloud` mode (instead of the `local` mode), those steps use [G
 
 Therefore, you must enable the DataFlow API for the given GCP project if you want to use `cloud` as the mode for either preprocessing or analysis. See the [guide to enabling the DataFlow API](https://cloud.google.com/endpoints/docs/openapi/enable-api).
 
+For an on-premise cluster, you need to create a [Persistent Volume (PV)](https://kubernetes.io/docs/concepts/storage/persistent-volumes/) if [Dynamic Volume Provisioning](https://kubernetes.io/docs/concepts/storage/dynamic-provisioning/) is not enabled. The PV needs a capacity of at least 1Gi.
+
 ## Compiling the pipeline template
 
-Follow the guide to [building a pipeline](https://www.kubeflow.org/docs/guides/pipelines/build-pipeline/) to install the Kubeflow Pipelines SDK, then run the following command to compile the sample Python into a workflow specification. The specification takes the form of a YAML file compressed into a `.tar.gz` file.
+Follow the guide to [building a pipeline](https://www.kubeflow.org/docs/guides/pipelines/build-pipeline/) to install the Kubeflow Pipelines SDK.
+
+For an on-premise cluster, change `platform` to `onprem` in `taxi-cab-classification-pipeline.py`:
+
+```bash
+sed -i.sedbak "s/platform = 'GCP'/platform = 'onprem'/" taxi-cab-classification-pipeline.py
+```
+Then run the following command to compile the sample Python into a workflow specification. The specification takes the form of a YAML file compressed into a `.tar.gz` file.
 
 ```bash
 dsl-compile --py taxi-cab-classification-pipeline.py --output taxi-cab-classification-pipeline.tar.gz
@@ -39,10 +48,22 @@ dsl-compile --py taxi-cab-classification-pipeline.py --output taxi-cab-classific
 
 Open the Kubeflow pipelines UI. Create a new pipeline, and then upload the compiled specification (`.tar.gz` file) as a new pipeline template.
 
-The pipeline requires two arguments:
+- GCP
+  The pipeline requires two arguments:
+
+  1. The name of a GCP project.
+  2. An output directory in a Google Cloud Storage bucket, of the form `gs://<bucket>/<path>`.
+- On-Premise
+  For an on-premise cluster, the pipeline creates a Persistent Volume Claim (PVC) and automatically downloads the [source data](https://github.com/kubeflow/pipelines/tree/master/samples/tfx/taxi-cab-classification) to the PVC.
+  1. `output` is the PVC mount point for the containers; it can be set to `/mnt`.
+  2. `project` can be set to `taxi-cab-classification-pipeline-onprem`.
+  3. If the PVC is mounted at `/mnt`, set the following parameters as below:
+     - `column-names`: `/mnt/pipelines/samples/tfx/taxi-cab-classification/column-names.json`
+     - `train`: `/mnt/pipelines/samples/tfx/taxi-cab-classification/train.csv`
+     - `evaluation`: `/mnt/pipelines/samples/tfx/taxi-cab-classification/eval.csv`
+     - `preprocess-module`: `/mnt/pipelines/samples/tfx/taxi-cab-classification/preprocessing.py`
+
 
-1. The name of a GCP project.
-2. An output directory in a Google Cloud Storage bucket, of the form `gs://<bucket>/<path>`.
 
 ## Components source
 
diff --git a/samples/tfx/taxi-cab-classification-pipeline.py b/samples/tfx/taxi-cab-classification-pipeline.py
index 700d61cc96fb..03f578e54fad 100755
--- a/samples/tfx/taxi-cab-classification-pipeline.py
+++ b/samples/tfx/taxi-cab-classification-pipeline.py
@@ -18,7 +18,9 @@
 from kfp import components
 from kfp import dsl
 from kfp import gcp
+from kfp import onprem
 
+platform = 'GCP'
 
 dataflow_tf_data_validation_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/74d8e592174ae90175f66c3c00ba76a835cfba6d/components/dataflow/tfdv/component.yaml')
 dataflow_tf_transform_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/74d8e592174ae90175f66c3c00ba76a835cfba6d/components/dataflow/tft/component.yaml')
@@ -31,7 +33,6 @@
 kubeflow_deploy_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/74d8e592174ae90175f66c3c00ba76a835cfba6d/components/kubeflow/deployer/component.yaml')
 
 
-
 @dsl.pipeline(
     name='TFX Taxi Cab Classification Pipeline Example',
     description='Example pipeline that does classification with model analysis based on a public BigQuery dataset.'
@@ -56,6 +57,21 @@ def taxi_cab_classification(
 
     tf_server_name = 'taxi-cab-classification-model-{{workflow.uid}}'
 
+    if platform != 'GCP':
+        vop = dsl.VolumeOp(
+            name="create_pvc",
+            resource_name="pipeline-pvc",
+            modes=dsl.VOLUME_MODE_RWM,
+            size="1Gi"
+        )
+
+        checkout = dsl.ContainerOp(
+            name="checkout",
+            image="alpine/git:latest",
+            command=["git", "clone", "https://github.com/kubeflow/pipelines.git", str(output) + "/pipelines"],
+        ).apply(onprem.mount_pvc(vop.outputs["name"], 'local-storage', output))
+        checkout.after(vop)
+
     validation = dataflow_tf_data_validation_op(
         inference_data=train,
         validation_data=evaluation,
@@ -63,8 +79,10 @@ def taxi_cab_classification(
         key_columns=key_columns,
         gcp_project=project,
         run_mode=mode,
-        validation_output=output_template
-    ).apply(gcp.use_gcp_secret('user-gcp-sa'))
+        validation_output=output_template,
+    )
+    if platform != 'GCP':
+        validation.after(checkout)
 
     preprocess = dataflow_tf_transform_op(
         training_data_file_pattern=train,
@@ -74,7 +92,7 @@ def taxi_cab_classification(
         run_mode=mode,
         preprocessing_module=preprocess_module,
         transformed_data_dir=output_template
-    ).apply(gcp.use_gcp_secret('user-gcp-sa'))
+    )
 
     training = tf_train_op(
         transformed_data_dir=preprocess.output,
@@ -85,7 +103,7 @@ def taxi_cab_classification(
         target='tips',
         preprocessing_module=preprocess_module,
         training_output_dir=output_template
-    ).apply(gcp.use_gcp_secret('user-gcp-sa'))
+    )
 
     analysis = dataflow_tf_model_analyze_op(
         model=training.output,
@@ -95,7 +113,7 @@ def taxi_cab_classification(
         run_mode=mode,
         slice_columns=analyze_slice_column,
         analysis_results_dir=output_template
-    ).apply(gcp.use_gcp_secret('user-gcp-sa'))
+    )
 
     prediction = dataflow_tf_predict_op(
         data_file_pattern=evaluation,
@@ -105,24 +123,52 @@ def taxi_cab_classification(
         run_mode=mode,
         gcp_project=project,
         predictions_dir=output_template
-    ).apply(gcp.use_gcp_secret('user-gcp-sa'))
+    )
 
     cm = confusion_matrix_op(
         predictions=prediction.output,
         target_lambda=target_lambda,
         output_dir=output_template
-    ).apply(gcp.use_gcp_secret('user-gcp-sa'))
+    )
 
     roc = roc_op(
         predictions_dir=prediction.output,
         target_lambda=target_class_lambda,
         output_dir=output_template
-    ).apply(gcp.use_gcp_secret('user-gcp-sa'))
-
-    deploy = kubeflow_deploy_op(
-        model_dir=str(training.output) + '/export/export',
-        server_name=tf_server_name
-    ).apply(gcp.use_gcp_secret('user-gcp-sa'))
+    )
+
+    if platform == 'GCP':
+        deploy = kubeflow_deploy_op(
+            model_dir=str(training.output) + '/export/export',
+            server_name=tf_server_name
+        )
+    else:
+        deploy = kubeflow_deploy_op(
+            cluster_name=project,
+            model_dir=str(training.output) + '/export/export',
+            pvc_name=vop.outputs["name"],
+            server_name=tf_server_name
+        )
+
+    # TBD (jinchihe): Need to enhance once the PR is merged: kubeflow/pipelines/pull/1209.
+    if platform == 'GCP':
+        validation.apply(gcp.use_gcp_secret('user-gcp-sa'))
+        preprocess.apply(gcp.use_gcp_secret('user-gcp-sa'))
+        training.apply(gcp.use_gcp_secret('user-gcp-sa'))
+        analysis.apply(gcp.use_gcp_secret('user-gcp-sa'))
+        prediction.apply(gcp.use_gcp_secret('user-gcp-sa'))
+        cm.apply(gcp.use_gcp_secret('user-gcp-sa'))
+        roc.apply(gcp.use_gcp_secret('user-gcp-sa'))
+        deploy.apply(gcp.use_gcp_secret('user-gcp-sa'))
+    else:
+        validation.apply(onprem.mount_pvc(vop.outputs["name"], 'local-storage', output))
+        preprocess.apply(onprem.mount_pvc(vop.outputs["name"], 'local-storage', output))
+        training.apply(onprem.mount_pvc(vop.outputs["name"], 'local-storage', output))
+        analysis.apply(onprem.mount_pvc(vop.outputs["name"], 'local-storage', output))
+        prediction.apply(onprem.mount_pvc(vop.outputs["name"], 'local-storage', output))
+        cm.apply(onprem.mount_pvc(vop.outputs["name"], 'local-storage', output))
+        roc.apply(onprem.mount_pvc(vop.outputs["name"], 'local-storage', output))
+        deploy.apply(onprem.mount_pvc(vop.outputs["name"], 'local-storage', output))
 
 
 if __name__ == '__main__':
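
For reference, a run of the compiled pipeline can also be started from the Kubeflow Pipelines SDK instead of the UI. The snippet below is a minimal sketch and is not part of the patch above: it assumes the `taxi-cab-classification-pipeline.tar.gz` produced by the README steps, the on-premise parameter values listed in the README, and a reachable Pipelines API endpoint; the `host` value and the experiment/run names are placeholders.

```python
# Minimal sketch: submit the compiled pipeline with the KFP SDK.
# The host URL and experiment/run names are placeholders, not defined by this sample.
import kfp

client = kfp.Client(host='http://localhost:8080')  # e.g. a port-forwarded ml-pipeline service
experiment = client.create_experiment('tfx-taxi-onprem')

run = client.run_pipeline(
    experiment.id,
    job_name='taxi-cab-classification-onprem',
    pipeline_package_path='taxi-cab-classification-pipeline.tar.gz',
    params={
        # Parameter names and values as listed in the README's on-premise section.
        'output': '/mnt',
        'project': 'taxi-cab-classification-pipeline-onprem',
        'column-names': '/mnt/pipelines/samples/tfx/taxi-cab-classification/column-names.json',
        'train': '/mnt/pipelines/samples/tfx/taxi-cab-classification/train.csv',
        'evaluation': '/mnt/pipelines/samples/tfx/taxi-cab-classification/eval.csv',
        'preprocess-module': '/mnt/pipelines/samples/tfx/taxi-cab-classification/preprocessing.py',
    },
)
```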