Revert "refactor(components): De-hardcoded local output paths. (#580)"
This reverts commit a77af2c.
Bobgy authored Sep 10, 2020
1 parent f98252f commit 4a369e8
Showing 15 changed files with 162 additions and 85 deletions.
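The change being reverted had made each component accept the local path of its output file as a command-line argument (wired to an {outputPath: ...} placeholder in component.yaml); this revert restores writes to fixed in-container paths declared under fileOutputs:. The sketch below condenses the two patterns from the diffs that follow; it is illustrative only, not part of the commit, and is meant to run inside the component container (it writes to /output.txt).

from pathlib import Path


def write_output_uri_configurable(output_uri, output_path='/output.txt'):
    # Pattern removed by this revert: the file that receives the output URI is
    # itself a command-line argument, so the pipeline backend chooses the path.
    Path(output_path).parent.mkdir(parents=True, exist_ok=True)
    Path(output_path).write_text(output_uri)


def write_output_uri_hardcoded(output_uri):
    # Pattern restored by this revert: the component always writes to a fixed
    # container path that component.yaml declares under fileOutputs:.
    with open('/output.txt', 'w') as f:
        f.write(output_uri)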
33 changes: 33 additions & 0 deletions components/deprecated/dataflow/predict/component.yaml
@@ -0,0 +1,33 @@
name: Predict using TF on Dataflow
description: |
Runs TensorFlow prediction on Google Cloud Dataflow
Input and output data is in GCS
inputs:
- {name: Data file pattern, type: GCSPath, description: 'GCS or local path of test file patterns.'} # type: {GCSPath: {data_type: CSV}}
- {name: Schema, type: GCSPath, description: 'GCS json schema file path.'} # type: {GCSPath: {data_type: TFDV schema JSON}}
- {name: Target column, type: String, description: 'Name of the column for prediction target.'}
- {name: Model, type: GCSPath, description: 'GCS or local path of model trained with tft preprocessed data.'} # Models trained with estimator are exported to base/export/export/123456781 directory. # Our trainer export only one model. #TODO: Output single model from trainer # type: {GCSPath: {path_type: Directory, data_type: Exported TensorFlow models dir}}
- {name: Batch size, type: Integer, default: '32', description: 'Batch size used in prediction.'}
- {name: Run mode, type: String, default: local, description: 'Whether to run the job locally or in Cloud Dataflow. Valid values are "local" and "cloud".'}
- {name: GCP project, type: GCPProjectID, description: 'The GCP project to run the dataflow job.'}
- {name: Predictions dir, type: GCSPath, description: 'GCS or local directory.'} #Will contain prediction_results-* and schema.json files; TODO: Split outputs and replace dir with single file # type: {GCSPath: {path_type: Directory}}
outputs:
- {name: Predictions dir, type: GCSPath, description: 'GCS or local directory.'} #Will contain prediction_results-* and schema.json files; TODO: Split outputs and replace dir with single file # type: {GCSPath: {path_type: Directory}}
- {name: MLPipeline UI metadata, type: UI metadata}
implementation:
container:
image: gcr.io/ml-pipeline/ml-pipeline-dataflow-tf-predict:57d9f7f1cfd458e945d297957621716062d89a49
command: [python2, /ml/predict.py]
args: [
--data, {inputValue: Data file pattern},
--schema, {inputValue: Schema},
--target, {inputValue: Target column},
--model, {inputValue: Model},
--mode, {inputValue: Run mode},
--project, {inputValue: GCP project},
--batchsize, {inputValue: Batch size},
--output, {inputValue: Predictions dir},
]
fileOutputs:
Predictions dir: /output.txt
MLPipeline UI metadata: /mlpipeline-ui-metadata.json
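For reference, a spec like the one above is normally consumed through the KFP SDK rather than run by hand. The following is a minimal sketch, not code from this commit: it assumes the KFP v1 SDK (kfp.components.load_component_from_file, kfp.dsl, kfp.compiler), a local checkout providing the YAML path, and placeholder GCS locations and project ID; keyword argument names are the pythonized input names from the spec.

import kfp
from kfp import components, dsl

# Hypothetical local path to the component spec shown above.
dataflow_tf_predict_op = components.load_component_from_file(
    'components/deprecated/dataflow/predict/component.yaml')


@dsl.pipeline(name='tf-predict-example')
def predict_pipeline(project='my-gcp-project'):  # placeholder project ID
    # 'Data file pattern' -> data_file_pattern, 'Predictions dir' -> predictions_dir, etc.
    dataflow_tf_predict_op(
        data_file_pattern='gs://my-bucket/eval/*.csv',  # placeholder
        schema='gs://my-bucket/schema.json',            # placeholder
        target_column='target',                         # placeholder column name
        model='gs://my-bucket/model/export/',           # placeholder
        batch_size=32,
        run_mode='local',
        gcp_project=project,
        predictions_dir='gs://my-bucket/predictions/',  # placeholder
    )


if __name__ == '__main__':
    kfp.compiler.Compiler().compile(predict_pipeline, 'predict_pipeline.yaml')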
34 changes: 34 additions & 0 deletions components/deprecated/dataflow/tfdv/component.yaml
@@ -0,0 +1,34 @@
name: TFX - Data Validation
description: |
Runs Tensorflow Data Validation. https://www.tensorflow.org/tfx/data_validation/get_started
Tensorflow Data Validation (TFDV) can analyze training and serving data to:
* compute descriptive statistics,
* infer a schema,
* detect data anomalies.
inputs:
- {name: Inference data, type: GCSPath, description: GCS path of the CSV file from which to infer the schema.} # type: {GCSPath: {data_type: CSV}}
- {name: Validation data, type: GCSPath, description: GCS path of the CSV file whose contents should be validated.} # type: {GCSPath: {data_type: CSV}}
- {name: Column names, type: GCSPath, description: GCS json file containing a list of column names.} # type: {GCSPath: {data_type: JSON}}
- {name: Key columns, type: String, description: Comma separated list of columns to treat as keys.}
- {name: GCP project, type: GCPProjectID, default: '', description: The GCP project to run the dataflow job.}
- {name: Run mode, type: String, default: local, description: Whether to run the job locally or in Cloud Dataflow. Valid values are "local" and "cloud". }
- {name: Validation output, type: GCSPath, description: GCS or local directory.} # type: {GCSPath: {path_type: Directory}}
outputs:
- {name: Schema, type: GCSPath, description: GCS path of the inferred schema JSON.} # type: {GCSPath: {data_type: TFDV schema JSON}}
- {name: Validation result, type: String, description: Indicates whether anomalies were detected or not.}
implementation:
container:
image: gcr.io/ml-pipeline/ml-pipeline-dataflow-tfdv:57d9f7f1cfd458e945d297957621716062d89a49
command: [python2, /ml/validate.py]
args: [
--csv-data-for-inference, {inputValue: Inference data},
--csv-data-to-validate, {inputValue: Validation data},
--column-names, {inputValue: Column names},
--key-columns, {inputValue: Key columns},
--project, {inputValue: GCP project},
--mode, {inputValue: Run mode},
--output, {inputValue: Validation output},
]
fileOutputs:
Schema: /schema.txt
Validation result: /output_validation_result.txt
34 changes: 34 additions & 0 deletions components/deprecated/dataflow/tfma/component.yaml
@@ -0,0 +1,34 @@
name: TFX - Analyze model
description: |
Runs Tensorflow Model Analysis. https://www.tensorflow.org/tfx/model_analysis/get_started
TensorFlow Model Analysis allows you to perform model evaluations in the TFX pipeline, and view resultant metrics and plots in a Jupyter notebook. Specifically, it can provide:
* metrics computed on entire training and holdout dataset, as well as next-day evaluations
* tracking metrics over time
* model quality performance on different feature slices
inputs:
- {name: Model, type: GCSPath, description: GCS path to the model which will be evaluated.} # type: {GCSPath: {path_type: Directory, data_type: Exported TensorFlow models dir}}
- {name: Evaluation data, type: GCSPath, description: GCS path of eval files.} # type: {GCSPath: {data_type: CSV}}
- {name: Schema, type: GCSPath, description: GCS json schema file path.} # type: {GCSPath: {data_type: TFDV schema JSON}}
- {name: Run mode, type: String, default: local, description: whether to run the job locally or in Cloud Dataflow.}
- {name: GCP project, type: GCPProjectID, default: '', description: 'The GCP project to run the dataflow job, if running in the `cloud` mode.'}
- {name: Slice columns, type: String, description: Comma-separated list of columns on which to slice for analysis.}
- {name: Analysis results dir, type: GCSPath, description: GCS or local directory where the analysis results should be written.} # type: {GCSPath: {path_type: Directory}}
outputs:
- {name: Analysis results dir, type: GCSPath, description: GCS or local directory where the analysis results were written.} # type: {GCSPath: {path_type: Directory}}
- {name: MLPipeline UI metadata, type: UI metadata}
implementation:
container:
image: gcr.io/ml-pipeline/ml-pipeline-dataflow-tfma:57d9f7f1cfd458e945d297957621716062d89a49
command: [python2, /ml/model_analysis.py]
args: [
--model, {inputValue: Model},
--eval, {inputValue: Evaluation data},
--schema, {inputValue: Schema},
--mode, {inputValue: Run mode},
--project, {inputValue: GCP project},
--slice-columns, {inputValue: Slice columns},
--output, {inputValue: Analysis results dir},
]
fileOutputs:
Analysis results dir: /output.txt
MLPipeline UI metadata: /mlpipeline-ui-metadata.json
27 changes: 27 additions & 0 deletions components/deprecated/dataflow/tft/component.yaml
@@ -0,0 +1,27 @@
name: Transform using TF on Dataflow
description: Runs TensorFlow Transform on Google Cloud Dataflow
inputs:
- {name: Training data file pattern, type: GCSPath, description: 'GCS path of train file patterns.'} #Also supports local CSV # type: {GCSPath: {data_type: CSV}}
- {name: Evaluation data file pattern, type: GCSPath, description: 'GCS path of eval file patterns.'} #Also supports local CSV # type: {GCSPath: {data_type: CSV}}
- {name: Schema, type: GCSPath, description: 'GCS json schema file path.'} # type: {GCSPath: {data_type: JSON}}
- {name: GCP project, type: GCPProjectID, description: 'The GCP project to run the dataflow job.'}
- {name: Run mode, type: String, default: local, description: 'Whether to run the job locally or in Cloud Dataflow. Valid values are "local" and "cloud".' }
- {name: Preprocessing module, type: GCSPath, default: '', description: 'GCS path to a python file defining "preprocess" and "get_feature_columns" functions.'} # type: {GCSPath: {data_type: Python}}
- {name: Transformed data dir, type: GCSPath, description: 'GCS or local directory'} #Also supports local paths # type: {GCSPath: {path_type: Directory}}
outputs:
- {name: Transformed data dir, type: GCSPath} # type: {GCSPath: {path_type: Directory}}
implementation:
container:
image: gcr.io/ml-pipeline/ml-pipeline-dataflow-tft:57d9f7f1cfd458e945d297957621716062d89a49
command: [python2, /ml/transform.py]
args: [
--train, {inputValue: Training data file pattern},
--eval, {inputValue: Evaluation data file pattern},
--schema, {inputValue: Schema},
--project, {inputValue: GCP project},
--mode, {inputValue: Run mode},
--preprocessing-module, {inputValue: Preprocessing module},
--output, {inputValue: Transformed data dir},
]
fileOutputs:
Transformed data dir: /output.txt
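The restored specs can also be chained: the TFDV component above emits a Schema file output that this transform component consumes. A minimal wiring sketch under the same assumptions as before (KFP v1 SDK, hypothetical local YAML paths, placeholder GCS locations); the output key 'schema' is assumed to be the sanitized output name from the TFDV spec.

from kfp import components, dsl

tfdv_op = components.load_component_from_file(
    'components/deprecated/dataflow/tfdv/component.yaml')
tft_op = components.load_component_from_file(
    'components/deprecated/dataflow/tft/component.yaml')


@dsl.pipeline(name='tfdv-then-tft')
def validate_then_transform(project='my-gcp-project'):  # placeholder project ID
    validation = tfdv_op(
        inference_data='gs://my-bucket/train/*.csv',      # placeholder
        validation_data='gs://my-bucket/eval/*.csv',      # placeholder
        column_names='gs://my-bucket/column-names.json',  # placeholder
        key_columns='trip_start_month',                   # placeholder key column
        gcp_project=project,
        run_mode='local',
        validation_output='gs://my-bucket/validation/',   # placeholder
    )
    tft_op(
        training_data_file_pattern='gs://my-bucket/train/*.csv',   # placeholder
        evaluation_data_file_pattern='gs://my-bucket/eval/*.csv',  # placeholder
        schema=validation.outputs['schema'],  # schema inferred by the TFDV step
        gcp_project=project,
        run_mode='local',
        preprocessing_module='gs://my-bucket/preprocessing.py',    # placeholder
        transformed_data_dir='gs://my-bucket/transformed/',        # placeholder
    )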
9 changes: 2 additions & 7 deletions components/deprecated/dataproc/analyze/src/analyze.py
@@ -25,7 +25,6 @@

import argparse
import os
from pathlib import Path

from common import _utils

@@ -38,10 +37,6 @@ def main(argv=None):
parser.add_argument('--output', type=str, help='GCS path to use for output.')
parser.add_argument('--train', type=str, help='GCS path of the training csv file.')
parser.add_argument('--schema', type=str, help='GCS path of the json schema file.')
parser.add_argument('--output-dir-uri-output-path',
type=str,
default='/output.txt',
help='Local output path for the file containing the output dir URI.')
args = parser.parse_args()

code_path = os.path.dirname(os.path.realpath(__file__))
@@ -55,8 +50,8 @@ def main(argv=None):
api, args.project, args.region, args.cluster, dest_files[0], spark_args)
print('Job request submitted. Waiting for completion...')
_utils.wait_for_job(api, args.project, args.region, job_id)
Path(args.output_dir_uri_output_path).parent.mkdir(parents=True, exist_ok=True)
Path(args.output_dir_uri_output_path).write_text(args.output)
with open('/output.txt', 'w') as f:
f.write(args.output)

print('Job completed.')
finally:
2 changes: 1 addition & 1 deletion components/deprecated/dataproc/base/Dockerfile
@@ -21,7 +21,7 @@ RUN easy_install pip

RUN pip install google-api-python-client==1.6.2

RUN pip install tensorflow==1.6.0 pathlib2
RUN pip install tensorflow==1.6.0

RUN wget -nv https://dl.google.com/dl/cloudsdk/release/google-cloud-sdk.zip && \
unzip -qq google-cloud-sdk.zip -d tools && \
@@ -22,7 +22,6 @@

import argparse
import os
from pathlib import Path

from common import _utils

@@ -33,10 +32,6 @@ def main(argv=None):
parser.add_argument('--region', type=str, help='Which zone for GCE VMs.')
parser.add_argument('--name', type=str, help='The name of the cluster to create.')
parser.add_argument('--staging', type=str, help='GCS path to use for staging.')
parser.add_argument('--output-dir-uri-output-path',
type=str,
default='/output.txt',
help='Local output path for the file containing the output dir URI.')
args = parser.parse_args()

code_path = os.path.dirname(os.path.realpath(__file__))
@@ -49,8 +44,8 @@ def main(argv=None):
create_response = _utils.create_cluster(api, args.project, args.region, args.name, dest_files[0])
print('Cluster creation request submitted. Waiting for completion...')
_utils.wait_for_operation(api, create_response['name'])
Path(args.output_dir_uri_output_path).parent.mkdir(parents=True, exist_ok=True)
Path(args.output_dir_uri_output_path).write_text(args.output)
with open('/output.txt', 'w') as f:
f.write(args.name)
print('Cluster created.')
finally:
_utils.remove_resources_from_gcs(dest_files)
22 changes: 6 additions & 16 deletions components/deprecated/dataproc/predict/src/predict.py
@@ -32,7 +32,6 @@
import argparse
import json
import os
from pathlib import Path

from common import _utils
import logging
@@ -51,15 +50,6 @@ def main(argv=None):
parser.add_argument('--predict', type=str, help='GCS path of prediction libsvm file.')
parser.add_argument('--analysis', type=str, help='GCS path of the analysis input.')
parser.add_argument('--target', type=str, help='Target column name.')
parser.add_argument('--prediction-results-uri-pattern-output-path',
type=str,
default='/output.txt',
help='Local output path for the file containing prediction results URI pattern.')
parser.add_argument('--ui-metadata-output-path',
type=str,
default='/mlpipeline-ui-metadata.json',
help='Local output path for the file containing UI metadata JSON structure.')

args = parser.parse_args()

logging.getLogger().setLevel(logging.INFO)
@@ -71,9 +61,9 @@ def main(argv=None):
'ml.dmlc.xgboost4j.scala.example.spark.XGBoostPredictor', spark_args)
logging.info('Job request submitted. Waiting for completion...')
_utils.wait_for_job(api, args.project, args.region, job_id)
prediction_results_uri_pattern = os.path.join(args.output, 'part-*.csv')
Path(args.prediction_results_uri_pattern_output_path).parent.mkdir(parents=True, exist_ok=True)
Path(args.prediction_results_uri_pattern_output_path).write_text(prediction_results_uri_pattern)
prediction_results = os.path.join(args.output, 'part-*.csv')
with open('/output.txt', 'w') as f:
f.write(prediction_results)

with file_io.FileIO(os.path.join(args.output, 'schema.json'), 'r') as f:
schema = json.load(f)
@@ -84,11 +74,11 @@ def main(argv=None):
'storage': 'gcs',
'format': 'csv',
'header': [x['name'] for x in schema],
'source': prediction_results_uri_pattern
'source': prediction_results
}]
}
Path(args.ui_metadata_output_path).parent.mkdir(parents=True, exist_ok=True)
Path(args.ui_metadata_output_path).write_text(json.dumps(metadata))
with open('/mlpipeline-ui-metadata.json', 'w') as f:
json.dump(metadata, f)
logging.info('Job completed.')


10 changes: 2 additions & 8 deletions components/deprecated/dataproc/train/src/train.py
@@ -32,7 +32,6 @@

import argparse
import logging
from pathlib import Path

from common import _utils

@@ -52,11 +51,6 @@ def main(argv=None):
parser.add_argument('--eval', type=str, help='GCS path of the eval libsvm file pattern.')
parser.add_argument('--analysis', type=str, help='GCS path of the analysis input.')
parser.add_argument('--target', type=str, help='Target column name.')
parser.add_argument('--output-dir-uri-output-path',
type=str,
default='/output.txt',
help='Local output path for the file containing the output dir URI.')

args = parser.parse_args()

logging.getLogger().setLevel(logging.INFO)
@@ -69,8 +63,8 @@ def main(argv=None):
'ml.dmlc.xgboost4j.scala.example.spark.XGBoostTrainer', spark_args)
logging.info('Job request submitted. Waiting for completion...')
_utils.wait_for_job(api, args.project, args.region, job_id)
Path(args.output_dir_uri_output_path).parent.mkdir(parents=True, exist_ok=True)
Path(args.output_dir_uri_output_path).write_text(args.output)
with open('/output.txt', 'w') as f:
f.write(args.output)

logging.info('Job completed.')

5 changes: 3 additions & 2 deletions components/kubeflow/dnntrainer/component.yaml
@@ -29,6 +29,7 @@ implementation:
--target, {inputValue: Target},
--preprocessing-module, {inputValue: Preprocessing module},
--job-dir, {inputValue: Training output dir},
--exported-model-dir-uri-output-path, {outputPath: Training output dir},
--ui-metadata-output-path, {outputPath: MLPipeline UI metadata},
]
fileOutputs:
Training output dir: /output.txt
MLPipeline UI metadata: /mlpipeline-ui-metadata.json
17 changes: 4 additions & 13 deletions components/kubeflow/dnntrainer/src/trainer/task.py
@@ -16,7 +16,6 @@
import argparse
import json
import os
from pathlib import Path
import tensorflow as tf
import tensorflow_transform as tft
import tensorflow_model_analysis as tfma
@@ -81,14 +80,6 @@ def parse_arguments():
required=False,
help=('GCS path to a python file defining '
'"preprocess" and "get_feature_columns" functions.'))
parser.add_argument('--exported-model-dir-uri-output-path',
type=str,
default='/output.txt',
help='Local output path for the file containing exported model directory URI.')
parser.add_argument('--ui-metadata-output-path',
type=str,
default='/mlpipeline-ui-metadata.json',
help='Local output path for the file containing UI metadata JSON structure.')

args = parser.parse_args()
args.hidden_layer_size = [int(x.strip()) for x in args.hidden_layer_size.split(',')]
@@ -350,11 +341,11 @@ def main():
'source': args.job_dir,
}]
}
Path(args.ui_metadata_output_path).parent.mkdir(parents=True, exist_ok=True)
Path(args.ui_metadata_output_path).write_text(json.dumps(metadata))
with open('/mlpipeline-ui-metadata.json', 'w') as f:
json.dump(metadata, f)

Path(args.exported_model_dir_uri_output_path).parent.mkdir(parents=True, exist_ok=True)
Path(args.exported_model_dir_uri_output_path).write_text(args.job_dir)
with open('/output.txt', 'w') as f:
f.write(args.job_dir)

if __name__ == '__main__':
main()
5 changes: 3 additions & 2 deletions components/local/confusion_matrix/component.yaml
@@ -15,6 +15,7 @@ implementation:
--predictions, {inputValue: Predictions},
--target_lambda, {inputValue: Target lambda},
--output, {inputValue: Output dir},
--ui-metadata-output-path, {outputPath: MLPipeline UI metadata},
--metrics-output-path, {outputPath: MLPipeline Metrics},
]
fileOutputs:
MLPipeline UI metadata: /mlpipeline-ui-metadata.json
MLPipeline Metrics: /mlpipeline-metrics.json
18 changes: 4 additions & 14 deletions components/local/confusion_matrix/src/confusion_matrix.py
@@ -27,7 +27,6 @@
import os
import urlparse
import pandas as pd
from pathlib import Path
from sklearn.metrics import confusion_matrix, accuracy_score
from tensorflow.python.lib.io import file_io

@@ -40,15 +39,6 @@ def main(argv=None):
help='a lambda function as a string to compute target.' +
'For example, "lambda x: x[\'a\'] + x[\'b\']"' +
'If not set, the input must include a "target" column.')
parser.add_argument('--ui-metadata-output-path',
type=str,
default='/mlpipeline-ui-metadata.json',
help='Local output path for the file containing UI metadata JSON structure.')
parser.add_argument('--metrics-output-path',
type=str,
default='/mlpipeline-metrics.json',
help='Local output path for the file containing metrics JSON structure.')

args = parser.parse_args()

storage_service_scheme = urlparse.urlparse(args.output).scheme
@@ -95,8 +85,8 @@ def main(argv=None):
'labels': list(map(str, vocab)),
}]
}
Path(args.ui_metadata_output_path).parent.mkdir(parents=True, exist_ok=True)
Path(args.ui_metadata_output_path).write_text(json.dumps(metadata))
with file_io.FileIO('/mlpipeline-ui-metadata.json', 'w') as f:
json.dump(metadata, f)

accuracy = accuracy_score(df['target'], df['predicted'])
metrics = {
@@ -106,8 +96,8 @@ def main(argv=None):
'format': "PERCENTAGE",
}]
}
Path(args.metrics_output_path).parent.mkdir(parents=True, exist_ok=True)
Path(args.metrics_output_path).write_text(json.dumps(metrics))
with file_io.FileIO('/mlpipeline-metrics.json', 'w') as f:
json.dump(metrics, f)

if __name__== "__main__":
main()