Enhance pipeline TFX taxi sample to support on-prem cluster
jinchihe committed May 21, 2019
1 parent ea07b33 commit e86d246
Showing 2 changed files with 86 additions and 18 deletions.
30 changes: 26 additions & 4 deletions samples/tfx/README.md
@@ -27,9 +27,18 @@ When run with the `cloud` mode (instead of the `local` mode), those steps use [G

Therefore, you must enable the Dataflow API for the given GCP project if you want to use `cloud` as the mode for either preprocessing or analysis. See the [guide to enabling the Dataflow API](https://cloud.google.com/endpoints/docs/openapi/enable-api).

For an on-premises cluster, you need to create a [Persistent Volume (PV)](https://kubernetes.io/docs/concepts/storage/persistent-volumes/) if [Dynamic Volume Provisioning](https://kubernetes.io/docs/concepts/storage/dynamic-provisioning/) is not enabled. The PV needs a capacity of at least 1Gi.
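
If you need to create the PV manually, below is a minimal sketch using the Kubernetes Python client; the PV name and `hostPath` directory are illustrative assumptions, not part of this sample:

```python
# Minimal sketch: create a 1Gi ReadWriteMany PV backed by a hostPath directory.
# The PV name and path are hypothetical; pick values that fit your cluster.
from kubernetes import client, config

config.load_kube_config()  # use load_incluster_config() when running in-cluster

pv = client.V1PersistentVolume(
    metadata=client.V1ObjectMeta(name="tfx-taxi-pv"),
    spec=client.V1PersistentVolumeSpec(
        capacity={"storage": "1Gi"},     # the sample needs at least 1Gi
        access_modes=["ReadWriteMany"],  # matches the pipeline's RWM volume mode
        host_path=client.V1HostPathVolumeSource(path="/data/tfx-taxi"),
    ),
)
client.CoreV1Api().create_persistent_volume(pv)
```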

## Compiling the pipeline template

Follow the guide to [building a pipeline](https://www.kubeflow.org/docs/guides/pipelines/build-pipeline/) to install the Kubeflow Pipelines SDK.

For an on-premises cluster, update `platform` to `onprem` in `taxi-cab-classification-pipeline.py`:

```bash
sed -i.sedbak "s/platform = 'GCP'/platform = 'onprem'/" taxi-cab-classification-pipeline.py
```
Then run the following command to compile the sample Python into a workflow specification. The specification takes the form of a YAML file compressed into a `.tar.gz` file.

```bash
dsl-compile --py taxi-cab-classification-pipeline.py --output taxi-cab-classification-pipeline.tar.gz
@@ -39,10 +48,23 @@ dsl-compile --py taxi-cab-classification-pipeline.py --output taxi-cab-classific

Open the Kubeflow pipelines UI. Create a new pipeline, and then upload the compiled specification (`.tar.gz` file) as a new pipeline template.
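
As an alternative to the UI, the compiled package can also be uploaded with the Kubeflow Pipelines SDK. A sketch, assuming your deployment's Pipelines API endpoint (the host URL and display name below are placeholders):

```python
# Sketch: upload the compiled package programmatically instead of via the UI.
# The host URL and pipeline name are assumptions specific to your deployment.
import kfp

client = kfp.Client(host='<your-pipelines-endpoint>')
client.upload_pipeline(
    pipeline_package_path='taxi-cab-classification-pipeline.tar.gz',
    pipeline_name='tfx-taxi-cab-classification',
)
```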

The pipeline requires the following arguments, depending on the platform:

- GCP

  1. The name of a GCP project.
  2. An output directory in a Google Cloud Storage bucket, of the form `gs://<BUCKET>/<PATH>`.

- On-Premise

  For an on-premises cluster, the pipeline creates a Persistent Volume Claim (PVC) and automatically downloads the [source data](https://github.com/kubeflow/pipelines/tree/master/samples/tfx/taxi-cab-classification) to the PVC.

  1. `output` is the PVC mount point for the containers; it can be set to `/mnt`.
  2. `project` can be set to `taxi-cab-classification-pipeline-onprem`.
  3. If the PVC is mounted at `/mnt`, set the following parameters accordingly (see the SDK sketch after this list):
     - `column-names`: `/mnt/pipelines/samples/tfx/taxi-cab-classification/column-names.json`
     - `train`: `/mnt/pipelines/samples/tfx/taxi-cab-classification/train.csv`
     - `evaluation`: `/mnt/pipelines/samples/tfx/taxi-cab-classification/eval.csv`
     - `preprocess-module`: `/mnt/pipelines/samples/tfx/taxi-cab-classification/preprocessing.py`
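
For reference, a run with the on-premises values above can also be started from the SDK rather than the UI. A sketch, assuming a reachable API host; the host, experiment name, and job name are placeholders:

```python
# Sketch: start a run of the compiled package with the on-prem parameter
# values listed above. Host, experiment name, and job name are assumptions.
import kfp

base = '/mnt/pipelines/samples/tfx/taxi-cab-classification'
client = kfp.Client(host='<your-pipelines-endpoint>')
experiment = client.create_experiment('tfx-taxi-onprem')
run = client.run_pipeline(
    experiment.id,
    job_name='taxi-cab-classification-onprem',
    pipeline_package_path='taxi-cab-classification-pipeline.tar.gz',
    params={
        'output': '/mnt',
        'project': 'taxi-cab-classification-pipeline-onprem',
        'column-names': base + '/column-names.json',
        'train': base + '/train.csv',
        'evaluation': base + '/eval.csv',
        'preprocess-module': base + '/preprocessing.py',
    },
)
```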

## Components source

74 changes: 60 additions & 14 deletions samples/tfx/taxi-cab-classification-pipeline.py
@@ -18,7 +18,9 @@
from kfp import components
from kfp import dsl
from kfp import gcp
from kfp import onprem

platform = 'GCP'

dataflow_tf_data_validation_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/74d8e592174ae90175f66c3c00ba76a835cfba6d/components/dataflow/tfdv/component.yaml')
dataflow_tf_transform_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/74d8e592174ae90175f66c3c00ba76a835cfba6d/components/dataflow/tft/component.yaml')
@@ -31,7 +33,6 @@

kubeflow_deploy_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/74d8e592174ae90175f66c3c00ba76a835cfba6d/components/kubeflow/deployer/component.yaml')


@dsl.pipeline(
name='TFX Taxi Cab Classification Pipeline Example',
description='Example pipeline that does classification with model analysis based on a public BigQuery dataset.'
@@ -56,15 +57,32 @@ def taxi_cab_classification(

tf_server_name = 'taxi-cab-classification-model-{{workflow.uid}}'

if platform != 'GCP':
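    # On non-GCP clusters, provision a 1Gi ReadWriteMany PVC up front, then
    # clone kubeflow/pipelines into it so the sample data and modules are
    # available to every step under <output>/pipelines/.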
vop = dsl.VolumeOp(
name="create_pvc",
resource_name="pipeline-pvc",
modes=dsl.VOLUME_MODE_RWM,
size="1Gi"
)

checkout = dsl.ContainerOp(
name="checkout",
image="alpine/git:latest",
    command=["git", "clone", "https://github.com/kubeflow/pipelines.git", str(output) + "/pipelines"],
).apply(onprem.mount_pvc(vop.outputs["name"], 'local-storage', output))
checkout.after(vop)

validation = dataflow_tf_data_validation_op(
inference_data=train,
validation_data=evaluation,
column_names=column_names,
key_columns=key_columns,
gcp_project=project,
run_mode=mode,
validation_output=output_template,
)
if platform != 'GCP':
validation.after(checkout)

preprocess = dataflow_tf_transform_op(
training_data_file_pattern=train,
@@ -74,7 +92,7 @@ def taxi_cab_classification(
run_mode=mode,
preprocessing_module=preprocess_module,
transformed_data_dir=output_template
)

training = tf_train_op(
transformed_data_dir=preprocess.output,
@@ -85,7 +103,7 @@ def taxi_cab_classification(
target='tips',
preprocessing_module=preprocess_module,
training_output_dir=output_template
)

analysis = dataflow_tf_model_analyze_op(
model=training.output,
@@ -95,7 +113,7 @@
run_mode=mode,
slice_columns=analyze_slice_column,
analysis_results_dir=output_template
)

prediction = dataflow_tf_predict_op(
data_file_pattern=evaluation,
@@ -105,24 +123,52 @@
run_mode=mode,
gcp_project=project,
predictions_dir=output_template
)

cm = confusion_matrix_op(
predictions=prediction.output,
target_lambda=target_lambda,
output_dir=output_template
)

roc = roc_op(
predictions_dir=prediction.output,
target_lambda=target_class_lambda,
output_dir=output_template
)

if platform == 'GCP':
deploy = kubeflow_deploy_op(
model_dir=str(training.output) + '/export/export',
server_name=tf_server_name
)
else:
deploy = kubeflow_deploy_op(
cluster_name=project,
model_dir=str(training.output) + '/export/export',
pvc_name=vop.outputs["name"],
server_name=tf_server_name
)

# TBD (jinchihe): Need to enhance once the PR is merged: kubeflow/pipelines/pull/1209.
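# On GCP, each step authenticates through the 'user-gcp-sa' secret; on other
# platforms, each step instead mounts the shared PVC (volume 'local-storage')
# at the `output` path so all inputs and outputs live on the persistent volume.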
if platform == 'GCP':
validation.apply(gcp.use_gcp_secret('user-gcp-sa'))
preprocess.apply(gcp.use_gcp_secret('user-gcp-sa'))
training.apply(gcp.use_gcp_secret('user-gcp-sa'))
analysis.apply(gcp.use_gcp_secret('user-gcp-sa'))
prediction.apply(gcp.use_gcp_secret('user-gcp-sa'))
cm.apply(gcp.use_gcp_secret('user-gcp-sa'))
roc.apply(gcp.use_gcp_secret('user-gcp-sa'))
deploy.apply(gcp.use_gcp_secret('user-gcp-sa'))
else:
validation.apply(onprem.mount_pvc(vop.outputs["name"], 'local-storage', output))
preprocess.apply(onprem.mount_pvc(vop.outputs["name"], 'local-storage', output))
training.apply(onprem.mount_pvc(vop.outputs["name"], 'local-storage', output))
analysis.apply(onprem.mount_pvc(vop.outputs["name"], 'local-storage', output))
prediction.apply(onprem.mount_pvc(vop.outputs["name"], 'local-storage', output))
cm.apply(onprem.mount_pvc(vop.outputs["name"], 'local-storage', output))
roc.apply(onprem.mount_pvc(vop.outputs["name"], 'local-storage', output))
deploy.apply(onprem.mount_pvc(vop.outputs["name"], 'local-storage', output))


if __name__ == '__main__':
