Enhance pipeline TFX taxi sample to support on-prem cluster
jinchihe committed Feb 15, 2019
1 parent f07d578 commit 7a47e60
Showing 5 changed files with 251 additions and 23 deletions.
13 changes: 9 additions & 4 deletions backend/src/apiserver/config/sample_config.json
@@ -5,9 +5,14 @@
"file":"/samples/xgboost-spark/xgboost-training-cm.py.tar.gz"
},
{
"name":"[Sample] ML - TFX - Taxi Tip Prediction Model Trainer",
"description":"Example pipeline that does classification with model analysis based on a public tax cab BigQuery dataset. For source code, refer to https://github.com/kubeflow/pipelines/tree/master/samples/tfx",
"file":"/samples/tfx/taxi-cab-classification-pipeline.py.tar.gz"
"name":"[Sample] ML - TFX - Taxi Tip Prediction Model Trainer on GCP",
"description":"Example pipeline that does classification with model analysis based on a public tax cab BigQuery dataset on GCP. For source code, refer to https://github.com/kubeflow/pipelines/tree/master/samples/tfx",
"file":"/samples/tfx/taxi-cab-classification-pipeline-gcp.py.tar.gz"
},
{
"name":"[Sample] ML - TFX - Taxi Tip Prediction Model Trainer for on-prem cluster",
"description":"Example pipeline that does classification with model analysis based on a public tax cab BigQuery dataset for on-prem cluster. For source code, refer to https://github.com/kubeflow/pipelines/tree/master/samples/tfx",
"file":"/samples/tfx/taxi-cab-classification-pipeline-on-prem.py.tar.gz"
},
{
"name":"[Sample] Basic - Sequential",
@@ -34,4 +39,4 @@
"description":"A pipeline shows how to use dsl.Condition. For source code, refer to https://github.com/kubeflow/pipelines/blob/master/samples/basic/condition.py",
"file":"/samples/basic/condition.py.tar.gz"
}
]
35 changes: 28 additions & 7 deletions samples/tfx/README.md
@@ -1,4 +1,4 @@
The `taxi-cab-classification-pipeline.py` sample runs a pipeline with TensorFlow's transform and model-analysis components.
The samples run a pipeline with TensorFlow's transform and model-analysis components: `taxi-cab-classification-pipeline-gcp.py` targets GCP, and `taxi-cab-classification-pipeline-on-prem.py` targets an on-prem cluster.

## The dataset

@@ -14,36 +14,57 @@ accuracy, timeliness, or completeness of any of the data provided at this site.
The data provided at this site is subject to change at any time. It is understood
that the data provided at this site is being used at one’s own risk.

[Read more](https://cloud.google.com/bigquery/public-data/chicago-taxi) about the
dataset in [Google BigQuery](https://cloud.google.com/bigquery/). Explore the
full dataset in the
[BigQuery UI](https://bigquery.cloud.google.com/dataset/bigquery-public-data:chicago_taxi_trips).
[Read more](https://cloud.google.com/bigquery/public-data/chicago-taxi) about the dataset in [Google BigQuery](https://cloud.google.com/bigquery/). Explore the full dataset in the [BigQuery UI](https://bigquery.cloud.google.com/dataset/bigquery-public-data:chicago_taxi_trips).


## Requirements

### GCP

Preprocessing and model analysis use [Apache Beam](https://beam.apache.org/).

When run in `cloud` mode (instead of `local` mode), these steps use [Google Cloud Dataflow](https://cloud.google.com/dataflow/) to run the Beam pipelines.

Therefore, you must enable the Dataflow API for the given GCP project if you want to use `cloud` as the mode for either preprocessing or analysis. See the [guide to enabling the Dataflow API](https://cloud.google.com/endpoints/docs/openapi/enable-api).

### On-prem cluster

When running on an on-prem cluster, follow the [Kubernetes documentation](https://kubernetes.io/docs/concepts/storage/persistent-volumes/) to create a persistent volume (PV) and a persistent volume claim (PVC) to store the intermediate data and results. Note that the `accessModes` should be `ReadWriteMany` so that the volume can be mounted read-write by many nodes. For example, the `taxi-cab-classification-pipeline-on-prem.py` sample expects a persistent volume claim named `pipeline-pvc`.
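
For reference, here is a minimal sketch of creating such a claim with the Kubernetes Python client; the `kubeflow` namespace and `10Gi` size are placeholder assumptions, so adjust them to your cluster:

```python
from kubernetes import client, config

config.load_kube_config()  # assumes kubectl access to the target cluster

# A claim matching what the sample expects: the name must be 'pipeline-pvc'
# and the access mode must be ReadWriteMany.
pvc = client.V1PersistentVolumeClaim(
    metadata=client.V1ObjectMeta(name='pipeline-pvc'),
    spec=client.V1PersistentVolumeClaimSpec(
        access_modes=['ReadWriteMany'],
        resources=client.V1ResourceRequirements(requests={'storage': '10Gi'}),
    ),
)
client.CoreV1Api().create_namespaced_persistent_volume_claim(
    namespace='kubeflow', body=pvc)  # namespace is an assumption
```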


## Compiling the pipeline template

Follow the guide to [building a pipeline](https://www.kubeflow.org/docs/guides/pipelines/build-pipeline/) to install the Kubeflow Pipelines SDK, then run the following command to compile the sample Python into a workflow specification. The specification takes the form of a YAML file compressed into a `.tar.gz` file.

### GCP
```bash
dsl-compile --py taxi-cab-classification-pipeline-gcp.py --output taxi-cab-classification-pipeline-gcp.tar.gz
```
### On-prem cluster
```bash
dsl-compile --py taxi-cab-classification-pipeline.py --output taxi-cab-classification-pipeline.tar.gz
dsl-compile --py taxi-cab-classification-pipeline-on-prem.py --output taxi-cab-classification-pipeline-on-prem.tar.gz
```

## Deploying the pipeline

Open the Kubeflow pipelines UI. Create a new pipeline, and then upload the compiled specification (`.tar.gz` file) as a new pipeline template.

### GCP

The pipeline requires two arguments (one way to supply them from the SDK is sketched after the list):

1. The name of a GCP project.
2. An output directory in a Google Cloud Storage bucket, of the form `gs://<BUCKET>/<PATH>`.
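
A run can also be created from the Kubeflow Pipelines SDK instead of the UI. A minimal sketch, assuming the package compiled in the previous step; the experiment name, run name, project, and bucket path are placeholders:

```python
import kfp

client = kfp.Client()  # assumes a reachable Kubeflow Pipelines API endpoint
experiment = client.create_experiment('tfx-taxi-gcp')  # placeholder name
client.run_pipeline(
    experiment.id, 'taxi-cab-classification-run',
    'taxi-cab-classification-pipeline-gcp.tar.gz',
    params={'project': 'my-gcp-project',
            'output': 'gs://my-bucket/tfx-taxi'})
```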

### On-prem cluster

- Before deploying the pipeline, download the training and evaluation data [taxi-cab-classification](https://github.com/kubeflow/pipelines/tree/master/samples/tfx/taxi-cab-classification) from GitHub, and copy the directory to the persistent volume.

- Follow the [guide](https://www.kubeflow.org/docs/pipelines/pipelines-ui/) to create an experiment and a run within the experiment.

**Limitation**: The value of the `pvc_name` parameter must match the claim name hardcoded in the pipeline definition in the `taxi-cab-classification-pipeline-on-prem.py` file (`pipeline-pvc` by default). See the [dsl PipelineParam does not work under Image or Command](https://github.com/kubeflow/pipelines/issues/521) issue for more information about this limitation.
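
Concretely, the sample attaches the volume with a literal claim name rather than the `pvc_name` pipeline parameter; the pattern (excerpted from the sample) looks like this:

```python
from kubernetes import client as k8s_client

# The claim name must be the literal string 'pipeline-pvc'; a dsl.PipelineParam
# is not substituted here (see kubeflow/pipelines issue #521).
volume = k8s_client.V1Volume(
    name='pipeline-nfs',
    persistent_volume_claim=k8s_client.V1PersistentVolumeClaimVolumeSource(
        claim_name='pipeline-pvc'))
```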


## Components source

Preprocessing:
@@ -60,4 +81,4 @@ Analysis:

Prediction:
[source code](https://github.com/kubeflow/pipelines/tree/master/components/dataflow/predict/src)
[container](https://github.com/kubeflow/pipelines/tree/master/components/dataflow/predict)
4 changes: 2 additions & 2 deletions samples/tfx/{taxi-cab-classification-pipeline.py → taxi-cab-classification-pipeline-gcp.py}
@@ -139,8 +139,8 @@ def kubeflow_deploy_op(model: 'TensorFlow model', tf_server_name, step_name='deploy'


@dsl.pipeline(
name='TFX Taxi Cab Classification Pipeline Example',
description='Example pipeline that does classification with model analysis based on a public BigQuery dataset.'
name='TFX Taxi Cab Classification Pipeline Example for GCP',
description='Example pipeline that does classification with model analysis based on a public BigQuery dataset for GCP.'
)
def taxi_cab_classification(
output,
200 changes: 200 additions & 0 deletions samples/tfx/taxi-cab-classification-pipeline-on-prem.py
@@ -0,0 +1,200 @@
#!/usr/bin/env python3
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import kfp.dsl as dsl
from kubernetes import client as k8s_client


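# Each helper below wraps a prebuilt Kubeflow Pipelines component image in a
# dsl.ContainerOp. Step outputs are written under the mounted volume, keyed by
# the Argo workflow name ({{workflow.name}}), so separate runs do not collide.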
def dataflow_tf_data_validation_op(inference_data, validation_data,
column_names, key_columns, project, mode,
validation_output, step_name='validation'):
return dsl.ContainerOp(
name=step_name,
image='gcr.io/ml-pipeline/ml-pipeline-dataflow-tfdv:5df2cdc1ed145320204e8bc73b59cdbd7b3da28f',
arguments=[
'--csv-data-for-inference', inference_data,
'--csv-data-to-validate', validation_data,
'--column-names', column_names,
'--key-columns', key_columns,
'--project', project,
'--mode', mode,
'--output', '%s/{{workflow.name}}/validation' % validation_output,
],
file_outputs={
'schema': '/schema.txt',
'validation': '/output_validation_result.txt',
}
)


def dataflow_tf_transform_op(train_data, evaluation_data, schema,
project, preprocess_mode, preprocess_module,
transform_output, step_name='preprocess'):
return dsl.ContainerOp(
name=step_name,
image='gcr.io/ml-pipeline/ml-pipeline-dataflow-tft:5df2cdc1ed145320204e8bc73b59cdbd7b3da28f',
arguments=[
'--train', train_data,
'--eval', evaluation_data,
'--schema', schema,
'--project', project,
'--mode', preprocess_mode,
'--preprocessing-module', preprocess_module,
'--output', '%s/{{workflow.name}}/transformed' % transform_output,
],
file_outputs={'transformed': '/output.txt'}
)


def tf_train_op(transformed_data_dir, schema, learning_rate: float, hidden_layer_size: int,
steps: int, target: str, preprocess_module,
training_output, step_name='training'):
return dsl.ContainerOp(
name=step_name,
image='gcr.io/ml-pipeline/ml-pipeline-kubeflow-tf-trainer:5df2cdc1ed145320204e8bc73b59cdbd7b3da28f',
arguments=[
'--transformed-data-dir', transformed_data_dir,
'--schema', schema,
'--learning-rate', learning_rate,
'--hidden-layer-size', hidden_layer_size,
'--steps', steps,
'--target', target,
'--preprocessing-module', preprocess_module,
'--job-dir', '%s/{{workflow.name}}/train' % training_output,
],
file_outputs={'train': '/output.txt'}
)


def dataflow_tf_model_analyze_op(model: 'TensorFlow model', evaluation_data, schema,
project, analyze_mode, analyze_slice_column, analysis_output,
step_name='analysis'):
return dsl.ContainerOp(
name=step_name,
image='gcr.io/ml-pipeline/ml-pipeline-dataflow-tfma:5df2cdc1ed145320204e8bc73b59cdbd7b3da28f',
arguments=[
'--model', model,
'--eval', evaluation_data,
'--schema', schema,
'--project', project,
'--mode', analyze_mode,
'--slice-columns', analyze_slice_column,
'--output', '%s/{{workflow.name}}/analysis' % analysis_output,
],
file_outputs={'analysis': '/output.txt'}
)


def dataflow_tf_predict_op(evaluation_data, schema, target: str,
model: 'TensorFlow model', predict_mode, project, prediction_output,
step_name='prediction'):
return dsl.ContainerOp(
name=step_name,
image='gcr.io/ml-pipeline/ml-pipeline-dataflow-tf-predict:5df2cdc1ed145320204e8bc73b59cdbd7b3da28f',
arguments=[
'--data', evaluation_data,
'--schema', schema,
'--target', target,
'--model', model,
'--mode', predict_mode,
'--project', project,
'--output', '%s/{{workflow.name}}/predict' % prediction_output,
],
file_outputs={'prediction': '/output.txt'}
)


def confusion_matrix_op(predictions, output, step_name='confusion_matrix'):
return dsl.ContainerOp(
name=step_name,
image='gcr.io/ml-pipeline/ml-pipeline-local-confusion-matrix:5df2cdc1ed145320204e8bc73b59cdbd7b3da28f',
arguments=[
'--output', '%s/{{workflow.name}}/confusionmatrix' % output,
'--predictions', predictions,
'--target_lambda', """lambda x: (x['target'] > x['fare'] * 0.2)""",
])


def roc_op(predictions, output, step_name='roc'):
return dsl.ContainerOp(
name=step_name,
image='gcr.io/ml-pipeline/ml-pipeline-local-roc:5df2cdc1ed145320204e8bc73b59cdbd7b3da28f',
arguments=[
'--output', '%s/{{workflow.name}}/roc' % output,
'--predictions', predictions,
'--target_lambda', """lambda x: 1 if (x['target'] > x['fare'] * 0.2) else 0""",
])


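# Deploys the trained model for serving inside the cluster; the PVC name is
# passed through so the serving deployment can mount the model directory from
# the shared volume.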
def kubeflow_deploy_op(model: 'TensorFlow model', tf_server_name, pvc_name, step_name='deploy'):
return dsl.ContainerOp(
name=step_name,
image='gcr.io/ml-pipeline/ml-pipeline-kubeflow-deployer:5df2cdc1ed145320204e8bc73b59cdbd7b3da28f',
arguments=[
'--cluster-name', 'tfx-taxi-pipeline-on-prem',
'--model-path', model,
'--server-name', tf_server_name,
'--pvc-name', pvc_name,
]
)


@dsl.pipeline(
name='TFX Taxi Cab Classification Pipeline Example for an on-prem cluster',
description='Example pipeline that does classification with model analysis based on a public BigQuery dataset, for an on-prem cluster.'
)
def taxi_cab_classification(
pvc_name='pipeline-pvc',
project='tfx-taxi-pipeline-on-prem',
column_names='taxi-cab-classification/column-names.json',
key_columns='trip_start_timestamp',
train='taxi-cab-classification/train.csv',
evaluation='taxi-cab-classification/eval.csv',
mode='local',
preprocess_module='taxi-cab-classification/preprocessing.py',
learning_rate=0.1,
hidden_layer_size=1500,
steps=3000,
analyze_slice_column='trip_start_hour'):
tf_server_name = 'taxi-cab-classification-model-{{workflow.name}}'
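# Every step mounts the shared PVC at /mnt. The claim name below is hardcoded
# as 'pipeline-pvc' rather than taken from the pvc_name parameter, because
# dsl.PipelineParam values are not resolved inside volume definitions (see
# kubeflow/pipelines issue #521).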
validation = dataflow_tf_data_validation_op('/mnt/%s' % train, '/mnt/%s' % evaluation, '/mnt/%s' % column_names,
key_columns, project, mode, '/mnt').add_volume(
k8s_client.V1Volume(name='pipeline-nfs', persistent_volume_claim=k8s_client.V1PersistentVolumeClaimVolumeSource(
claim_name='pipeline-pvc'))).add_volume_mount(k8s_client.V1VolumeMount(mount_path='/mnt', name='pipeline-nfs'))
preprocess = dataflow_tf_transform_op('/mnt/%s' % train, '/mnt/%s' % evaluation, validation.outputs['schema'],
project, mode, '/mnt/%s' % preprocess_module, '/mnt').add_volume_mount(
k8s_client.V1VolumeMount(mount_path='/mnt', name='pipeline-nfs'))
training = tf_train_op(preprocess.output, validation.outputs['schema'], learning_rate, hidden_layer_size, steps,
'tips', '/mnt/%s' % preprocess_module, '/mnt').add_volume_mount(
k8s_client.V1VolumeMount(mount_path='/mnt', name='pipeline-nfs'))
analysis = dataflow_tf_model_analyze_op(training.output, '/mnt/%s' % evaluation, validation.outputs['schema'],
project, mode, analyze_slice_column, '/mnt').add_volume_mount(
k8s_client.V1VolumeMount(mount_path='/mnt', name='pipeline-nfs'))
prediction = dataflow_tf_predict_op('/mnt/%s' % evaluation, validation.outputs['schema'], 'tips', training.output,
mode, project, '/mnt').add_volume_mount(
k8s_client.V1VolumeMount(mount_path='/mnt', name='pipeline-nfs'))
cm = confusion_matrix_op(prediction.output, '/mnt').add_volume_mount(
k8s_client.V1VolumeMount(mount_path='/mnt', name='pipeline-nfs'))
roc = roc_op(prediction.output, '/mnt').add_volume_mount(
k8s_client.V1VolumeMount(mount_path='/mnt', name='pipeline-nfs'))
deploy = kubeflow_deploy_op(training.output, tf_server_name, pvc_name).add_volume_mount(
k8s_client.V1VolumeMount(mount_path='/mnt', name='pipeline-nfs'))


if __name__ == '__main__':
import kfp.compiler as compiler

compiler.Compiler().compile(taxi_cab_classification, __file__ + '.tar.gz')
22 changes: 12 additions & 10 deletions test/sample-test/run_test.sh
@@ -168,25 +168,27 @@ elif [ "$TEST_NAME" == "tfx" ]; then
cd ${BASE_DIR}/samples/tfx

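# If test-specific component images were built for this run, substitute them
# into the GCP sample before compiling.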
if [ -n "${DATAFLOW_TFT_IMAGE}" ];then
sed -i -e "s|gcr.io/ml-pipeline/ml-pipeline-dataflow-tft:\([a-zA-Z0-9_.-]\)\+|${DATAFLOW_TFT_IMAGE}|g" taxi-cab-classification-pipeline.py
sed -i -e "s|gcr.io/ml-pipeline/ml-pipeline-dataflow-tf-predict:\([a-zA-Z0-9_.-]\)\+|${DATAFLOW_PREDICT_IMAGE}|g" taxi-cab-classification-pipeline.py
sed -i -e "s|gcr.io/ml-pipeline/ml-pipeline-dataflow-tfdv:\([a-zA-Z0-9_.-]\)\+|${DATAFLOW_TFDV_IMAGE}|g" taxi-cab-classification-pipeline.py
sed -i -e "s|gcr.io/ml-pipeline/ml-pipeline-dataflow-tfma:\([a-zA-Z0-9_.-]\)\+|${DATAFLOW_TFMA_IMAGE}|g" taxi-cab-classification-pipeline.py
sed -i -e "s|gcr.io/ml-pipeline/ml-pipeline-kubeflow-tf-trainer:\([a-zA-Z0-9_.-]\)\+|${KUBEFLOW_DNNTRAINER_IMAGE}|g" taxi-cab-classification-pipeline.py
sed -i -e "s|gcr.io/ml-pipeline/ml-pipeline-kubeflow-deployer:\([a-zA-Z0-9_.-]\)\+|${KUBEFLOW_DEPLOYER_IMAGE}|g" taxi-cab-classification-pipeline.py
sed -i -e "s|gcr.io/ml-pipeline/ml-pipeline-local-confusion-matrix:\([a-zA-Z0-9_.-]\)\+|${LOCAL_CONFUSIONMATRIX_IMAGE}|g" taxi-cab-classification-pipeline.py
sed -i -e "s|gcr.io/ml-pipeline/ml-pipeline-local-roc:\([a-zA-Z0-9_.-]\)\+|${LOCAL_ROC_IMAGE}|g" taxi-cab-classification-pipeline.py
sed -i -e "s|gcr.io/ml-pipeline/ml-pipeline-dataflow-tft:\([a-zA-Z0-9_.-]\)\+|${DATAFLOW_TFT_IMAGE}|g" taxi-cab-classification-pipeline-gcp.py
sed -i -e "s|gcr.io/ml-pipeline/ml-pipeline-dataflow-tf-predict:\([a-zA-Z0-9_.-]\)\+|${DATAFLOW_PREDICT_IMAGE}|g" taxi-cab-classification-pipeline-gcp.py
sed -i -e "s|gcr.io/ml-pipeline/ml-pipeline-dataflow-tfdv:\([a-zA-Z0-9_.-]\)\+|${DATAFLOW_TFDV_IMAGE}|g" taxi-cab-classification-pipeline-gcp.py
sed -i -e "s|gcr.io/ml-pipeline/ml-pipeline-dataflow-tfma:\([a-zA-Z0-9_.-]\)\+|${DATAFLOW_TFMA_IMAGE}|g" taxi-cab-classification-pipeline-gcp.py
sed -i -e "s|gcr.io/ml-pipeline/ml-pipeline-kubeflow-tf-trainer:\([a-zA-Z0-9_.-]\)\+|${KUBEFLOW_DNNTRAINER_IMAGE}|g" taxi-cab-classification-pipeline-gcp.py
sed -i -e "s|gcr.io/ml-pipeline/ml-pipeline-kubeflow-deployer:\([a-zA-Z0-9_.-]\)\+|${KUBEFLOW_DEPLOYER_IMAGE}|g" taxi-cab-classification-pipeline-gcp.py
sed -i -e "s|gcr.io/ml-pipeline/ml-pipeline-local-confusion-matrix:\([a-zA-Z0-9_.-]\)\+|${LOCAL_CONFUSIONMATRIX_IMAGE}|g" taxi-cab-classification-pipeline-gcp.py
sed -i -e "s|gcr.io/ml-pipeline/ml-pipeline-local-roc:\([a-zA-Z0-9_.-]\)\+|${LOCAL_ROC_IMAGE}|g" taxi-cab-classification-pipeline-gcp.py
fi

dsl-compile --py taxi-cab-classification-pipeline.py --output taxi-cab-classification-pipeline.tar.gz
dsl-compile --py taxi-cab-classification-pipeline-gcp.py --output taxi-cab-classification-pipeline-gcp.tar.gz
cd "${TEST_DIR}"
python3 run_tfx_test.py --input ${BASE_DIR}/samples/tfx/taxi-cab-classification-pipeline.tar.gz --result $SAMPLE_TFX_TEST_RESULT --output $SAMPLE_TFX_TEST_OUTPUT --namespace ${NAMESPACE}
python3 run_tfx_test.py --input ${BASE_DIR}/samples/tfx/taxi-cab-classification-pipeline-gcp.tar.gz --result $SAMPLE_TFX_TEST_RESULT --output $SAMPLE_TFX_TEST_OUTPUT --namespace ${NAMESPACE}
echo "Copy the test results to GCS ${RESULTS_GCS_DIR}/"
gsutil cp ${SAMPLE_TFX_TEST_RESULT} ${RESULTS_GCS_DIR}/${SAMPLE_TFX_TEST_RESULT}
elif [ "$TEST_NAME" == "sequential" ]; then
SAMPLE_SEQUENTIAL_TEST_RESULT=junit_SampleSequentialOutput.xml
SAMPLE_SEQUENTIAL_TEST_OUTPUT=${RESULTS_GCS_DIR}

# TODO: Create a test for the TFX taxi on-prem cluster sample.

# Compile samples
cd ${BASE_DIR}/samples/basic
dsl-compile --py sequential.py --output sequential.tar.gz
