diff --git a/solutionbox/structured_data/datalab_solutions/structured_data/__init__.py b/solutionbox/structured_data/datalab_solutions/structured_data/__init__.py index 12407cb30..76a12ce46 100644 --- a/solutionbox/structured_data/datalab_solutions/structured_data/__init__.py +++ b/solutionbox/structured_data/datalab_solutions/structured_data/__init__.py @@ -13,3 +13,6 @@ from ._package import local_preprocess, cloud_preprocess, local_train, cloud_train, local_predict, \ cloud_predict, local_batch_predict, cloud_batch_predict + +# Source of truth for the version of this package. +__version__ = '0.0.1' \ No newline at end of file diff --git a/solutionbox/structured_data/datalab_solutions/structured_data/_package.py b/solutionbox/structured_data/datalab_solutions/structured_data/_package.py index fdf17d076..9a9e6fb4c 100644 --- a/solutionbox/structured_data/datalab_solutions/structured_data/_package.py +++ b/solutionbox/structured_data/datalab_solutions/structured_data/_package.py @@ -39,19 +39,20 @@ import json import glob import StringIO +import subprocess import pandas as pd import tensorflow as tf -import yaml + +from tensorflow.python.lib.io import file_io from . import preprocess from . import trainer from . import predict -_TF_GS_URL = 'gs://cloud-datalab/deploy/tf/tensorflow-0.12.0rc0-cp27-none-linux_x86_64.whl' - -# TODO(brandondutra): move this url someplace else. -_SD_GS_URL = 'gs://cloud-ml-dev_bdt/structured_data-0.1.tar.gz' +#_SETUP_PY = '/datalab/packages_setup/structured_data/setup.py' +#_TF_VERSION = 'tensorflow-0.12.0rc0-cp27-none-linux_x86_64.whl' +#_TF_WHL = '/datalab/packages_setup/structured_data' def _default_project(): @@ -80,24 +81,36 @@ def _assert_gcs_files(files): raise ValueError('File %s is not a gcs path' % f) -def _run_cmd(cmd): - output = subprocess.Popen(cmd, shell=True, stderr=subprocess.STDOUT, stdout=subprocess.PIPE) +def _package_to_staging(staging_package_url): + """Repackage this package from local installed location and copy it to GCS. + + Args: + staging_package_url: GCS path. + """ + import datalab.mlalpha as mlalpha - while True: - line = output.stdout.readline().rstrip() - print(line) - if line == '' and output.poll() != None: - break + # Find the package root. __file__ is under [package_root]/datalab_solutions/inception. + package_root = os.path.abspath( + os.path.join(os.path.dirname(__file__), '../../')) + setup_path = os.path.abspath( + os.path.join(os.path.dirname(__file__), 'setup.py')) + tar_gz_path = os.path.join(staging_package_url, 'staging', 'sd.tar.gz') + print('Building package in %s and uploading to %s' % + (package_root, tar_gz_path)) + mlalpha.package_and_copy(package_root, setup_path, tar_gz_path) -def local_preprocess(output_dir, input_feature_file, input_file_pattern, schema_file): + + return tar_gz_path + + +def local_preprocess(output_dir, input_file_pattern, schema_file): """Preprocess data locally with Pandas Produce analysis used by training. Args: output_dir: The output directory to use. - input_feature_file: Describes defaults and column types. input_file_pattern: String. File pattern what will expand into a list of csv files. schema_file: File path to the schema file. 
@@ -106,14 +119,13 @@ def local_preprocess(output_dir, input_feature_file, input_file_pattern, schema_
   args = ['local_preprocess',
           '--input_file_pattern=%s' % input_file_pattern,
           '--output_dir=%s' % output_dir,
-          '--schema_file=%s' % schema_file,
-          '--input_feature_file=%s' % input_feature_file]
+          '--schema_file=%s' % schema_file]
 
   print('Starting local preprocessing.')
   preprocess.local_preprocess.main(args)
   print('Local preprocessing done.')
 
 
-def cloud_preprocess(output_dir, input_feature_file, input_file_pattern=None, schema_file=None, bigquery_table=None, project_id=None):
+def cloud_preprocess(output_dir, input_file_pattern=None, schema_file=None, bigquery_table=None, project_id=None):
   """Preprocess data in the cloud with BigQuery.
 
   Produce analysis used by training. This can take a while, even for small
@@ -121,7 +133,6 @@ def cloud_preprocess(output_dir, input_feature_file, input_file_pattern=None, sc
 
   Args:
     output_dir: The output directory to use.
-    input_feature_file: Describes defaults and column types.
     input_file_path: String. File pattern what will expand into a list of
         csv files.
     schema_file: File path to the schema file.
@@ -131,8 +142,7 @@ def cloud_preprocess(output_dir, input_feature_file, input_file_pattern=None, sc
   _assert_gcs_files([output_dir, input_file_pattern])
 
   args = ['cloud_preprocess',
-          '--output_dir=%s' % output_dir,
-          '--input_feature_file=%s' % input_feature_file]
+          '--output_dir=%s' % output_dir]
 
   if input_file_pattern:
     args.append('--input_file_pattern=%s' % input_file_pattern)
@@ -155,9 +165,10 @@ def local_train(train_file_pattern,
                 eval_file_pattern,
                 preprocess_output_dir,
                 output_dir,
-                transforms_file,
                 model_type,
                 max_steps,
+                transforms_file=None,
+                key_column=None,
                 top_n=None,
                 layer_sizes=None):
   """Train model locally.
@@ -166,9 +177,55 @@ def local_train(train_file_pattern,
     eval_file_pattern: eval csv file
     preprocess_output_dir: The output directory from preprocessing
     output_dir: Output directory of training.
-    transforms_file: File path to the transforms file.
-    model_type: model type
-    max_steps: Int. Number of training steps to perform.
+    model_type: One of linear_classification, linear_regression,
+        dnn_classification, dnn_regression.
+    max_steps: Int. Number of training steps to perform.
+    transforms_file: File path to the transforms file. Example:
+        {
+          "col_A": {"transform": "scale", "default": 0.0},
+          "col_B": {"transform": "scale","value": 4},
+          # Note col_C is missing, so default transform used.
+          "col_D": {"transform": "hash_one_hot", "hash_bucket_size": 4},
+          "col_target": {"transform": "target"},
+          "col_key": {"transform": "key"}
+        }
+        The keys correspond to the columns in the input files as defined by the
+        schema file during preprocessing. Some notes:
+        1) The "key" transform is required, but the "target" transform is
+           optional, as the target column must be the first column in the input
+           data, and all other transforms are optional.
+        2) Default values are optional. These are used if the input data has
+           missing values during training and prediction. If not supplied for a
+           column, the default value for a numerical column is that column's
+           mean value, and for a categorical column the empty string is used.
+        3) For numerical columns, the following transforms are supported:
+           i) {"transform": "identity"}: does nothing to the number. (default)
+           ii) {"transform": "scale"}: scales the column values to -1, 1.
+           iii) {"transform": "scale", "value": a}: scales the column values
+                to -a, a.
+
+        For categorical columns, the transform supported depends on whether
+        the model is a linear or DNN model, because tf.layers is used.
+        For a linear model, the transforms supported are:
+        i) {"transform": "sparse"}: Makes a sparse vector using the full
+           vocabulary associated with the column (default).
+        ii) {"transform": "hash_sparse", "hash_bucket_size": n}: First each
+            string is hashed to an integer in the range [0, n), and then a
+            sparse vector is used.
+
+        For a DNN model, the categorical transforms that are supported are:
+        i) {"transform": "one_hot"}: A one-hot vector using the full
+           vocabulary is used. (default)
+        ii) {"transform": "embedding", "embedding_dim": d}: Each label is
+            embedded into a d-dimensional space.
+        iii) {"transform": "hash_one_hot", "hash_bucket_size": n}: The label
+             is first hashed into the range [0, n) and then a one-hot encoding
+             is made.
+        iv) {"transform": "hash_embedding", "hash_bucket_size": n,
+             "embedding_dim": d}: First each label is hashed to [0, n), and
+             then each integer is embedded into a d-dimensional space.
+    key_column: key column name. If None, this information is read from the
+        transforms_file.
     top_n: Int. For classification problems, the output graph will contain
       the labels and scores for the top n classes with a default of n=1.
       Use None for regression problems.
@@ -179,7 +236,19 @@ def local_train(train_file_pattern,
       nodes.
   """
   #TODO(brandondutra): allow other flags to be set like batch size/learner rate
-  #TODO(brandondutra): doc someplace that TF>=0.12 and cloudml >-1.7 are needed.
+
+  if key_column and not transforms_file:
+    # Make a transforms file.
+    transforms_file = os.path.join(output_dir, 'transforms_file.json')
+    file_io.write_string_to_file(
+        transforms_file,
+        json.dumps({key_column: {"transform": "key"}}, indent=2))
+  elif not key_column and transforms_file:
+    pass
+  else:
+    raise ValueError('Exactly one of key_column or transforms_file must be '
+                     'given.')
+
   args = ['local_train',
           '--train_data_paths=%s' % train_file_pattern,
@@ -189,8 +258,8 @@ def local_train(train_file_pattern,
           '--transforms_file=%s' % transforms_file,
           '--model_type=%s' % model_type,
           '--max_steps=%s' % str(max_steps)]
   if layer_sizes:
-    args.extend(['--layer_sizes'] + [str(x) for x in layer_sizes])
+    for i in range(len(layer_sizes)):
+      args.append('--layer_size%s=%s' % (i+1, str(layer_sizes[i])))
   if top_n:
     args.append('--top_n=%s' % str(top_n))
@@ -202,12 +271,12 @@ def cloud_train(train_file_pattern,
                 eval_file_pattern,
                 preprocess_output_dir,
                 output_dir,
-                transforms_file,
                 model_type,
                 max_steps,
+                transforms_file=None,
+                key_column=None,
                 top_n=None,
                 layer_sizes=None,
-                staging_bucket=None,
                 project_id=None,
                 job_name=None,
                 scale_tier='STANDARD_1',
@@ -219,9 +288,13 @@ def cloud_train(train_file_pattern,
     eval_file_pattern: eval csv file
     preprocess_output_dir: The output directory from preprocessing
     output_dir: Output directory of training.
-    transforms_file: File path to the transforms file.
-    model_type: model type
+    model_type: One of linear_classification, linear_regression,
+        dnn_classification, dnn_regression.
     max_steps: Int. Number of training steps to perform.
+    transforms_file: File path to the transforms file. See local_train for
+        a long description of this file. Must include the key transform.
+    key_column: key column name. If None, this information is read from the
+        transforms_file.
     top_n: Int. For classification problems, the output graph will contain
       the labels and scores for the top n classes with a default of n=1.
      Use None for regression problems.
@@ -230,8 +303,6 @@ def cloud_train(train_file_pattern,
       will create three DNN layers where the first layer will have 10 nodes,
       the middle layer will have 3 nodes, and the laster layer will have 2
       nodes.
-
-    staging_bucket: GCS bucket.
     project_id: String. The GCE project to use. Defaults to the notebook's
       default project id.
     job_name: String. Job name as listed on the Dataflow service. If None, a
@@ -240,11 +311,22 @@ def cloud_train(train_file_pattern,
      in this package. See https://cloud.google.com/ml/reference/rest/v1beta1/projects.jobs#ScaleTier
   """
   #TODO(brandondutra): allow other flags to be set like batch size,
-  # learner rate, custom scale tiers, etc
-  #TODO(brandondutra): doc someplace that TF>=0.12 and cloudml >-1.7 are needed.
+  # learning rate, etc
+
+  if key_column and not transforms_file:
+    # Make a transforms file.
+    transforms_file = os.path.join(output_dir, 'transforms_file.json')
+    file_io.write_string_to_file(
+        transforms_file,
+        json.dumps({key_column: {"transform": "key"}}, indent=2))
+  elif not key_column and transforms_file:
+    pass
+  else:
+    raise ValueError('Exactly one of key_column or transforms_file must be '
+                     'given.')
 
   _assert_gcs_files([train_file_pattern, eval_file_pattern,
-                     preprocess_output_dir, transforms_file])
+                     preprocess_output_dir, transforms_file, output_dir])
 
   # TODO: Convert args to a dictionary so we can use datalab's cloudml trainer.
   args = ['--train_data_paths=%s' % train_file_pattern,
@@ -254,23 +336,21 @@ def cloud_train(train_file_pattern,
           '--transforms_file=%s' % transforms_file,
           '--model_type=%s' % model_type,
           '--max_steps=%s' % str(max_steps)]
   if layer_sizes:
-    args.extend(['--layer_sizes'] + [str(x) for x in layer_sizes])
+    for i in range(len(layer_sizes)):
+      args.append('--layer_size%s=%s' % (i+1, str(layer_sizes[i])))
   if top_n:
     args.append('--top_n=%s' % str(top_n))
 
-  # TODO(brandondutra): move these package uris locally, ask for a staging
-  # and copy them there. This package should work without cloudml having to
-  # maintain gs files!!!
   job_request = {
-      'package_uris': [_TF_GS_URL, _SD_GS_URL],
+      'package_uris': [_package_to_staging(output_dir)],
      'python_module': 'datalab_solutions.structured_data.trainer.task',
      'scale_tier': scale_tier,
      'region': region,
      'args': args
  }
  # Local import because cloudml service does not have datalab
-  import datalab.mlaplha
+  import datalab
+  cloud_runner = datalab.mlalpha.CloudRunner(job_request)
  if not job_name:
    job_name = 'structured_data_train_' + datetime.datetime.now().strftime('%y%m%d_%H%M%S')
  job = datalab.mlalpha.Job.submit_training(job_request, job_name)
@@ -331,7 +411,8 @@ def local_predict(model_dir, data):
  print('Local prediction done.')
 
  # Read the header file.
-  with open(os.path.join(tmp_dir, 'csv_header.txt'), 'r') as f:
+  header_file = os.path.join(tmp_dir, 'csv_header.txt')
+  with open(header_file, 'r') as f:
    header = f.readline()
 
  # Print any errors to the screen.
@@ -467,7 +548,9 @@ def cloud_batch_predict(model_dir, prediction_input_file, output_dir, '--trained_model_dir=%s' % model_dir, '--output_dir=%s' % output_dir, '--output_format=%s' % output_format, - '--batch_size=%s' % str(batch_size)] + '--batch_size=%s' % str(batch_size), + '--extra_package=%s' % _package_to_staging(output_dir)] + print(cmd) if shard_files: cmd.append('--shard_files') diff --git a/solutionbox/structured_data/datalab_solutions/structured_data/predict/predict.py b/solutionbox/structured_data/datalab_solutions/structured_data/predict/predict.py index 8f58d260e..8982c1e25 100644 --- a/solutionbox/structured_data/datalab_solutions/structured_data/predict/predict.py +++ b/solutionbox/structured_data/datalab_solutions/structured_data/predict/predict.py @@ -43,6 +43,11 @@ def parse_arguments(argv): default=('structured-data-batch-prediction-' + datetime.datetime.now().strftime('%Y%m%d%H%M%S')), help='Dataflow job name. Must be unique over all jobs.') + parser.add_argument('--extra_package', + default=[], + action='append', + help=('If using --cloud, also installs these packages on ' + 'each dataflow worker')) # I/O args parser.add_argument('--predict_data', @@ -121,7 +126,7 @@ def __init__(self, trained_model_dir): schema = json.loads(file_io.read_file_to_string(schema_path)) self._num_expected_columns = len(schema) - def process(self, context): + def process(self, element): """Fixes csv line if target is missing. The first column is assumed to be the target column, and the TF graph @@ -136,19 +141,16 @@ def process(self, context): The value of the missing target column comes from the default value given to tf.decode_csv in the graph. """ - import logging import apache_beam as beam - num_columns = len(context.element.split(',')) + num_columns = len(element.split(',')) if num_columns == self._num_expected_columns: - yield context.element + yield element elif num_columns + 1 == self._num_expected_columns: - yield ',' + context.element + yield ',' + element else: - logging.error('Got an unexpected number of columns from [%s].' % - context.element) yield beam.pvalue.SideOutputValue('errors', - ('bad columns', context.element)) + ('bad columns', element)) class EmitAsBatchDoFn(beam.DoFn): @@ -163,14 +165,14 @@ def __init__(self, batch_size): self._batch_size = batch_size self._cached = [] - def process(self, context): - self._cached.append(context.element) + def process(self, element): + self._cached.append(element) if len(self._cached) >= self._batch_size: emit = self._cached self._cached = [] yield emit - def finish_bundle(self, context): + def finish_bundle(self, element=None): if len(self._cached) > 0: # pylint: disable=g-explicit-length-test yield self._cached @@ -182,7 +184,7 @@ def __init__(self, trained_model_dir): self._trained_model_dir = trained_model_dir self._session = None - def start_bundle(self, context=None): + def start_bundle(self, element=None): from tensorflow.contrib.session_bundle import session_bundle import json @@ -199,13 +201,17 @@ def start_bundle(self, context=None): self._aliases, self._tensor_names = zip(*self._output_alias_map.items()) - def finish_bundle(self, context=None): + def finish_bundle(self, element=None): self._session.close() - def process(self, context): + def process(self, element): + """Run batch prediciton on a TF graph. + + Args: + element: list of strings, representing one batch input to the TF graph. 
+ """ import collections - import logging import apache_beam as beam num_in_batch = 0 @@ -213,7 +219,7 @@ def process(self, context): assert self._session is not None feed_dict = collections.defaultdict(list) - for line in context.element: + for line in element: feed_dict[self._input_alias_map.values()[0]].append(line) num_in_batch += 1 @@ -248,10 +254,8 @@ def process(self, context): yield predictions except Exception as e: # pylint: disable=broad-except - logging.error('RunGraphDoFn: Bad input: %s, Error: %s', - str(context.element), str(e)) yield beam.pvalue.SideOutputValue('errors', - (str(e), context.element)) + (str(e), element)) class RawJsonCoder(beam.coders.Coder): @@ -413,12 +417,15 @@ def main(argv=None): 'temp_location': os.path.join(args.output_dir, 'tmp', 'staging'), 'job_name': args.job_name, 'project': args.project_id, + 'no_save_main_session': True, + 'extra_packages': args.extra_package, + 'teardown_policy': 'TEARDOWN_ALWAYS', } opts = beam.pipeline.PipelineOptions(flags=[], **options) # Or use BlockingDataflowPipelineRunner - p = beam.Pipeline('DataflowPipelineRunner', options=opts) + p = beam.Pipeline('DataflowRunner', options=opts) else: - p = beam.Pipeline('DirectPipelineRunner') + p = beam.Pipeline('DirectRunner') make_prediction_pipeline(p, args) @@ -428,7 +435,11 @@ def main(argv=None): (options['job_name'], args.project_id)) sys.stdout.flush() - p.run() + r = p.run() + try: + r.wait_until_finish() + except AttributeError: + pass if __name__ == '__main__': diff --git a/solutionbox/structured_data/datalab_solutions/structured_data/preprocess/cloud_preprocess.py b/solutionbox/structured_data/datalab_solutions/structured_data/preprocess/cloud_preprocess.py index d02817174..2c86497b5 100644 --- a/solutionbox/structured_data/datalab_solutions/structured_data/preprocess/cloud_preprocess.py +++ b/solutionbox/structured_data/datalab_solutions/structured_data/preprocess/cloud_preprocess.py @@ -26,9 +26,7 @@ from tensorflow.python.lib.io import file_io -INPUT_FEATURES_FILE = 'input_features.json' SCHEMA_FILE = 'schema.json' - NUMERICAL_ANALYSIS_FILE = 'numerical_analysis.json' CATEGORICAL_ANALYSIS_FILE = 'vocab_%s.csv' @@ -51,10 +49,6 @@ def parse_arguments(argv): type=str, required=True, help='Google Cloud Storage which to place outputs.') - parser.add_argument('--input_feature_file', - type=str, - required=True, - help=('Json file containing feature types')) parser.add_argument('--schema_file', type=str, @@ -115,27 +109,32 @@ def parse_table_name(bigquery_table): return id_name[1] -def run_numerical_analysis(table, args, feature_types): +def run_numerical_analysis(table, schema_list, args): """Find min/max values for the numerical columns and writes a json file. Args: - table: Reference to FederatedTable if bigquery_table is false. + table: Reference to FederatedTable (if bigquery_table is false) or a + regular Table (otherwise) + schema_list: Bigquery schema json object args: the command line args - feature_types: python object of the feature types file. """ import datalab.bigquery as bq # Get list of numerical columns. 
numerical_columns = [] - for name, config in feature_types.iteritems(): - if config['type'] == 'numerical': - numerical_columns.append(name) + for col_schema in schema_list: + col_type = col_schema['type'].lower() + if col_type == 'integer' or col_type == 'float': + numerical_columns.append(col_schema['name']) + # Run the numerical analysis if numerical_columns: sys.stdout.write('Running numerical analysis...') max_min = [ - 'max({name}) as max_{name}, min({name}) as min_{name}'.format(name=name) + ('max({name}) as max_{name}, ' + 'min({name}) as min_{name}, ' + 'avg({name}) as avg_{name} ').format(name=name) for name in numerical_columns] if args.bigquery_table: sql = 'SELECT %s from %s' % (', '.join(max_min), @@ -150,7 +149,8 @@ def run_numerical_analysis(table, args, feature_types): results_dict = {} for name in numerical_columns: results_dict[name] = {'max': numerical_results.iloc[0]['max_%s' % name], - 'min': numerical_results.iloc[0]['min_%s' % name]} + 'min': numerical_results.iloc[0]['min_%s' % name], + 'mean':numerical_results.iloc[0]['avg_%s' % name]} file_io.write_string_to_file( os.path.join(args.output_dir, NUMERICAL_ANALYSIS_FILE), @@ -159,7 +159,7 @@ def run_numerical_analysis(table, args, feature_types): sys.stdout.write('done.\n') -def run_categorical_analysis(table, args, feature_types): +def run_categorical_analysis(table, schema_list, args): """Find vocab values for the categorical columns and writes a csv file. The vocab files are in the from @@ -169,16 +169,20 @@ def run_categorical_analysis(table, args, feature_types): ... Args: - table: Reference to FederatedTable if bigquery_table is false. + table: Reference to FederatedTable (if bigquery_table is false) or a + regular Table (otherwise) + schema_list: Bigquery schema json object args: the command line args - feature_types: python object of the feature types file. """ import datalab.bigquery as bq + + # Get list of categorical columns. categorical_columns = [] - for name, config in feature_types.iteritems(): - if config['type'] == 'categorical': - categorical_columns.append(name) + for col_schema in schema_list: + col_type = col_schema['type'].lower() + if col_type == 'string': + categorical_columns.append(col_schema['name']) if categorical_columns: sys.stdout.write('Running categorical analysis...') @@ -227,10 +231,14 @@ def run_analysis(args): Args: args: command line args + + Raises: + ValueError if schema contains unknown types. """ import datalab.bigquery as bq if args.bigquery_table: table = bq.Table(args.bigquery_table) + schema_list = table.schema._bq_schema else: schema_list = json.loads(file_io.read_file_to_string(args.schema_file)) table = bq.FederatedTable().from_storage( @@ -241,26 +249,19 @@ def run_analysis(args): compressed=False, schema=bq.Schema(schema_list)) - feature_types = json.loads( - file_io.read_file_to_string(args.input_feature_file)) - - run_numerical_analysis(table, args, feature_types) - run_categorical_analysis(table, args, feature_types) + # Check the schema is supported. + for col_schema in schema_list: + col_type = col_schema['type'].lower() + if col_type != 'string' and col_type != 'integer' and col_type != 'float': + raise ValueError('Unknown schema type %s' % col_type) - # Save a copy of the input types to the output location. 
- file_io.copy(args.input_feature_file, - os.path.join(args.output_dir, INPUT_FEATURES_FILE), - overwrite=True) + run_numerical_analysis(table, schema_list, args) + run_categorical_analysis(table, schema_list, args) # Save a copy of the schema to the output location. - if args.schema_file: - file_io.copy(args.schema_file, - os.path.join(args.output_dir, SCHEMA_FILE), - overwrite=True) - else: - file_io.write_string_to_file( - os.path.join(args.output_dir, SCHEMA_FILE), - json.dumps(table.schema._bq_schema, indent=2, separators=(',', ': '))) + file_io.write_string_to_file( + os.path.join(args.output_dir, SCHEMA_FILE), + json.dumps(schema_list, indent=2, separators=(',', ': '))) def main(argv=None): diff --git a/solutionbox/structured_data/datalab_solutions/structured_data/preprocess/local_preprocess.py b/solutionbox/structured_data/datalab_solutions/structured_data/preprocess/local_preprocess.py index 01ac80f23..1fe5b857b 100644 --- a/solutionbox/structured_data/datalab_solutions/structured_data/preprocess/local_preprocess.py +++ b/solutionbox/structured_data/datalab_solutions/structured_data/preprocess/local_preprocess.py @@ -27,10 +27,7 @@ from tensorflow.python.lib.io import file_io - -INPUT_FEATURES_FILE = 'input_features.json' SCHEMA_FILE = 'schema.json' - NUMERICAL_ANALYSIS_FILE = 'numerical_analysis.json' CATEGORICAL_ANALYSIS_FILE = 'vocab_%s.csv' @@ -58,10 +55,6 @@ def parse_arguments(argv): type=str, required=True, help=('BigQuery json schema file')) - parser.add_argument('--input_feature_file', - type=str, - required=True, - help=('Json file containing feature types')) args = parser.parse_args(args=argv[1:]) @@ -71,52 +64,66 @@ def parse_arguments(argv): return args -def run_numerical_categorical_analysis(args, feature_types, schema_list): +def run_numerical_categorical_analysis(args, schema_list): """Makes the numerical and categorical analysis files. Args: args: the command line args - feature_types: python object of the feature types json file schema_list: python object of the schema json file. Raises: - ValueError: if feature_types contains unknown column types. + ValueError: if schema contains unknown column types. """ header = [column['name'] for column in schema_list] input_files = file_io.get_matching_files(args.input_file_pattern) - # initialize numerical_results - numerical_results = {} - for name, config in feature_types.iteritems(): - if config['type'] == 'numerical': - numerical_results[name] = {'min': float('inf'), 'max': float('-inf')} - - # initialize categorical_results + # Check the schema is valid + for col_schema in schema_list: + col_type = col_schema['type'].lower() + if col_type != 'string' and col_type != 'integer' and col_type != 'float': + raise ValueError('Schema contains an unsupported type %s.' % col_type) + + # initialize the results + def _init_numerical_results(): + return {'min': float('inf'), + 'max': float('-inf'), + 'count': 0, + 'sum': 0.0} + numerical_results = collections.defaultdict(_init_numerical_results) categorical_results = collections.defaultdict(set) - # for each file, update the min/max values from that file, and update the set + # for each file, update the numerical stats from that file, and update the set # of unique labels. 
for input_file in input_files: with file_io.FileIO(input_file, 'r') as f: for line in f: parsed_line = dict(zip(header, line.strip().split(','))) - for name, config in feature_types.iteritems(): - # Update numerical analsysis - if config['type'] == 'numerical': - numerical_results[name]['min'] = min(numerical_results[name]['min'], - float(parsed_line[name])) - numerical_results[name]['max'] = max(numerical_results[name]['max'], - float(parsed_line[name])) - elif config['type'] == 'categorical': - # Update categorical analsysis - - categorical_results[name].update([parsed_line[name]]) - elif config['type'] == 'key': - pass + for col_schema in schema_list: + col_name = col_schema['name'] + col_type = col_schema['type'] + if col_type.lower() == 'string': + categorical_results[col_name].update([parsed_line[col_name]]) else: - raise ValueError('Unknown type %s in input features' - % config['type']) + # numerical column. + numerical_results[col_name]['min'] = ( + min(numerical_results[col_name]['min'], + float(parsed_line[col_name]))) + numerical_results[col_name]['max'] = ( + max(numerical_results[col_name]['max'], + float(parsed_line[col_name]))) + numerical_results[col_name]['count'] += 1 + numerical_results[col_name]['sum'] += float(parsed_line[col_name]) + + # Update numerical_results to just have min/min/mean + for col_schema in schema_list: + if col_schema['type'].lower() != 'string': + col_name = col_schema['name'] + mean = numerical_results[col_name]['sum'] / numerical_results[col_name]['count'] + del numerical_results[col_name]['sum'] + del numerical_results[col_name]['count'] + numerical_results[col_name]['mean'] = mean + # Write the numerical_results to a json file. file_io.write_string_to_file( @@ -125,9 +132,10 @@ def run_numerical_categorical_analysis(args, feature_types, schema_list): # Write the vocab files. Each label is on its own line. for name, unique_labels in categorical_results.iteritems(): + labels = '\n'.join(list(unique_labels)) file_io.write_string_to_file( os.path.join(args.output_dir, CATEGORICAL_ANALYSIS_FILE % name), - '\n'.join(list(unique_labels))) + labels) def run_analysis(args): @@ -135,15 +143,10 @@ def run_analysis(args): # Read the schema and input feature types schema_list = json.loads(file_io.read_file_to_string(args.schema_file)) - feature_types = json.loads( - file_io.read_file_to_string(args.input_feature_file)) - run_numerical_categorical_analysis(args, feature_types, schema_list) + run_numerical_categorical_analysis(args, schema_list) - # Also save a copy of the schema/input types in the output folder. - file_io.copy(args.input_feature_file, - os.path.join(args.output_dir, INPUT_FEATURES_FILE), - overwrite=True) + # Also save a copy of the schema in the output folder. file_io.copy(args.schema_file, os.path.join(args.output_dir, SCHEMA_FILE), overwrite=True) diff --git a/solutionbox/structured_data/datalab_solutions/structured_data/setup.py b/solutionbox/structured_data/datalab_solutions/structured_data/setup.py index 1fb714b8d..5409db881 100644 --- a/solutionbox/structured_data/datalab_solutions/structured_data/setup.py +++ b/solutionbox/structured_data/datalab_solutions/structured_data/setup.py @@ -1,29 +1,66 @@ -# Copyright 2017 Google Inc. All Rights Reserved. +# Copyright 2017 Google Inc. All rights reserved. # -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+# in compliance with the License. You may obtain a copy of the License at
 #
-#     http://www.apache.org/licenses/LICENSE-2.0
+#    http://www.apache.org/licenses/LICENSE-2.0
 #
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
+# Unless required by applicable law or agreed to in writing, software distributed under the License
+# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+# or implied. See the License for the specific language governing permissions and limitations under
+# the License.
 
-import setuptools
+# A copy of this file must be made in datalab_solutions/structured_data/setup.py
 
+import datetime
+import os
+import re
+from setuptools import setup
 
-NAME = 'structured_data_problem'
-VERSION = '0.0.1'
 
-if __name__ == '__main__':
-  setuptools.setup(
-      name=NAME,
-      version=VERSION,
-      packages=['trainer', 'preprocess', 'predict'],
-      author='Google',
-      author_email='cloudml-feedback@google.com',
-      test_suite='nose.collector',
-      tests_require=['nose'])
+# The version is saved in an __init__ file.
+def get_version():
+  VERSIONFILE = os.path.join('datalab_solutions/structured_data/',
+                             '__init__.py')
+  initfile_lines = open(VERSIONFILE, 'rt').readlines()
+  VSRE = r"^__version__ = ['\"]([^'\"]*)['\"]"
+  for line in initfile_lines:
+    mo = re.search(VSRE, line, re.M)
+    if mo:
+      return mo.group(1)
+  raise RuntimeError('Unable to find version string in %s.' % (VERSIONFILE,))
+
+
+setup(
+  name='structured_data',
+  version=get_version(),
+  packages=[
+      'datalab_solutions',
+      'datalab_solutions.structured_data',
+      'datalab_solutions.structured_data.trainer',
+      'datalab_solutions.structured_data.preprocess',
+      'datalab_solutions.structured_data.predict',
+  ],
+  description='Google Cloud Datalab Structured Data Package',
+  author='Google',
+  author_email='google-cloud-datalab-feedback@googlegroups.com',
+  keywords=[
+  ],
+  license="Apache Software License",
+  classifiers=[
+      "Programming Language :: Python",
+      "Programming Language :: Python :: 2",
+      "Development Status :: 4 - Beta",
+      "Environment :: Other Environment",
+      "Intended Audience :: Developers",
+      "License :: OSI Approved :: Apache Software License",
+      "Operating System :: OS Independent",
+      "Topic :: Software Development :: Libraries :: Python Modules"
+  ],
+  long_description="""
+  """,
+  install_requires=[
+      "tensorflow==0.12.1"
+  ],
+  package_data={
+  },
+  data_files=[],
+)
diff --git a/solutionbox/structured_data/datalab_solutions/structured_data/test/e2e_functions.py b/solutionbox/structured_data/datalab_solutions/structured_data/test/e2e_functions.py
index bf7bc963e..41f6f1011 100644
--- a/solutionbox/structured_data/datalab_solutions/structured_data/test/e2e_functions.py
+++ b/solutionbox/structured_data/datalab_solutions/structured_data/test/e2e_functions.py
@@ -79,16 +79,20 @@ def make_csv_data(filename, num_rows, problem_type, keep_target=True):
     f1.write(csv_line)
 
 
-def make_preprocess_schema(filename):
+def make_preprocess_schema(filename, problem_type):
   """Makes a schema file compatable with the output of make_csv_data.
 
   Writes a json file.
+ + Args: + filename: output file path + problem_type: regression or classification """ schema = [ { "mode": "REQUIRED", "name": "target", - "type": "INTEGER" + "type": ("STRING" if problem_type == 'classification' else "FLOAT") }, { "mode": "NULLABLE", @@ -130,35 +134,7 @@ def make_preprocess_schema(filename): f.write(json.dumps(schema)) -def make_preprocess_input_features(filename, problem_type): - """Makes an input features file compatable with the output of make_csv_data. - - Args: - filename: filename: writes data to csv file. - problem_type: 'classification' or 'regression'. Changes the target value. - """ - - feature_types = { - "key": {"default": -1, "type": "key"}, - "target": {"default": "unknown", "type": "categorical"}, - "num1": {"default": 0.0, "type": "numerical"}, - "num2": {"default": 0, "type": "numerical"}, - "num3": {"default": 0.0, "type": "numerical"}, - "str1": {"default": "black", "type": "categorical"}, - "str2": {"default": "abc", "type": "categorical"}, - "str3": {"default": "car", "type": "categorical"} - } - - if problem_type == 'regression': - feature_types['target']['type'] = 'numerical' - feature_types['target']['default'] = 0 - - with open(filename, 'w') as f: - f.write(json.dumps(feature_types)) - - -def run_preprocess(output_dir, csv_filename, schema_filename, - input_features_filename): +def run_preprocess(output_dir, csv_filename, schema_filename): preprocess_script = os.path.abspath( os.path.join(os.path.dirname(__file__), '../preprocess/local_preprocess.py')) @@ -166,8 +142,7 @@ def run_preprocess(output_dir, csv_filename, schema_filename, cmd = ['python', preprocess_script, '--output_dir', output_dir, '--input_file_pattern', csv_filename, - '--schema_file', schema_filename, - '--input_feature_file', input_features_filename, + '--schema_file', schema_filename ] print('Going to run command: %s' % ' '.join(cmd)) subprocess.check_call(cmd) #, stderr=open(os.devnull, 'wb')) diff --git a/solutionbox/structured_data/datalab_solutions/structured_data/test/test_preprocess.py b/solutionbox/structured_data/datalab_solutions/structured_data/test/test_preprocess.py index c7a6eb309..c1e0dfa80 100644 --- a/solutionbox/structured_data/datalab_solutions/structured_data/test/test_preprocess.py +++ b/solutionbox/structured_data/datalab_solutions/structured_data/test/test_preprocess.py @@ -34,8 +34,6 @@ def setUp(self): self._csv_filename = os.path.join(self._test_dir, 'raw_csv_data.csv') self._schema_filename = os.path.join(self._test_dir, 'schema.json') - self._input_features_filename = os.path.join(self._test_dir, - 'input_features_file.json') self._preprocess_output = os.path.join(self._test_dir, 'pout') @@ -51,9 +49,8 @@ def _make_test_data(self, problem_type): problem_type: 'regression' or 'classification' """ e2e_functions.make_csv_data(self._csv_filename, 100, problem_type, True) - e2e_functions.make_preprocess_schema(self._schema_filename) - e2e_functions.make_preprocess_input_features(self._input_features_filename, - problem_type) + e2e_functions.make_preprocess_schema(self._schema_filename, problem_type) + def _test_preprocess(self, problem_type): self._make_test_data(problem_type) @@ -61,17 +58,14 @@ def _test_preprocess(self, problem_type): e2e_functions.run_preprocess( output_dir=self._preprocess_output, csv_filename=self._csv_filename, - schema_filename=self._schema_filename, - input_features_filename=self._input_features_filename) + schema_filename=self._schema_filename) schema_file = os.path.join(self._preprocess_output, 'schema.json') - features_file = 
os.path.join(self._preprocess_output, 'input_features.json') numerical_analysis_file = os.path.join(self._preprocess_output, 'numerical_analysis.json') - # test schema and features were copied + # test schema file was copied self.assertTrue(filecmp.cmp(schema_file, self._schema_filename)) - self.assertTrue(filecmp.cmp(features_file, self._input_features_filename)) expected_numerical_keys = ['num1', 'num2', 'num3'] if problem_type == 'regression': @@ -83,7 +77,8 @@ def _test_preprocess(self, problem_type): self.assertEqual(sorted(expected_numerical_keys), sorted(analysis.keys())) # Check that the vocab files are made - expected_vocab_files = ['vocab_str1.csv', 'vocab_str2.csv', 'vocab_str3.csv'] + expected_vocab_files = ['vocab_str1.csv', 'vocab_str2.csv', + 'vocab_str3.csv', 'vocab_key.csv'] if problem_type == 'classification': expected_vocab_files.append('vocab_target.csv') @@ -92,8 +87,8 @@ def _test_preprocess(self, problem_type): self.assertTrue(os.path.exists(vocab_file)) self.assertGreater(os.path.getsize(vocab_file), 0) - all_expected_files = (expected_vocab_files + ['input_features.json', - 'numerical_analysis.json', 'schema.json']) + all_expected_files = (expected_vocab_files + ['numerical_analysis.json', + 'schema.json']) all_file_paths = glob.glob(os.path.join(self._preprocess_output, '*')) all_files = [os.path.basename(path) for path in all_file_paths] self.assertEqual(sorted(all_expected_files), sorted(all_files)) diff --git a/solutionbox/structured_data/datalab_solutions/structured_data/test/test_trainer.py b/solutionbox/structured_data/datalab_solutions/structured_data/test/test_trainer.py index 24421547f..1c7f944f7 100644 --- a/solutionbox/structured_data/datalab_solutions/structured_data/test/test_trainer.py +++ b/solutionbox/structured_data/datalab_solutions/structured_data/test/test_trainer.py @@ -67,15 +67,12 @@ def _run_training(self, problem_type, model_type, transforms, extra_args=[]): # Run preprocessing. e2e_functions.make_csv_data(self._csv_train_filename, 100, problem_type, True) e2e_functions.make_csv_data(self._csv_eval_filename, 100, problem_type, True) - e2e_functions.make_preprocess_schema(self._schema_filename) - e2e_functions.make_preprocess_input_features(self._input_features_filename, - problem_type) + e2e_functions.make_preprocess_schema(self._schema_filename, problem_type) e2e_functions.run_preprocess( output_dir=self._preprocess_output, csv_filename=self._csv_train_filename, - schema_filename=self._schema_filename, - input_features_filename=self._input_features_filename) + schema_filename=self._schema_filename) # Write the transforms file. 
with open(self._transforms_filename, 'w') as f: @@ -151,7 +148,6 @@ def _check_train_files(self): self.assertTrue(os.path.isfile(os.path.join(model_folder, 'export.meta'))) self.assertTrue(os.path.isfile(os.path.join(model_folder, 'schema.json'))) self.assertTrue(os.path.isfile(os.path.join(model_folder, 'transforms.json'))) - self.assertTrue(os.path.isfile(os.path.join(model_folder, 'input_features.json'))) def testRegressionDnn(self): @@ -162,10 +158,11 @@ def testRegressionDnn(self): "num2": {"transform": "scale","value": 4}, "str1": {"transform": "hash_embedding", "embedding_dim": 2, "hash_bucket_size": 4}, "str2": {"transform": "embedding", "embedding_dim": 3}, - "target": {"transform": "target"} + "target": {"transform": "target"}, + "key": {"transform": "key"}, } - extra_args = ['--layer_sizes 10 10 5'] + extra_args = ['--layer_size1=10', '--layer_size2=10', '--layer_size3=5'] self._run_training(problem_type='regression', model_type='dnn', transforms=transforms, @@ -184,7 +181,8 @@ def testRegressionLinear(self): "str1": {"transform": "hash_sparse", "hash_bucket_size": 2}, "str2": {"transform": "hash_sparse", "hash_bucket_size": 2}, "str3": {"transform": "hash_sparse", "hash_bucket_size": 2}, - "target": {"transform": "target"} + "target": {"transform": "target"}, + "key": {"transform": "key"}, } self._run_training(problem_type='regression', @@ -204,10 +202,11 @@ def testClassificationDnn(self): "str1": {"transform": "hash_one_hot", "hash_bucket_size": 4}, "str2": {"transform": "one_hot"}, "str3": {"transform": "embedding", "embedding_dim": 3}, - "target": {"transform": "target"} + "target": {"transform": "target"}, + "key": {"transform": "key"} } - extra_args = ['--layer_sizes 10 10 5'] + extra_args = ['--layer_size1=10', '--layer_size2=10', '--layer_size3=5'] self._run_training(problem_type='classification', model_type='dnn', transforms=transforms, @@ -225,7 +224,8 @@ def testClassificationLinear(self): "num2": {"transform": "scale","value": 4}, "str1": {"transform": "hash_sparse", "hash_bucket_size": 4}, "str2": {"transform": "sparse"}, - "target": {"transform": "target"} + "target": {"transform": "target"}, + "key": {"transform": "key"}, } self._run_training(problem_type='classification', diff --git a/solutionbox/structured_data/datalab_solutions/structured_data/trainer/task.py b/solutionbox/structured_data/datalab_solutions/structured_data/trainer/task.py index 5fa3e6a8d..84aa9d263 100755 --- a/solutionbox/structured_data/datalab_solutions/structured_data/trainer/task.py +++ b/solutionbox/structured_data/datalab_solutions/structured_data/trainer/task.py @@ -18,6 +18,7 @@ import argparse import json import os +import re import sys import math @@ -252,11 +253,9 @@ def get_experiment(output_dir): # Save a copy of the scehma and input to the model folder. schema_file = os.path.join(args.preprocess_output_dir, util.SCHEMA_FILE) - input_features_file = os.path.join(args.preprocess_output_dir, - util.INPUT_FEATURES_FILE) # Make list of files to save with the trained model. - additional_assets = [args.transforms_file, schema_file, input_features_file] + additional_assets = [args.transforms_file, schema_file] if util.is_classification_model(args.model_type): target_name = train_config['target_column'] vocab_file_path = os.path.join( @@ -307,7 +306,10 @@ def get_experiment(output_dir): def parse_arguments(argv): """Parse the command line arguments.""" - parser = argparse.ArgumentParser() + parser = argparse.ArgumentParser( + description=('Train a regression or classification model. 
Note that if ' + 'using a DNN model, --layer_size1=NUM, --layer_size2=NUM, ' + 'should be used. ')) # I/O file parameters parser.add_argument('--train_data_paths', type=str, action='append', @@ -332,6 +334,7 @@ def parse_arguments(argv): # HP parameters parser.add_argument('--learning_rate', type=float, default=0.01) parser.add_argument('--epsilon', type=float, default=0.0005) + # --layer_size See below # Model problems parser.add_argument('--model_type', @@ -345,7 +348,6 @@ def parse_arguments(argv): 'will contain the labels and scores for the top ' 'n classes.')) # Training input parameters - parser.add_argument('--layer_sizes', type=int, nargs='*') parser.add_argument('--max_steps', type=int, default=5000, help='Maximum number of training steps to perform.') parser.add_argument('--batch_size', type=int, default=1000) @@ -361,7 +363,40 @@ def parse_arguments(argv): ' of training steps. Should be large enough so that' ' a new checkpoined model is saved before running ' 'again.')) - return parser.parse_args(args=argv[1:]) + args, remaining_args = parser.parse_known_args(args=argv[1:]) + + # All HP parambeters must be unique, so we need to support an unknown number + # of --layer_size1=10 --layer_size2=10 ... + # Look at remaining_args for layer_size\d+ to get the layer info. + + # Get number of layers + pattern = re.compile('layer_size(\d+)') + num_layers = 0 + for other_arg in remaining_args: + match = re.search(pattern, other_arg) + if match: + num_layers = max(num_layers, int(match.group(1))) + + # Build a new parser so we catch unknown args and missing layer_sizes. + parser = argparse.ArgumentParser() + for i in range(num_layers): + parser.add_argument('--layer_size%s' % str(i+1), type=int, required=True) + + layer_args = vars(parser.parse_args(args=remaining_args)) + layer_sizes = [] + for i in range(num_layers): + key = 'layer_size%s' % str(i+1) + layer_sizes.append(layer_args[key]) + + assert len(layer_sizes) == num_layers + args.layer_sizes = layer_sizes + + return args + + + + + def main(argv=None): diff --git a/solutionbox/structured_data/datalab_solutions/structured_data/trainer/util.py b/solutionbox/structured_data/datalab_solutions/structured_data/trainer/util.py index a0ab5e610..cf000941d 100755 --- a/solutionbox/structured_data/datalab_solutions/structured_data/trainer/util.py +++ b/solutionbox/structured_data/datalab_solutions/structured_data/trainer/util.py @@ -18,11 +18,9 @@ import os from StringIO import StringIO -import pandas as pd import tensorflow as tf from tensorflow.python.lib.io import file_io -INPUT_FEATURES_FILE = 'input_features.json' SCHEMA_FILE = 'schema.json' NUMERICAL_ANALYSIS = 'numerical_analysis.json' CATEGORICAL_ANALYSIS = 'vocab_%s.csv' @@ -220,9 +218,9 @@ def get_estimator(output_dir, train_config, args): # Check layers used for dnn models. if is_dnn_model(args.model_type) and not args.layer_sizes: - raise ValueError('--layer_sizes must be used with DNN models') + raise ValueError('--layer_size* must be used with DNN models') if is_linear_model(args.model_type) and args.layer_sizes: - raise ValueError('--layer_sizes cannot be used with linear models') + raise ValueError('--layer_size* cannot be used with linear models') # Build tf.learn features feature_columns = _tflearn_features(train_config, args) @@ -292,6 +290,10 @@ def preprocess_input(features, target, train_config, preprocess_output_dir, key_name = train_config['key_column'] # Do the numerical transforms. 
+ # Numerical transforms supported for regression/classification + # 1) num -> do nothing (identity, default) + # 2) num -> scale to -1, 1 (scale) + # 3) num -> scale to -a, a (scale with value parameter) with tf.name_scope('numerical_feature_preprocess') as scope: if train_config['numerical_columns']: numerical_analysis_file = os.path.join(preprocess_output_dir, @@ -508,20 +510,18 @@ def get_vocabulary(preprocess_output_dir, name): raise ValueError('File %s not found in %s' % (CATEGORICAL_ANALYSIS % name, preprocess_output_dir)) - df = pd.read_csv(StringIO(file_io.read_file_to_string(vocab_file)), - header=None, - names=['labels']) - label_values = df['labels'].values.tolist() + labels = file_io.read_file_to_string(vocab_file).split('\n') + label_values = [x for x in labels if x] # remove empty lines - return [str(value) for value in label_values] + return label_values def merge_metadata(preprocess_output_dir, transforms_file): - """Merge schema, input features, and transforms file into one python object. + """Merge schema, analysis, and transforms files into one python object. Args: preprocess_output_dir: the output folder of preprocessing. Should contain - the schema, input feature files, and the numerical and categorical + the schema, and the numerical and categorical analysis files. transforms_file: the training transforms file. @@ -544,89 +544,102 @@ def merge_metadata(preprocess_output_dir, transforms_file): Raises: ValueError: if one of the input metadata files is wrong. """ - + numerical_anlysis_file = os.path.join(preprocess_output_dir, + NUMERICAL_ANALYSIS) schema_file = os.path.join(preprocess_output_dir, SCHEMA_FILE) - input_features_file = os.path.join(preprocess_output_dir, INPUT_FEATURES_FILE) + numerical_anlysis = json.loads(file_io.read_file_to_string( + numerical_anlysis_file)) schema = json.loads(file_io.read_file_to_string(schema_file)) - input_features = json.loads(file_io.read_file_to_string(input_features_file)) transforms = json.loads(file_io.read_file_to_string(transforms_file)) result_dict = {} - result_dict['csv_header'] = [schema_dict['name'] for schema_dict in schema] - result_dict['csv_defaults'] = {} + result_dict['csv_header'] = [col_schema['name'] for col_schema in schema] result_dict['key_column'] = None result_dict['target_column'] = None result_dict['categorical_columns'] = [] result_dict['numerical_columns'] = [] result_dict['transforms'] = {} - result_dict['vocab_stats'] = {} + result_dict['csv_defaults'] = {} + result_dict['vocab_stats'] = {} - # get key column - for name, input_type in input_features.iteritems(): - if input_type['type'] == 'key': + # get key column. + for name, trans_config in transforms.iteritems(): + if trans_config.get('transform', None) == 'key': result_dict['key_column'] = name break if result_dict['key_column'] is None: - raise ValueError('Key column missing from input features file.') - - # get target column - for name, transform in transforms.iteritems(): - if transform.get('transform', None) == 'target': - result_dict['target_column'] = name + raise ValueError('Key transform missing form transfroms file.') + + # get target column. 
+ result_dict['target_column'] = schema[0]['name'] + for name, trans_config in transforms.iteritems(): + if trans_config.get('transform', None) == 'target': + if name != result_dict['target_column']: + raise ValueError('Target from transform file does not correspond to ' + 'the first column of data') break - if result_dict['target_column'] is None: - raise ValueError('Target transform missing form transfroms file.') # Get the numerical/categorical columns. - for schema_dict in schema: - name = schema_dict['name'] - col_type = input_features.get(name, {}).get('type', None) - - if col_type is None: - raise ValueError('Missing type from %s in file %s' % ( - name, input_features_file)) - elif col_type == 'numerical': - result_dict['numerical_columns'].append(name) - elif col_type == 'categorical': - result_dict['categorical_columns'].append(name) - elif col_type == 'key': - pass - else: - raise ValueError('unknown type %s in input featrues file.' % col_type) - - # Get the defaults - for schema_dict in schema: - name = schema_dict['name'] - default = input_features.get(name, {}).get('default', None) - - if default is None: - raise ValueError('Missing default from %s in file %s' % ( - name, input_features_file)) - - # make all numerical types floats. This means when tf.decode_csv is called, - # float tensors and string tensors will be made. - if name in result_dict['categorical_columns']: - default = str(default) - elif name in result_dict['numerical_columns']: - default = float(default) + for col_schema in schema: + col_name = col_schema['name'] + col_type = col_schema['type'].lower() + if col_name == result_dict['key_column']: + continue + + if col_type == 'string': + result_dict['categorical_columns'].append(col_name) + elif col_type == 'integer' or col_type == 'float': + result_dict['numerical_columns'].append(col_name) else: - default = str(default) # key column + raise ValueError('Unsupported schema type %s' % col_type) - result_dict['csv_defaults'].update({name: default}) + # Get the transforms. + for name, trans_config in transforms.iteritems(): + if name != result_dict['target_column'] and name != result_dict['key_column']: + result_dict['transforms'][name] = trans_config - # Get the transforms - for name, transform in transforms.iteritems(): - if transform['transform'] != 'target': - result_dict['transforms'].update({name: transform}) - - # Load vocabs + # Get the vocab_stats for name in result_dict['categorical_columns']: - if name != result_dict['key_column']: - label_values = get_vocabulary(preprocess_output_dir, name) - n_classes = len(label_values) - result_dict['vocab_stats'][name] = {'n_classes': n_classes, - 'labels': label_values} + if name == result_dict['key_column']: + continue + + label_values = get_vocabulary(preprocess_output_dir, name) + if name != result_dict['target_column']: + label_values.append('') # append a 'missing' label. + n_classes = len(label_values) + result_dict['vocab_stats'][name] = {'n_classes': n_classes, + 'labels': label_values} + + # Get the csv_defaults + for col_schema in schema: + name = col_schema['name'] + col_type = col_schema['type'].lower() + default = transforms.get(name, {}).get('default', None) + if default is not None: + # convert int defaults to float + if name in result_dict['numerical_columns']: + default = float(default) + else: + # check default is in the vocab, otherwise use it as is. 
+ default = str(default) + if default not in result_dict['vocab_stats'][name]['labels']: + raise ValueError('Default %s is not in the vocab for %s' % + (default, name)) + else: + # Default is not given, so pick one. + if name in result_dict['categorical_columns']: + default = '' + elif name in result_dict['numerical_columns']: + default = float(numerical_anlysis[name]['mean']) + elif name == result_dict['key_column']: + if col_type == 'string': + default = '' + else: + default = 0.0 + + + result_dict['csv_defaults'][name] = default validate_metadata(result_dict) return result_dict @@ -652,6 +665,7 @@ def validate_metadata(train_config): # categorical_columns or numerical_columns. sorted_columns = sorted(train_config['csv_header'] + [train_config['target_column']]) + sorted_columns2 = sorted(train_config['categorical_columns'] + train_config['numerical_columns'] + [train_config['key_column']] diff --git a/solutionbox/structured_data/setup.py b/solutionbox/structured_data/setup.py index eb4185f14..f1946744a 100644 --- a/solutionbox/structured_data/setup.py +++ b/solutionbox/structured_data/setup.py @@ -10,17 +10,31 @@ # or implied. See the License for the specific language governing permissions and limitations under # the License. -# To publish to PyPi use: python setup.py bdist_wheel upload -r pypi +# A copy of this file must be made in datalab_solutions/structured_data/setup.py import datetime +import os +import re from setuptools import setup -minor = datetime.datetime.now().strftime("%y%m%d%H%M") -version = '0.1' + + +# The version is saved in an __init__ file. +def get_version(): + VERSIONFILE = os.path.join('datalab_solutions/structured_data/', + '__init__.py') + initfile_lines = open(VERSIONFILE, 'rt').readlines() + VSRE = r"^__version__ = ['\"]([^'\"]*)['\"]" + for line in initfile_lines: + mo = re.search(VSRE, line, re.M) + if mo: + return mo.group(1) + raise RuntimeError('Unable to find version string in %s.' % (VERSIONFILE,)) + setup( name='structured_data', - version=version, + version=get_version(), packages=[ 'datalab_solutions', 'datalab_solutions.structured_data', @@ -47,7 +61,9 @@ long_description=""" """, install_requires=[ + "tensorflow==0.12.1" ], package_data={ - } + }, + data_files=[], )
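
# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of the patch): how the reworked
# local API in _package.py is driven after this change, based on the
# signatures above. File paths, column names, and layer sizes below are
# hypothetical placeholders.
# ---------------------------------------------------------------------------

from datalab_solutions.structured_data import local_preprocess, local_train

# Analyze the training CSVs. Column types now come from the BigQuery-style
# schema file alone (string -> categorical, integer/float -> numerical);
# there is no longer a separate input_feature_file.
local_preprocess(
    output_dir='./preprocess_out',
    input_file_pattern='./data/train-*.csv',
    schema_file='./data/schema.json')

# Train a DNN classifier. Exactly one of key_column or transforms_file must
# be given; passing key_column writes a minimal transforms file containing
# only the key transform. layer_sizes is forwarded to the trainer as
# --layer_size1=10 --layer_size2=10 --layer_size3=5.
local_train(
    train_file_pattern='./data/train-*.csv',
    eval_file_pattern='./data/eval-*.csv',
    preprocess_output_dir='./preprocess_out',
    output_dir='./train_out',
    model_type='dnn_classification',
    max_steps=2000,
    key_column='key',
    top_n=3,
    layer_sizes=[10, 10, 5])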