diff --git a/samples/xgboost-spark/xgboost-training-cm.py b/samples/xgboost-spark/xgboost-training-cm.py index d24e3d1330f..636a4c62531 100755 --- a/samples/xgboost-spark/xgboost-training-cm.py +++ b/samples/xgboost-spark/xgboost-training-cm.py @@ -1,5 +1,5 @@ #!/usr/bin/env python3 -# Copyright 2018 Google LLC +# Copyright 2019 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,136 +14,188 @@ # limitations under the License. -import kfp.dsl as dsl -import kfp.gcp as gcp +import kfp from kfp import components +from kfp import dsl +from kfp import gcp confusion_matrix_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/785d474699cffb7463986b9abc4b1fbe03796cb6/components/local/confusion_matrix/component.yaml') roc_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/785d474699cffb7463986b9abc4b1fbe03796cb6/components/local/roc/component.yaml') +# ! Please do not forget to enable the Dataproc API in your cluster https://console.developers.google.com/apis/api/dataproc.googleapis.com/overview + # ================================================================ # The following classes should be provided by components provider. -class CreateClusterOp(dsl.ContainerOp): - - def __init__(self, name, project, region, staging): - super(CreateClusterOp, self).__init__( - name=name, - image='gcr.io/ml-pipeline/ml-pipeline-dataproc-create-cluster:e20fad3e161e88226c83437271adb063221459b9', - arguments=[ - '--project', project, - '--region', region, - '--name', 'xgb-{{workflow.name}}', - '--staging', staging - ], - file_outputs={'output': '/output.txt'}) - - -class DeleteClusterOp(dsl.ContainerOp): - - def __init__(self, name, project, region): - super(DeleteClusterOp, self).__init__( - name=name, - image='gcr.io/ml-pipeline/ml-pipeline-dataproc-delete-cluster:e20fad3e161e88226c83437271adb063221459b9', - arguments=[ - '--project', project, - '--region', region, - '--name', 'xgb-{{workflow.name}}', - ], - is_exit_handler=True) - - -class AnalyzeOp(dsl.ContainerOp): - - def __init__(self, name, project, region, cluster_name, schema, train_data, output): - super(AnalyzeOp, self).__init__( - name=name, - image='gcr.io/ml-pipeline/ml-pipeline-dataproc-analyze:e20fad3e161e88226c83437271adb063221459b9', - arguments=[ - '--project', project, - '--region', region, - '--cluster', cluster_name, - '--schema', schema, - '--train', train_data, - '--output', output, - ], - file_outputs={'output': '/output.txt'}) - - -class TransformOp(dsl.ContainerOp): - - def __init__(self, name, project, region, cluster_name, train_data, eval_data, - target, analysis, output): - super(TransformOp, self).__init__( - name=name, - image='gcr.io/ml-pipeline/ml-pipeline-dataproc-transform:e20fad3e161e88226c83437271adb063221459b9', - arguments=[ - '--project', project, - '--region', region, - '--cluster', cluster_name, - '--train', train_data, - '--eval', eval_data, - '--analysis', analysis, - '--target', target, - '--output', output, - ], - file_outputs={'train': '/output_train.txt', 'eval': '/output_eval.txt'}) - - -class TrainerOp(dsl.ContainerOp): - - def __init__(self, name, project, region, cluster_name, train_data, eval_data, - target, analysis, workers, rounds, output, is_classification=True): +def dataproc_create_cluster_op( + project, + region, + staging, + cluster_name='xgb-{{workflow.name}}' +): + return dsl.ContainerOp( + name='Dataproc - Create cluster', + image='gcr.io/ml-pipeline/ml-pipeline-dataproc-create-cluster:e20fad3e161e88226c83437271adb063221459b9', + arguments=[ + '--project', project, + '--region', region, + '--name', cluster_name, + '--staging', staging, + ], + file_outputs={ + 'output': '/output.txt', + } + ) + + +def dataproc_delete_cluster_op( + project, + region, + cluster_name='xgb-{{workflow.name}}' +): + return dsl.ContainerOp( + name='Dataproc - Delete cluster', + image='gcr.io/ml-pipeline/ml-pipeline-dataproc-delete-cluster:e20fad3e161e88226c83437271adb063221459b9', + arguments=[ + '--project', project, + '--region', region, + '--name', cluster_name, + ], + is_exit_handler=True + ) + + +def dataproc_analyze_op( + project, + region, + cluster_name, + schema, + train_data, + output +): + return dsl.ContainerOp( + name='Dataproc - Analyze', + image='gcr.io/ml-pipeline/ml-pipeline-dataproc-analyze:e20fad3e161e88226c83437271adb063221459b9', + arguments=[ + '--project', project, + '--region', region, + '--cluster', cluster_name, + '--schema', schema, + '--train', train_data, + '--output', output, + ], + file_outputs={ + 'output': '/output.txt', + } + ) + + +def dataproc_transform_op( + project, + region, + cluster_name, + train_data, + eval_data, + target, + analysis, + output +): + return dsl.ContainerOp( + name='Dataproc - Transform', + image='gcr.io/ml-pipeline/ml-pipeline-dataproc-transform:e20fad3e161e88226c83437271adb063221459b9', + arguments=[ + '--project', project, + '--region', region, + '--cluster', cluster_name, + '--train', train_data, + '--eval', eval_data, + '--analysis', analysis, + '--target', target, + '--output', output, + ], + file_outputs={ + 'train': '/output_train.txt', + 'eval': '/output_eval.txt', + } + ) + + +def dataproc_train_op( + project, + region, + cluster_name, + train_data, + eval_data, + target, + analysis, + workers, + rounds, + output, + is_classification=True +): if is_classification: config='gs://ml-pipeline-playground/trainconfcla.json' else: config='gs://ml-pipeline-playground/trainconfreg.json' - super(TrainerOp, self).__init__( - name=name, - image='gcr.io/ml-pipeline/ml-pipeline-dataproc-train:e20fad3e161e88226c83437271adb063221459b9', - arguments=[ - '--project', project, - '--region', region, - '--cluster', cluster_name, - '--train', train_data, - '--eval', eval_data, - '--analysis', analysis, - '--target', target, - '--package', 'gs://ml-pipeline-playground/xgboost4j-example-0.8-SNAPSHOT-jar-with-dependencies.jar', - '--workers', workers, - '--rounds', rounds, - '--conf', config, - '--output', output, - ], - file_outputs={'output': '/output.txt'}) - - -class PredictOp(dsl.ContainerOp): - - def __init__(self, name, project, region, cluster_name, data, model, target, analysis, output): - super(PredictOp, self).__init__( - name=name, - image='gcr.io/ml-pipeline/ml-pipeline-dataproc-predict:e20fad3e161e88226c83437271adb063221459b9', - arguments=[ - '--project', project, - '--region', region, - '--cluster', cluster_name, - '--predict', data, - '--analysis', analysis, - '--target', target, - '--package', 'gs://ml-pipeline-playground/xgboost4j-example-0.8-SNAPSHOT-jar-with-dependencies.jar', - '--model', model, - '--output', output, - ], - file_outputs={'output': '/output.txt'}) + return dsl.ContainerOp( + name='Dataproc - Train XGBoost model', + image='gcr.io/ml-pipeline/ml-pipeline-dataproc-train:e20fad3e161e88226c83437271adb063221459b9', + arguments=[ + '--project', project, + '--region', region, + '--cluster', cluster_name, + '--train', train_data, + '--eval', eval_data, + '--analysis', analysis, + '--target', target, + '--package', 'gs://ml-pipeline-playground/xgboost4j-example-0.8-SNAPSHOT-jar-with-dependencies.jar', + '--workers', workers, + '--rounds', rounds, + '--conf', config, + '--output', output, + ], + file_outputs={ + 'output': '/output.txt', + } + ) + + +def dataproc_predict_op( + project, + region, + cluster_name, + data, + model, + target, + analysis, + output +): + return dsl.ContainerOp( + name='Dataproc - Predict with XGBoost model', + image='gcr.io/ml-pipeline/ml-pipeline-dataproc-predict:e20fad3e161e88226c83437271adb063221459b9', + arguments=[ + '--project', project, + '--region', region, + '--cluster', cluster_name, + '--predict', data, + '--analysis', analysis, + '--target', target, + '--package', 'gs://ml-pipeline-playground/xgboost4j-example-0.8-SNAPSHOT-jar-with-dependencies.jar', + '--model', model, + '--output', output, + ], + file_outputs={ + 'output': '/output.txt', + } + ) # ======================================================================= @dsl.pipeline( - name='XGBoost Trainer', - description='A trainer that does end-to-end distributed training for XGBoost models.' + name='XGBoost Trainer', + description='A trainer that does end-to-end distributed training for XGBoost models.' ) def xgb_train_pipeline( output, @@ -157,34 +209,75 @@ def xgb_train_pipeline( workers=2, true_label='ACTION', ): - delete_cluster_op = DeleteClusterOp('delete-cluster', project, region).apply(gcp.use_gcp_secret('user-gcp-sa')) - with dsl.ExitHandler(exit_op=delete_cluster_op): - create_cluster_op = CreateClusterOp('create-cluster', project, region, output).apply(gcp.use_gcp_secret('user-gcp-sa')) - - analyze_op = AnalyzeOp('analyze', project, region, create_cluster_op.output, schema, - train_data, '%s/{{workflow.name}}/analysis' % output).apply(gcp.use_gcp_secret('user-gcp-sa')) + output_template = str(output) + '/{{workflow.uid}}/{{pod.name}}/data' - transform_op = TransformOp('transform', project, region, create_cluster_op.output, - train_data, eval_data, target, analyze_op.output, - '%s/{{workflow.name}}/transform' % output).apply(gcp.use_gcp_secret('user-gcp-sa')) - - train_op = TrainerOp('train', project, region, create_cluster_op.output, transform_op.outputs['train'], - transform_op.outputs['eval'], target, analyze_op.output, workers, - rounds, '%s/{{workflow.name}}/model' % output).apply(gcp.use_gcp_secret('user-gcp-sa')) - - predict_op = PredictOp('predict', project, region, create_cluster_op.output, transform_op.outputs['eval'], - train_op.output, target, analyze_op.output, '%s/{{workflow.name}}/predict' % output).apply(gcp.use_gcp_secret('user-gcp-sa')) - - confusion_matrix_task = confusion_matrix_op(predict_op.output, - '%s/{{workflow.name}}/confusionmatrix' % output).apply(gcp.use_gcp_secret('user-gcp-sa')) - - roc_task = roc_op( - predictions_dir=predict_op.output, - true_class=true_label, - true_score_column=true_label, - output_dir='%s/{{workflow.name}}/roc' % output + delete_cluster_op = dataproc_delete_cluster_op( + project, + region ).apply(gcp.use_gcp_secret('user-gcp-sa')) + with dsl.ExitHandler(exit_op=delete_cluster_op): + create_cluster_op = dataproc_create_cluster_op( + project, + region, + output + ).apply(gcp.use_gcp_secret('user-gcp-sa')) + + analyze_op = dataproc_analyze_op( + project, + region, + create_cluster_op.output, + schema, + train_data, + output_template + ).apply(gcp.use_gcp_secret('user-gcp-sa')) + + transform_op = dataproc_transform_op( + project, + region, + create_cluster_op.output, + train_data, + eval_data, + target, + analyze_op.output, + output_template + ).apply(gcp.use_gcp_secret('user-gcp-sa')) + + train_op = dataproc_train_op( + project, + region, + create_cluster_op.output, + transform_op.outputs['train'], + transform_op.outputs['eval'], + target, + analyze_op.output, + workers, + rounds, + output_template + ).apply(gcp.use_gcp_secret('user-gcp-sa')) + + predict_op = dataproc_predict_op( + project, + region, + create_cluster_op.output, + transform_op.outputs['eval'], + train_op.output, + target, + analyze_op.output, + output_template + ).apply(gcp.use_gcp_secret('user-gcp-sa')) + + confusion_matrix_task = confusion_matrix_op( + predict_op.output, + output_template + ).apply(gcp.use_gcp_secret('user-gcp-sa')) + + roc_task = roc_op( + predictions_dir=predict_op.output, + true_class=true_label, + true_score_column=true_label, + output_dir=output_template + ).apply(gcp.use_gcp_secret('user-gcp-sa')) + if __name__ == '__main__': - import kfp.compiler as compiler - compiler.Compiler().compile(xgb_train_pipeline, __file__ + '.zip') + kfp.compiler.Compiler().compile(xgb_train_pipeline, __file__ + '.zip')