SDK - Tests - Test creating component from the real AutoML pipeline #2314

Merged
test_data/retail_product_stockout_prediction_pipeline.component.yaml
@@ -0,0 +1,118 @@
name: Retail product stockout prediction pipeline
inputs:
- name: gcp_project_id
  type: String
- name: gcp_region
  type: String
- name: batch_predict_gcs_output_uri_prefix
  type: String
- name: dataset_bq_input_uri
  type: String
  default: bq://product-stockout.product_stockout.stockout
  optional: true
- name: dataset_display_name
  type: String
  default: stockout_data
  optional: true
- name: target_column_name
  type: String
  default: Stockout
  optional: true
- name: model_display_name
  type: String
  default: stockout_model
  optional: true
- name: batch_predict_bq_input_uri
  type: String
  default: bq://product-stockout.product_stockout.batch_prediction_inputs
  optional: true
- name: train_budget_milli_node_hours
  type: Integer
  default: '1000'
  optional: true
outputs:
- name: model_path
  type: String
implementation:
  graph:
    tasks:
      Automl create dataset for tables:
        componentRef:
          url: https://raw.githubusercontent.com/kubeflow/pipelines/b3179d86b239a08bf4884b50dbf3a9151da96d66/components/gcp/automl/create_dataset_for_tables/component.yaml
        arguments:
          gcp_project_id:
            graphInput: gcp_project_id
          gcp_region:
            graphInput: gcp_region
          display_name:
            graphInput: dataset_display_name
      Automl import data from bigquery:
        componentRef:
          url: https://raw.githubusercontent.com/kubeflow/pipelines/b3179d86b239a08bf4884b50dbf3a9151da96d66/components/gcp/automl/import_data_from_bigquery/component.yaml
        arguments:
          dataset_path:
            taskOutput:
              outputName: dataset_path
              taskId: Automl create dataset for tables
              type: String
          input_uri:
            graphInput: dataset_bq_input_uri
      Automl split dataset table column names:
        componentRef:
          url: https://raw.githubusercontent.com/kubeflow/pipelines/b3179d86b239a08bf4884b50dbf3a9151da96d66/components/gcp/automl/split_dataset_table_column_names/component.yaml
        arguments:
          dataset_path:
            taskOutput:
              outputName: dataset_path
              taskId: Automl import data from bigquery
              type: String
          target_column_name:
            graphInput: target_column_name
          table_index: '0'
      Automl create model for tables:
        componentRef:
          url: https://raw.githubusercontent.com/kubeflow/pipelines/b3179d86b239a08bf4884b50dbf3a9151da96d66/components/gcp/automl/create_model_for_tables/component.yaml
        arguments:
          gcp_project_id:
            graphInput: gcp_project_id
          gcp_region:
            graphInput: gcp_region
          display_name:
            graphInput: model_display_name
          dataset_id:
            taskOutput:
              outputName: dataset_path
              taskId: Automl import data from bigquery
              type: String
          target_column_path:
            taskOutput:
              outputName: target_column_path
              taskId: Automl split dataset table column names
              type: String
          input_feature_column_paths:
            taskOutput:
              outputName: feature_column_paths
              taskId: Automl split dataset table column names
              type: JsonArray
          optimization_objective: MAXIMIZE_AU_PRC
          train_budget_milli_node_hours:
            graphInput: train_budget_milli_node_hours
      Automl prediction service batch predict:
        componentRef:
          url: https://raw.githubusercontent.com/kubeflow/pipelines/b3179d86b239a08bf4884b50dbf3a9151da96d66/components/gcp/automl/prediction_service_batch_predict/component.yaml
        arguments:
          model_path:
            taskOutput:
              outputName: model_path
              taskId: Automl create model for tables
              type: String
          gcs_output_uri_prefix:
            graphInput: batch_predict_gcs_output_uri_prefix
          bq_input_uri:
            graphInput: batch_predict_bq_input_uri
    outputValues:
      model_path:
        taskOutput:
          outputName: model_path
          taskId: Automl create model for tables
          type: String
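
Because the expected spec is plain YAML, its structure can also be sanity-checked outside the test suite. Below is a minimal standalone sketch, not part of this PR; the file path is hypothetical and assumes a local copy of the spec above has been saved under that name.

import yaml

# Hypothetical local copy of the component spec shown above.
with open('retail_product_stockout_prediction_pipeline.component.yaml') as f:
    spec = yaml.safe_load(f)

# The graph component declares 9 inputs, 1 output, and wires up 5 AutoML tasks.
assert len(spec['inputs']) == 9
assert [output['name'] for output in spec['outputs']] == ['model_path']
assert len(spec['implementation']['graph']['tasks']) == 5

# Every task argument is a constant string, a graph input, or another task's output.
for task in spec['implementation']['graph']['tasks'].values():
    for argument in task.get('arguments', {}).values():
        assert isinstance(argument, str) or 'graphInput' in argument or 'taskOutput' in argument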
test_data/retail_product_stockout_prediction_pipeline.py
@@ -0,0 +1,64 @@
from typing import NamedTuple

from kfp.components import load_component_from_url

automl_create_dataset_for_tables_op = load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/b3179d86b239a08bf4884b50dbf3a9151da96d66/components/gcp/automl/create_dataset_for_tables/component.yaml')
automl_import_data_from_bigquery_source_op = load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/b3179d86b239a08bf4884b50dbf3a9151da96d66/components/gcp/automl/import_data_from_bigquery/component.yaml')
automl_create_model_for_tables_op = load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/b3179d86b239a08bf4884b50dbf3a9151da96d66/components/gcp/automl/create_model_for_tables/component.yaml')
automl_prediction_service_batch_predict_op = load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/b3179d86b239a08bf4884b50dbf3a9151da96d66/components/gcp/automl/prediction_service_batch_predict/component.yaml')
automl_split_dataset_table_column_names_op = load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/b3179d86b239a08bf4884b50dbf3a9151da96d66/components/gcp/automl/split_dataset_table_column_names/component.yaml')

# flake8: noqa
def retail_product_stockout_prediction_pipeline(
    gcp_project_id: str,
    gcp_region: str,
    batch_predict_gcs_output_uri_prefix: str,
    dataset_bq_input_uri: str = 'bq://product-stockout.product_stockout.stockout',
    dataset_display_name: str = 'stockout_data',
    target_column_name: str = 'Stockout',
    model_display_name: str = 'stockout_model',
    batch_predict_bq_input_uri: str = 'bq://product-stockout.product_stockout.batch_prediction_inputs',
    train_budget_milli_node_hours: 'Integer' = 1000,
) -> NamedTuple('Outputs', [('model_path', str)]):
    # Create dataset
    create_dataset_task = automl_create_dataset_for_tables_op(
        gcp_project_id=gcp_project_id,
        gcp_region=gcp_region,
        display_name=dataset_display_name,
    )

    # Import data
    import_data_task = automl_import_data_from_bigquery_source_op(
        dataset_path=create_dataset_task.outputs['dataset_path'],
        input_uri=dataset_bq_input_uri,
    )

    # Prepare column schemas
    split_column_specs = automl_split_dataset_table_column_names_op(
        dataset_path=import_data_task.outputs['dataset_path'],
        table_index=0,
        target_column_name=target_column_name,
    )

    # Train a model
    create_model_task = automl_create_model_for_tables_op(
        gcp_project_id=gcp_project_id,
        gcp_region=gcp_region,
        display_name=model_display_name,
        # Passing the import task's dataset_path output (rather than
        # create_dataset_task's dataset_id) makes training depend on the
        # data-import step without needing an explicit .after().
        dataset_id=import_data_task.outputs['dataset_path'],
        target_column_path=split_column_specs.outputs['target_column_path'],
        # If None were passed here, all non-target columns would be used.
        input_feature_column_paths=split_column_specs.outputs['feature_column_paths'],
        optimization_objective='MAXIMIZE_AU_PRC',
        train_budget_milli_node_hours=train_budget_milli_node_hours,
    )

    # Batch prediction
    batch_predict_task = automl_prediction_service_batch_predict_op(
        model_path=create_model_task.outputs['model_path'],
        bq_input_uri=batch_predict_bq_input_uri,
        gcs_output_uri_prefix=batch_predict_gcs_output_uri_prefix,
    )

    return [create_model_task.outputs['model_path']]
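
Since retail_product_stockout_prediction_pipeline is an ordinary KFP pipeline function, it can also be compiled into a deployable package directly, independently of the graph-component conversion exercised by the test below. A minimal sketch, not part of this PR, assuming a standard kfp SDK install; the import path and output file name are hypothetical.

import kfp

# Hypothetical import path; adjust to wherever the pipeline module lives.
from retail_product_stockout_prediction_pipeline import retail_product_stockout_prediction_pipeline

# Compile the pipeline function into a package that can be uploaded to a
# Kubeflow Pipelines deployment.
kfp.compiler.Compiler().compile(
    retail_product_stockout_prediction_pipeline,
    'retail_product_stockout_prediction_pipeline.zip',  # arbitrary output path
)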
@@ -46,6 +46,18 @@ def pipeline1(pipeline_param_1: int):
        self.assertListEqual([output.name for output in graph_component.outputs], ['Pipeline output 1', 'Pipeline output 2'])
        self.assertEqual(len(graph_component.implementation.graph.tasks), 3)

    def test_create_component_from_real_pipeline_retail_product_stockout_prediction(self):
        from .test_data.retail_product_stockout_prediction_pipeline import retail_product_stockout_prediction_pipeline

        graph_component = create_graph_component_spec_from_pipeline_func(retail_product_stockout_prediction_pipeline)

        import yaml
        expected_component_spec_path = str(Path(__file__).parent / 'test_data' / 'retail_product_stockout_prediction_pipeline.component.yaml')
        with open(expected_component_spec_path) as f:
            expected_dict = yaml.safe_load(f)

        self.assertEqual(expected_dict, graph_component.to_dict())


if __name__ == '__main__':
    unittest.main()
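
If the pipeline or the converter changes, the golden YAML file compared against above has to be regenerated. A hedged sketch of how that could be done, reusing create_graph_component_spec_from_pipeline_func, Path, and the test-data layout from the test module above; the helper name is hypothetical and not part of the PR.

import yaml

def regenerate_expected_component_spec():  # hypothetical helper, not part of the PR
    from .test_data.retail_product_stockout_prediction_pipeline import retail_product_stockout_prediction_pipeline
    graph_component = create_graph_component_spec_from_pipeline_func(retail_product_stockout_prediction_pipeline)
    expected_component_spec_path = str(Path(__file__).parent / 'test_data' / 'retail_product_stockout_prediction_pipeline.component.yaml')
    # Overwrite the golden file with the freshly generated spec.
    with open(expected_component_spec_path, 'w') as f:
        yaml.safe_dump(graph_component.to_dict(), f, default_flow_style=False)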