Enhance pipeline TFX taxi sample to support on-prem cluster
jinchihe committed Jan 31, 2019
1 parent 752256c commit 6b33f16
Showing 5 changed files with 257 additions and 24 deletions.
13 changes: 9 additions & 4 deletions backend/src/apiserver/config/sample_config.json
@@ -5,9 +5,14 @@
"file":"/samples/xgboost-spark/xgboost-training-cm.py.tar.gz"
},
{
"name":"[Sample] ML - TFX - Taxi Tip Prediction Model Trainer",
"description":"Example pipeline that does classification with model analysis based on a public tax cab BigQuery dataset. For source code, refer to https://github.com/kubeflow/pipelines/tree/master/samples/tfx",
"file":"/samples/tfx/taxi-cab-classification-pipeline.py.tar.gz"
"name":"[Sample] ML - TFX - Taxi Tip Prediction Model Trainer on GCP",
"description":"Example pipeline that does classification with model analysis based on a public tax cab BigQuery dataset on GCP. For source code, refer to https://github.com/kubeflow/pipelines/tree/master/samples/tfx",
"file":"/samples/tfx/taxi-cab-classification-pipeline-gcp.py.tar.gz"
},
{
"name":"[Sample] ML - TFX - Taxi Tip Prediction Model Trainer for on-prem cluster",
"description":"Example pipeline that does classification with model analysis based on a public tax cab BigQuery dataset for on-prem cluster. For source code, refer to https://github.com/kubeflow/pipelines/tree/master/samples/tfx",
"file":"/samples/tfx/taxi-cab-classification-pipeline-on-prem.py.tar.gz"
},
{
"name":"[Sample] Basic - Sequential",
@@ -34,4 +39,4 @@
"description":"A pipeline shows how to use dsl.Condition. For source code, refer to https://github.com/kubeflow/pipelines/blob/master/samples/basic/condition.py",
"file":"/samples/basic/condition.py.tar.gz"
}
]
45 changes: 35 additions & 10 deletions samples/tfx/README.md
@@ -1,4 +1,4 @@
The sample runs a pipeline with TensorFlow's transform and model-analysis components. `taxi-cab-classification-pipeline-gcp.py` targets GCP, and `taxi-cab-classification-pipeline-on-prem.py` targets an on-prem cluster.

## The dataset

@@ -19,30 +19,54 @@
dataset in [Google BigQuery](https://cloud.google.com/bigquery/). Explore the
full dataset in the
[BigQuery UI](https://bigquery.cloud.google.com/dataset/bigquery-public-data:chicago_taxi_trips).


## Requirements

### Using GCP

Preprocessing and model analysis use [Apache Beam](https://beam.apache.org/).

When run with the `cloud` mode (instead of the `local` mode), those steps use [Google Cloud Dataflow](https://cloud.google.com/dataflow/) for running the Beam pipelines.
Therefore, you must enable the Dataflow API for the given GCP project if you want to use `cloud` as the mode for either preprocessing or analysis. See the [guide to enabling the Dataflow API](https://cloud.google.com/endpoints/docs/openapi/enable-api).

### On-prem cluster

When running on an on-prem cluster, follow the [document](https://kubernetes.io/docs/concepts/storage/persistent-volumes/) to create a persistent volume and a persistent volume claim for storing the intermediate data and results. Note that `accessModes` should be `ReadWriteMany` so that the volume can be mounted read-write by many nodes. For example, the `taxi-cab-classification-pipeline-on-prem.py` sample expects a persistent volume claim named `pipeline-pvc`.
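
To create the claim you can apply a manifest with `kubectl`, or use the same [kubernetes Python client](https://github.com/kubernetes-client/python) that the sample pipeline already imports. The following is a minimal sketch, not part of the sample: the claim name, namespace, and size are illustrative, and it assumes your cluster has a `ReadWriteMany`-capable backing store such as NFS.

```python
from kubernetes import client, config

config.load_kube_config()  # assumes a local kubeconfig with cluster access

# Request a shared, read-write-many claim for pipeline intermediate data.
pvc = client.V1PersistentVolumeClaim(
    metadata=client.V1ObjectMeta(name='pipeline-pvc'),
    spec=client.V1PersistentVolumeClaimSpec(
        access_modes=['ReadWriteMany'],  # mountable read-write by many nodes
        resources=client.V1ResourceRequirements(requests={'storage': '10Gi'}),
    ),
)
client.CoreV1Api().create_namespaced_persistent_volume_claim(
    namespace='kubeflow', body=pvc)
```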


## Compiling the pipeline template

Follow the guide to [building a pipeline](https://www.kubeflow.org/docs/guides/pipelines/build-pipeline/) to install the Kubeflow Pipelines SDK, then run one of the following commands to compile the sample Python into a workflow specification. The specification takes the form of a YAML file compressed into a `.tar.gz` file.

### GCP
```bash
dsl-compile --py taxi-cab-classification-pipeline-gcp.py --output taxi-cab-classification-pipeline-gcp.tar.gz
```
### On-prem cluster
```bash
dsl-compile --py taxi-cab-classification-pipeline-on-prem.py --output taxi-cab-classification-pipeline-on-prem.tar.gz
```

## Deploying the pipeline

Open the Kubeflow pipelines UI. Create a new pipeline, and then upload the compiled specification (`.tar.gz` file) as a new pipeline template.

### GCP

The pipeline requires two arguments:

1. The name of a GCP project.
2. An output directory in a Google Cloud Storage bucket, of the form `gs://<BUCKET>/<PATH>`.

### On-prem cluster

- Before deploying the pipeline, download the training and evaluation data [taxi-cab-classification](https://github.com/kubeflow/pipelines/tree/master/samples/tfx/taxi-cab-classification) from GitHub, and copy the directory onto the persistent volume.

- Follow the [guide](https://www.kubeflow.org/docs/pipelines/pipelines-ui/) to create an experiment and start a run inside it.

**Limitation**: The value of the `pvc_name` parameter must match the claim name specified in the pipeline definition in the `taxi-cab-classification-pipeline-on-prem.py` file. See the [dsl PipelineParam does not work under Image or Command](https://github.com/kubeflow/pipelines/issues/521) issue for more information about this limitation.
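
The relevant lines from the pipeline definition show why: the claim name is a string literal, so changing the `pvc_name` argument at run time has no effect unless the literal is updated to match.

```python
# Excerpt from taxi-cab-classification-pipeline-on-prem.py: the claim name
# is hardcoded, so the pvc_name parameter must be kept in sync with it.
k8s_client.V1Volume(
    name='pipeline-nfs',
    persistent_volume_claim=k8s_client.V1PersistentVolumeClaimVolumeSource(
        claim_name='pipeline-pvc'))
```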


## Components source

@@ -61,3 +85,4 @@
Analysis:
Prediction:
[source code](https://github.com/kubeflow/pipelines/tree/master/components/dataflow/predict/src)
[container](https://github.com/kubeflow/pipelines/tree/master/components/dataflow/predict)

201 changes: 201 additions & 0 deletions samples/tfx/taxi-cab-classification-pipeline-on-prem.py
@@ -0,0 +1,201 @@
#!/usr/bin/env python3
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import kfp.dsl as dsl
from kubernetes import client as k8s_client


def dataflow_tf_data_validation_op(inference_data, validation_data,
column_names, key_columns, project, mode,
validation_output, step_name='validation'):
return dsl.ContainerOp(
name=step_name,
image='gcr.io/ml-pipeline/ml-pipeline-dataflow-tfdv:be19cbc2591a48d2ef5ca715c34ecae8223cf454',
arguments=[
'--csv-data-for-inference', inference_data,
'--csv-data-to-validate', validation_data,
'--column-names', column_names,
'--key-columns', key_columns,
'--project', project,
'--mode', mode,
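            # {{workflow.name}} is filled in by Argo at run time, so each
            # pipeline run writes to its own directory on the shared volume.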
'--output', '%s/{{workflow.name}}/validation' % validation_output,
],
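        # file_outputs maps files the container writes to named step outputs
        # that downstream steps can consume.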
file_outputs={
'schema': '/schema.txt',
'validation': '/output_validation_result.txt',
}
)


def dataflow_tf_transform_op(train_data, evaluation_data, schema,
project, preprocess_mode, preprocess_module,
transform_output, step_name='preprocess'):
return dsl.ContainerOp(
name=step_name,
image='gcr.io/ml-pipeline/ml-pipeline-dataflow-tft:be19cbc2591a48d2ef5ca715c34ecae8223cf454',
arguments=[
'--train', train_data,
'--eval', evaluation_data,
'--schema', schema,
'--project', project,
'--mode', preprocess_mode,
'--preprocessing-module', preprocess_module,
'--output', '%s/{{workflow.name}}/transformed' % transform_output,
],
file_outputs={'transformed': '/output.txt'}
)


def tf_train_op(transformed_data_dir, schema, learning_rate: float, hidden_layer_size: int,
steps: int, target: str, preprocess_module,
training_output, step_name='training'):
return dsl.ContainerOp(
name=step_name,
image='gcr.io/ml-pipeline/ml-pipeline-kubeflow-tf-trainer:be19cbc2591a48d2ef5ca715c34ecae8223cf454',
arguments=[
'--transformed-data-dir', transformed_data_dir,
'--schema', schema,
'--learning-rate', learning_rate,
'--hidden-layer-size', hidden_layer_size,
'--steps', steps,
'--target', target,
'--preprocessing-module', preprocess_module,
'--job-dir', '%s/{{workflow.name}}/train' % training_output,
],
file_outputs={'train': '/output.txt'}
)


def dataflow_tf_model_analyze_op(model: 'TensorFlow model', evaluation_data, schema,
project, analyze_mode, analyze_slice_column, analysis_output,
step_name='analysis'):
return dsl.ContainerOp(
name=step_name,
image='gcr.io/ml-pipeline/ml-pipeline-dataflow-tfma:be19cbc2591a48d2ef5ca715c34ecae8223cf454',
arguments=[
'--model', model,
'--eval', evaluation_data,
'--schema', schema,
'--project', project,
'--mode', analyze_mode,
'--slice-columns', analyze_slice_column,
'--output', '%s/{{workflow.name}}/analysis' % analysis_output,
],
file_outputs={'analysis': '/output.txt'}
)


def dataflow_tf_predict_op(evaluation_data, schema, target: str,
model: 'TensorFlow model', predict_mode, project, prediction_output,
step_name='prediction'):
return dsl.ContainerOp(
name=step_name,
image='gcr.io/ml-pipeline/ml-pipeline-dataflow-tf-predict:be19cbc2591a48d2ef5ca715c34ecae8223cf454',
arguments=[
'--data', evaluation_data,
'--schema', schema,
'--target', target,
'--model', model,
'--mode', predict_mode,
'--project', project,
'--output', '%s/{{workflow.name}}/predict' % prediction_output,
],
file_outputs={'prediction': '/output.txt'}
)


def confusion_matrix_op(predictions, output, step_name='confusion_matrix'):
return dsl.ContainerOp(
name=step_name,
image='gcr.io/ml-pipeline/ml-pipeline-local-confusion-matrix:be19cbc2591a48d2ef5ca715c34ecae8223cf454',
arguments=[
'--output', '%s/{{workflow.name}}/confusionmatrix' % output,
'--predictions', predictions,
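            # Positive label: the tip ('target') exceeds 20% of the fare.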
'--target_lambda', """lambda x: (x['target'] > x['fare'] * 0.2)""",
])


def roc_op(predictions, output, step_name='roc'):
return dsl.ContainerOp(
name=step_name,
image='gcr.io/ml-pipeline/ml-pipeline-local-roc:be19cbc2591a48d2ef5ca715c34ecae8223cf454',
arguments=[
'--output', '%s/{{workflow.name}}/roc' % output,
'--predictions', predictions,
'--target_lambda', """lambda x: 1 if (x['target'] > x['fare'] * 0.2) else 0""",
])


def kubeflow_deploy_op(model: 'TensorFlow model', tf_server_name, pvc_name, step_name='deploy'):
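    # Serves the trained model with TF Serving, reading it from the shared
    # persistent volume claim ('nfs' storage type) rather than a GCS bucket.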
return dsl.ContainerOp(
name=step_name,
image='gcr.io/ml-pipeline/ml-pipeline-kubeflow-deployer:be19cbc2591a48d2ef5ca715c34ecae8223cf454',
arguments=[
'--cluster-name', 'tfx-taxi-pipeline-on-prem',
'--model-path', model,
'--server-name', tf_server_name,
'--model-storage-type', 'nfs',
'--pvc-name', pvc_name,
]
)


@dsl.pipeline(
name='TFX Taxi Cab Classification Pipeline Example',
description='Example pipeline that does classification with model analysis based on a public BigQuery dataset.'
)
def taxi_cab_classification(
pvc_name='pipeline-pvc',
project='tfx-taxi-pipeline-on-prem',
column_names='taxi-cab-classification/column-names.json',
key_columns='trip_start_timestamp',
train='taxi-cab-classification/train.csv',
evaluation='taxi-cab-classification/eval.csv',
mode='local',
preprocess_module='taxi-cab-classification/preprocessing.py',
learning_rate=0.1,
hidden_layer_size=1500,
steps=3000,
analyze_slice_column='trip_start_hour'):
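    # All steps exchange data under /mnt, backed by the persistent volume
    # claim mounted below. The claim name is hardcoded to 'pipeline-pvc'
    # rather than taking the pvc_name parameter, because a PipelineParam is
    # not resolved in that context (see kubeflow/pipelines#521).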
tf_server_name = 'taxi-cab-classification-model-{{workflow.name}}'
validation = dataflow_tf_data_validation_op('/mnt/%s' % train, '/mnt/%s' % evaluation, '/mnt/%s' % column_names,
key_columns, project, mode, '/mnt').add_volume(
k8s_client.V1Volume(name='pipeline-nfs', persistent_volume_claim=k8s_client.V1PersistentVolumeClaimVolumeSource(
claim_name='pipeline-pvc'))).add_volume_mount(k8s_client.V1VolumeMount(mount_path='/mnt', name='pipeline-nfs'))
preprocess = dataflow_tf_transform_op('/mnt/%s' % train, '/mnt/%s' % evaluation, validation.outputs['schema'],
project, mode, '/mnt/%s' % preprocess_module, '/mnt').add_volume_mount(
k8s_client.V1VolumeMount(mount_path='/mnt', name='pipeline-nfs'))
training = tf_train_op(preprocess.output, validation.outputs['schema'], learning_rate, hidden_layer_size, steps,
'tips', '/mnt/%s' % preprocess_module, '/mnt').add_volume_mount(
k8s_client.V1VolumeMount(mount_path='/mnt', name='pipeline-nfs'))
analysis = dataflow_tf_model_analyze_op(training.output, '/mnt/%s' % evaluation, validation.outputs['schema'],
project, mode, analyze_slice_column, '/mnt').add_volume_mount(
k8s_client.V1VolumeMount(mount_path='/mnt', name='pipeline-nfs'))
prediction = dataflow_tf_predict_op('/mnt/%s' % evaluation, validation.outputs['schema'], 'tips', training.output,
mode, project, '/mnt').add_volume_mount(
k8s_client.V1VolumeMount(mount_path='/mnt', name='pipeline-nfs'))
cm = confusion_matrix_op(prediction.output, '/mnt').add_volume_mount(
k8s_client.V1VolumeMount(mount_path='/mnt', name='pipeline-nfs'))
roc = roc_op(prediction.output, '/mnt').add_volume_mount(
k8s_client.V1VolumeMount(mount_path='/mnt', name='pipeline-nfs'))
deploy = kubeflow_deploy_op(training.output, tf_server_name, pvc_name).add_volume_mount(
k8s_client.V1VolumeMount(mount_path='/mnt', name='pipeline-nfs'))


if __name__ == '__main__':
import kfp.compiler as compiler

compiler.Compiler().compile(taxi_cab_classification, __file__ + '.tar.gz')
22 changes: 12 additions & 10 deletions test/sample-test/run_test.sh
@@ -169,25 +169,27 @@
elif [ "$TEST_NAME" == "tfx" ]; then
cd ${BASE_DIR}/samples/tfx

if [ -n "${DATAFLOW_TFT_IMAGE}" ];then
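    # Swap the default component images for the images built by this test run.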
sed -i -e "s|gcr.io/ml-pipeline/ml-pipeline-dataflow-tft:\([a-zA-Z0-9_.-]\)\+|${DATAFLOW_TFT_IMAGE}|g" taxi-cab-classification-pipeline-gcp.py
sed -i -e "s|gcr.io/ml-pipeline/ml-pipeline-dataflow-tf-predict:\([a-zA-Z0-9_.-]\)\+|${DATAFLOW_PREDICT_IMAGE}|g" taxi-cab-classification-pipeline-gcp.py
sed -i -e "s|gcr.io/ml-pipeline/ml-pipeline-dataflow-tfdv:\([a-zA-Z0-9_.-]\)\+|${DATAFLOW_TFDV_IMAGE}|g" taxi-cab-classification-pipeline-gcp.py
sed -i -e "s|gcr.io/ml-pipeline/ml-pipeline-dataflow-tfma:\([a-zA-Z0-9_.-]\)\+|${DATAFLOW_TFMA_IMAGE}|g" taxi-cab-classification-pipeline-gcp.py
sed -i -e "s|gcr.io/ml-pipeline/ml-pipeline-kubeflow-tf-trainer:\([a-zA-Z0-9_.-]\)\+|${KUBEFLOW_DNNTRAINER_IMAGE}|g" taxi-cab-classification-pipeline-gcp.py
sed -i -e "s|gcr.io/ml-pipeline/ml-pipeline-kubeflow-deployer:\([a-zA-Z0-9_.-]\)\+|${KUBEFLOW_DEPLOYER_IMAGE}|g" taxi-cab-classification-pipeline-gcp.py
sed -i -e "s|gcr.io/ml-pipeline/ml-pipeline-local-confusion-matrix:\([a-zA-Z0-9_.-]\)\+|${LOCAL_CONFUSIONMATRIX_IMAGE}|g" taxi-cab-classification-pipeline-gcp.py
sed -i -e "s|gcr.io/ml-pipeline/ml-pipeline-local-roc:\([a-zA-Z0-9_.-]\)\+|${LOCAL_ROC_IMAGE}|g" taxi-cab-classification-pipeline-gcp.py
fi

dsl-compile --py taxi-cab-classification-pipeline-gcp.py --output taxi-cab-classification-pipeline-gcp.tar.gz
cd "${TEST_DIR}"
python3 run_tfx_test.py --input ${BASE_DIR}/samples/tfx/taxi-cab-classification-pipeline-gcp.tar.gz --result $SAMPLE_TFX_TEST_RESULT --output $SAMPLE_TFX_TEST_OUTPUT --namespace ${NAMESPACE}
echo "Copy the test results to GCS ${RESULTS_GCS_DIR}/"
gsutil cp ${SAMPLE_TFX_TEST_RESULT} ${RESULTS_GCS_DIR}/${SAMPLE_TFX_TEST_RESULT}
elif [ "$TEST_NAME" == "sequential" ]; then
SAMPLE_SEQUENTIAL_TEST_RESULT=junit_SampleSequentialOutput.xml
SAMPLE_SEQUENTIAL_TEST_OUTPUT=${RESULTS_GCS_DIR}

# TODO: Add a test for the TFX taxi on-prem sample.

# Compile samples
cd ${BASE_DIR}/samples/basic
dsl-compile --py sequential.py --output sequential.tar.gz
