diff --git a/solutionbox/structured_data/datalab_solutions/structured_data/_package.py b/solutionbox/structured_data/datalab_solutions/structured_data/_package.py index a1becb8b7..daa858932 100644 --- a/solutionbox/structured_data/datalab_solutions/structured_data/_package.py +++ b/solutionbox/structured_data/datalab_solutions/structured_data/_package.py @@ -80,10 +80,6 @@ def _is_in_IPython(): except ImportError: return False -def _check_transforms_config_file(transforms_config_file): - """Check that the transforms file has expected values.""" - pass - def _run_cmd(cmd): output = subprocess.Popen(cmd, shell=True, stderr=subprocess.STDOUT, stdout=subprocess.PIPE) @@ -94,7 +90,8 @@ def _run_cmd(cmd): if line == '' and output.poll() != None: break -def local_preprocess(input_file_path, output_dir, transforms_config_file, + +def local_preprocess(input_file_path, output_dir, schema_file, train_percent=None, eval_percent=None, test_percent=None): """Preprocess data locally with Beam. @@ -107,13 +104,11 @@ def local_preprocess(input_file_path, output_dir, transforms_config_file, files. Preprocessing will automatically slip the data into three sets for training, evaluation, and testing. Can be local or GCS path. output_dir: The output directory to use; can be local or GCS path. - transforms_config_file: File path to the config file. + schema_file: File path to the schema file. train_percent: Int in range [0, 100]. eval_percent: Int in range [0, 100]. - train_percent: Int in range [0, 100]. + test_percent: Int in range [0, 100]. """ - _check_transforms_config_file(transforms_config_file) - percent_flags = _percent_flags(train_percent, eval_percent, test_percent) this_folder = os.path.dirname(os.path.abspath(__file__)) @@ -121,15 +116,14 @@ def local_preprocess(input_file_path, output_dir, transforms_config_file, os.path.join(this_folder, 'preprocess/preprocess.py'), '--input_file_path=%s' % input_file_path, '--output_dir=%s' % output_dir, - '--transforms_config_file=%s' % transforms_config_file] + percent_flags + '--schema_file=%s' % schema_file] + percent_flags print('Local preprocess, running command: %s' % ' '.join(cmd)) _run_cmd(' '.join(cmd)) - print('Local preprocessing done.') -def cloud_preprocess(input_file_path, output_dir, transforms_config_file, +def cloud_preprocess(input_file_path, output_dir, schema_file, train_percent=None, eval_percent=None, test_percent=None, project_id=None, job_name=None): """Preprocess data in the cloud with Dataflow. @@ -142,8 +136,8 @@ def cloud_preprocess(input_file_path, output_dir, transforms_config_file, input_file_path: String. File pattern what will expand into a list of csv files. Preprocessing will automatically slip the data into three sets for training, evaluation, and testing. Can be local or GCS path. - output_dir: The output directory to use; can be local or GCS path. - transforms_config_file: File path to the config file. + output_dir: The output directory to use; should be GCS path. + schema_file: File path to the schema file. train_percent: Int in range [0, 100]. eval_percent: Int in range [0, 100]. train_percent: Int in range [0, 100]. @@ -152,8 +146,6 @@ def cloud_preprocess(input_file_path, output_dir, transforms_config_file, job_name: String. Job name as listed on the Dataflow service. If None, a default job name is selected. 
""" - _check_transforms_config_file(transforms_config_file) - percent_flags = _percent_flags(train_percent, eval_percent, test_percent) this_folder = os.path.dirname(os.path.abspath(__file__)) project_id = project_id or _default_project() @@ -167,11 +159,10 @@ def cloud_preprocess(input_file_path, output_dir, transforms_config_file, '--job_name=%s' % job_name, '--input_file_path=%s' % input_file_path, '--output_dir=%s' % output_dir, - '--transforms_config_file=%s' % transforms_config_file] + percent_flags + '--schema_file=%s' % schema_file] + percent_flags print('Cloud preprocess, running command: %s' % ' '.join(cmd)) _run_cmd(' '.join(cmd)) - print('Cloud preprocessing job submitted.') if _is_in_IPython(): @@ -184,27 +175,28 @@ def cloud_preprocess(input_file_path, output_dir, transforms_config_file, IPython.display.display_html(html, raw=True) - -def local_train(preprocessed_dir, transforms_config_file, output_dir, +def local_train(preprocessed_dir, transforms_file, output_dir, + model_type, layer_sizes=None, max_steps=None): """Train model locally. Args: preprocessed_dir: The output directory from preprocessing. Must contain files named features_train*.tfrecord.gz, features_eval*.tfrecord.gz, and metadata.json. Can be local or GCS path. - transforms_config_file: File path to the config file. + transforms_file: File describing transforms to perform on the data. output_dir: Output directory of training. + model_type: String. 'linear_classification', 'linear_regression', + 'dnn_classification', or 'dnn_regression. layer_sizes: String. Represents the layers in the connected DNN. If the model type is DNN, this must be set. Example "10 3 2", this will create three DNN layers where the first layer will have 10 nodes, the middle layer will have 3 nodes, and the laster layer will have 2 nodes. max_steps: Int. Number of training steps to perform. """ - _check_transforms_config_file(transforms_config_file) - #TODO(brandondutra): allow other flags to be set like batch size/learner rate #TODO(brandondutra): doc someplace that TF>=0.12 and cloudml >-1.7 are needed. + schema_file = os.path.join(preprocessed_dir, 'schema.json') train_filename = os.path.join(preprocessed_dir, 'features_train*') eval_filename = os.path.join(preprocessed_dir, 'features_eval*') metadata_filename = os.path.join(preprocessed_dir, 'metadata.json') @@ -220,7 +212,9 @@ def local_train(preprocessed_dir, transforms_config_file, output_dir, '--eval_data_paths=%s' % eval_filename, '--metadata_path=%s' % metadata_filename, '--output_path=%s' % output_dir, - '--transforms_config_file=%s' % transforms_config_file, + '--schema_file=%s' % schema_file, + '--transforms_file=%s' % transforms_file, + '--model_type=%s' % model_type, '--max_steps=%s' % str(max_steps)] if layer_sizes: cmd += ['--layer_sizes %s' % layer_sizes] @@ -230,17 +224,18 @@ def local_train(preprocessed_dir, transforms_config_file, output_dir, print('Local training done.') -def cloud_train(preprocessed_dir, transforms_config_file, output_dir, - staging_bucket, - layer_sizes=None, max_steps=None, project_id=None, - job_name=None, scale_tier='BASIC'): +def cloud_train(preprocessed_dir, transforms_file, output_dir, model_type, + staging_bucket, layer_sizes=None, max_steps=None, + project_id=None, job_name=None, scale_tier='BASIC'): """Train model using CloudML. Args: preprocessed_dir: The output directory from preprocessing. Must contain files named features_train*.tfrecord.gz, features_eval*.tfrecord.gz, and metadata.json. 
- transforms_config_file: File path to the config file. + transforms_file: File path to the transforms file. output_dir: Output directory of training. + model_type: String. 'linear_classification', 'linear_regression', + 'dnn_classification', or 'dnn_regression'. staging_bucket: GCS bucket. layer_sizes: String. Represents the layers in the connected DNN. If the model type is DNN, this must be set. Example "10 3 2", this will @@ -254,16 +249,14 @@ def cloud_train(preprocessed_dir, transforms_config_file, output_dir, scale_tier: The CloudML scale tier. CUSTOM tiers are currently not supported in this package. See https://cloud.google.com/ml/reference/rest/v1beta1/projects.jobs#ScaleTier """ - _check_transforms_config_file(transforms_config_file) - #TODO(brandondutra): allow other flags to be set like batch size, # learner rate, custom scale tiers, etc #TODO(brandondutra): doc someplace that TF>=0.12 and cloudml >-1.7 are needed. if (not preprocessed_dir.startswith('gs://') - or not transforms_config_file.startswith('gs://') + or not transforms_file.startswith('gs://') or not output_dir.startswith('gs://')): - print('ERROR: preprocessed_dir, transforms_config_file, and output_dir ' + print('ERROR: preprocessed_dir, transforms_file, and output_dir ' 'must all be in GCS.') return @@ -278,12 +271,13 @@ def cloud_train(preprocessed_dir, transforms_config_file, output_dir, subprocess.check_call(['gsutil', 'cp', _TF_GS_URL, temp_dir]) tf_local_package = os.path.join(temp_dir, os.path.basename(_TF_GS_URL)) - # Buld the training config file. + # Build the training config file. training_config_file_path = tempfile.mkstemp(dir=temp_dir)[1] training_config = {'trainingInput': {'scaleTier': scale_tier}} with open(training_config_file_path, 'w') as f: f.write(yaml.dump(training_config, default_flow_style=False)) + schema_file = os.path.join(preprocessed_dir, 'schema.json') train_filename = os.path.join(preprocessed_dir, 'features_train*') eval_filename = os.path.join(preprocessed_dir, 'features_eval*') metadata_filename = os.path.join(preprocessed_dir, 'metadata.json') @@ -306,19 +300,19 @@ def cloud_train(preprocessed_dir, transforms_config_file, output_dir, '--eval_data_paths=%s' % eval_filename, '--metadata_path=%s' % metadata_filename, '--output_path=%s' % output_dir, - '--transforms_config_file=%s' % transforms_config_file, + '--transforms_file=%s' % transforms_file, + '--schema_file=%s' % schema_file, + '--model_type=%s' % model_type, '--max_steps=%s' % str(max_steps)] if layer_sizes: cmd += ['--layer_sizes %s' % layer_sizes] print('CloudML training, running command: %s' % ' '.join(cmd)) _run_cmd(' '.join(cmd)) - print('CloudML training job submitted.') if _is_in_IPython(): import IPython - dataflow_url = ('https://console.developers.google.com/ml/jobs?project=%s' % project_id) html = ('

Click here to track ' @@ -328,23 +322,144 @@ def cloud_train(preprocessed_dir, transforms_config_file, output_dir, # Delete the temp files made shutil.rmtree(temp_dir) +def local_predict(model_dir, prediction_input_file): + """Runs local prediction. + + Runs local prediction in memory and prints the results to the screen. For + running prediction on a large dataset or saving the results, run + local_batch_predict or cloud_batch_predict. + + Args: + model_dir: Path to folder that contains the model. This is usually OUT/model + where OUT is the value of output_dir when local_train was run. + prediction_input_file: csv file that has the same schema as the input + files used during local_preprocess, except that the target column is + removed. + """ + #TODO(brandondutra): remove this hack once cloudml 1.8 is released. + # Check that the model folder has a metadata.yaml file. If not, copy it. + if not os.path.isfile(os.path.join(model_dir, 'metadata.yaml')): + shutil.copy2(os.path.join(model_dir, 'metadata.json'), + os.path.join(model_dir, 'metadata.yaml')) + + cmd = ['gcloud beta ml local predict', + '--model-dir=%s' % model_dir, + '--text-instances=%s' % prediction_input_file] + print('Local prediction, running command: %s' % ' '.join(cmd)) + _run_cmd(' '.join(cmd)) + print('Local prediction done.') -def local_predict(): - """Not Implemented Yet.""" - print('local_predict') +def cloud_predict(model_name, prediction_input_file, version_name=None): + """Use online prediction. -def cloud_predict(): - """Not Implemented Yet.""" - print('cloud_predict') + Runs online prediction in the cloud and prints the results to the screen. For + running prediction on a large dataset or saving the results, run + local_batch_predict or cloud_batch_predict. + Args: + model_name: Name of the deployed CloudML model to use. The model must + already exist; see the note below on creating it. + prediction_input_file: csv file that has the same schema as the input + files used during local_preprocess, except that the target column is + removed. + version_name: Optional version of the model to use. If None, the default + version is used. + + Before using this, the model must be created. This can be done by running + two gcloud commands: + 1) gcloud beta ml models create NAME + 2) gcloud beta ml models versions create VERSION --model NAME \ + --origin gs://BUCKET/training_output_dir/model + or one datalab magic: + 1) %mlalpha deploy --name=NAME.VERSION \ + --path=gs://BUCKET/training_output_dir/model \ + --project=PROJECT + Note that the model must be on GCS. + """ + cmd = ['gcloud beta ml predict', + '--model=%s' % model_name, + '--text-instances=%s' % prediction_input_file] + if version_name: + cmd += ['--version=%s' % version_name] -def local_batch_predict(): - """Not Implemented Yet.""" - print('local_batch_predict') + print('CloudML online prediction, running command: %s' % ' '.join(cmd)) + _run_cmd(' '.join(cmd)) + print('CloudML online prediction done.') -def cloud_batch_predict(): - """Not Implemented Yet.""" - print('cloud_batch_predict') +def local_batch_predict(model_dir, prediction_input_file, output_dir): + """Local batch prediction. + Args: + model_dir: local path to trained model. + prediction_input_file: File path to input files. May contain a file pattern. + Only csv files are supported, and the schema must match what was used + in preprocessing except that the target column is removed. + output_dir: folder to save results to.
+ """ + #TODO(brandondutra): remove this hack once cloudml 1.8 is released. + # Check that the model folder has a metadata.yaml file. If not, copy it. + if not os.path.isfile(os.path.join(model_dir, 'metadata.yaml')): + shutil.copy2(os.path.join(model_dir, 'metadata.json'), + os.path.join(model_dir, 'metadata.yaml')) + + cmd = ['python -m google.cloud.ml.dataflow.batch_prediction_main', + '--input_file_format=text', + '--input_file_patterns=%s' % prediction_input_file, + '--output_location=%s' % output_dir, + '--model_dir=%s' % model_dir] + + print('Local batch prediction, running command: %s' % ' '.join(cmd)) + _run_cmd(' '.join(cmd)) + print('Local batch prediction done.') + + +def cloud_batch_predict(model_name, prediction_input_file, output_dir, region, + job_name=None, version_name=None): + """Cloud batch prediction. + + Args: + model_name: name of the model. The model must already exist. + prediction_input_file: File path to input files. May contain a file pattern. + Only csv files are supported, and the schema must match what was used + in preprocessing except that the target column is removed. Files must + be on GCS. + output_dir: GCS folder to save results to. + region: GCP compute region to run the batch job. Try using your default + region first, as this cloud batch prediction is not available in all + regions. + job_name: job name used for the cloud job. + version_name: model version to use. If None, the default version of the + model is used. + """ + job_name = job_name or ('structured_data_batch_predict_' + + datetime.datetime.now().strftime('%Y%m%d%H%M%S')) + + if (not prediction_input_file.startswith('gs://') or + not output_dir.startswith('gs://')): + print('ERROR: prediction_input_file and output_dir must point to a ' + 'location on GCS.') + return + + cmd = ['gcloud beta ml jobs submit prediction %s' % job_name, + '--model=%s' % model_name, + '--region=%s' % region, + '--data-format=TEXT', + '--input-paths=%s' % prediction_input_file, + '--output-path=%s' % output_dir] + if version_name: + cmd += ['--version=%s' % version_name] + + print('CloudML batch prediction, running command: %s' % ' '.join(cmd)) + _run_cmd(' '.join(cmd)) + print('CloudML batch prediction job submitted.') + + if _is_in_IPython(): + import IPython + + dataflow_url = ('https://console.developers.google.com/ml/jobs?project=%s' + % _default_project()) + html = ('

Click here to track ' + 'the prediction job %s.


' % (dataflow_url, job_name)) + IPython.display.display_html(html, raw=True) diff --git a/solutionbox/structured_data/datalab_solutions/structured_data/preprocess/preprocess.py b/solutionbox/structured_data/datalab_solutions/structured_data/preprocess/preprocess.py index 0bb5a9108..54da81d43 100644 --- a/solutionbox/structured_data/datalab_solutions/structured_data/preprocess/preprocess.py +++ b/solutionbox/structured_data/datalab_solutions/structured_data/preprocess/preprocess.py @@ -62,15 +62,15 @@ def parse_arguments(argv): type=int, help='Percent of input data for test dataset.') parser.add_argument('--output_dir', - type=str, + type=str, required=True, help=('Google Cloud Storage or Local directory in which ' 'to place outputs.')) - parser.add_argument('--transforms_config_file', - type=str, + parser.add_argument('--schema_file', + type=str, required=True, - help=('File describing the schema and transforms of ' - 'each column in the csv data files.')) + help=('File describing the schema of each column in the ' + 'csv data files.')) parser.add_argument('--job_name', type=str, help=('If using --cloud, the job name as listed in' @@ -93,17 +93,47 @@ def parse_arguments(argv): # args.job_name will not be used unless --cloud is used. if not args.job_name: - args.job_name = ('structured-data-' + + args.job_name = ('structured-data-' + datetime.datetime.now().strftime('%Y%m%d%H%M%S')) return args +def load_and_check_config(schema_file_path): + """Checks that the schema file is well formatted.""" + + try: + json_str = ml.util._file.load_file(schema_file_path) + config = json.loads(json_str) + except: + print('ERROR reading schema file.') + sys.exit(1) + + model_columns = (config.get('numerical_columns', []) + + config.get('categorical_columns', [])) + if config['target_column'] not in model_columns: + print('ERROR: target not listed as a numerical or categorical column.') + sys.exit(1) + + if set(config['column_names']) != set(model_columns + [config['key_column']]): + print('ERROR: column_names does not match the columns listed in the other fields.') + sys.exit(1) + + if set(config['numerical_columns']) & set(config['categorical_columns']): + print('ERROR: numerical_columns and categorical_columns must be disjoint.') + sys.exit(1) + + if config['key_column'] in model_columns: + print('ERROR: key_column should not be listed in numerical_columns or categorical_columns.') + sys.exit(1) + + return config + + + def preprocessing_features(args): # Read the config file.
- json_str = ml.util._file.load_file(args.transforms_config_file) - config = json.loads(json_str) + config = load_and_check_config(args.schema_file) column_names = config['column_names'] @@ -114,48 +144,38 @@ def preprocessing_features(args): # Extract target feature target_name = config['target_column'] - if config['problem_type'] == 'regression': + key_name = config['key_column'] + if target_name in config.get('numerical_columns', []): feature_set[target_name] = features.target(target_name).continuous() else: feature_set[target_name] = features.target(target_name).discrete() # Extract numeric features - if 'numerical' in config: - for name, transform_config in config['numerical'].iteritems(): - transform = transform_config['transform'] - default = transform_config.get('default', None) - if transform == 'scale': - feature_set[name] = features.numeric(name, default=default).scale() - elif transform == 'max_abs_scale': - feature_set[name] = features.numeric(name, default=default).max_abs_scale(transform_config['value']) - elif transform == 'identity': - feature_set[name] = features.numeric(name, default=default).identity() - else: - print('Error: unkown numerical transform name %s in %s' % (transform, str(transform_config))) - sys.exit(1) + for name in config.get('numerical_columns', []): + if name == target_name or name == key_name: + continue + # apply identity to all numerical features. + default = config.get('defaults', {}).get(name, None) + feature_set[name] = features.numeric(name, default=default).identity() # Extract categorical features - if 'categorical' in config: - for name, transform_config in config['categorical'].iteritems(): - transform = transform_config['transform'] - default = transform_config.get('default', None) - frequency_threshold = transform_config.get('frequency_threshold', 5) - if transform == 'one_hot' or transform == 'embedding': - feature_set[name] = features.categorical( - name, - default=default, - frequency_threshold=frequency_threshold) - else: - print('Error: unkown categorical transform name %s in %s' % (transform, str(transform_config))) - sys.exit(1) + for name in config.get('categorical_columns', []): + if name == target_name or name == key_name: + continue + # apply sparse transform to all categorical features. + default = config.get('defaults', {}).get(name, None) + feature_set[name] = features.categorical( + name, + default=default, + frequency_threshold=1).sparse(use_counts=True) return feature_set, column_names - def preprocess(pipeline, feature_set, column_names, input_file_path, - train_percent, eval_percent, test_percent, output_dir): + schema_file, train_percent, eval_percent, test_percent, + output_dir): """Builds the preprocessing Dataflow pipeline. The input files are split into a training, eval and test sets, and the SDK @@ -206,11 +226,14 @@ def _partition_fn(row_unused, num_partitions_unused): # pylint: disable=unused- >> io.SaveFeatures( os.path.join(output_dir, 'features_test'))) # pylint: enable=expression-not-assigned + # Put a copy of the schema file in the output folder. Datalab will look for + # it there. 
+ ml.util._file.copy_file(schema_file, os.path.join(output_dir, 'schema.json')) -def run_dataflow(feature_set, column_names, input_file_path, train_percent, - eval_percent, test_percent, output_dir, cloud, project_id, - job_name): +def run_dataflow(feature_set, column_names, input_file_path, schema_file, + train_percent, eval_percent, test_percent, output_dir, cloud, + project_id, job_name): """Run Preprocessing as a Dataflow pipeline.""" # Configure the pipeline. @@ -233,6 +256,7 @@ def run_dataflow(feature_set, column_names, input_file_path, train_percent, feature_set=feature_set, column_names=column_names, input_file_path=input_file_path, + schema_file=schema_file, train_percent=train_percent, eval_percent=eval_percent, test_percent=test_percent, @@ -251,6 +275,7 @@ def main(argv=None): feature_set=feature_set, column_names=column_names, input_file_path=args.input_file_path, + schema_file=args.schema_file, train_percent=args.train_percent, eval_percent=args.eval_percent, test_percent=args.test_percent, diff --git a/solutionbox/structured_data/datalab_solutions/structured_data/setup.py b/solutionbox/structured_data/datalab_solutions/structured_data/setup.py index 79da62db0..1ea33dc13 100644 --- a/solutionbox/structured_data/datalab_solutions/structured_data/setup.py +++ b/solutionbox/structured_data/datalab_solutions/structured_data/setup.py @@ -24,4 +24,6 @@ version=VERSION, packages=['trainer', 'preprocess'], author='Google', - author_email='cloudml-feedback@google.com',) + author_email='cloudml-feedback@google.com', + test_suite='nose.collector', + tests_require=['nose']) diff --git a/solutionbox/structured_data/datalab_solutions/structured_data/test/e2e_functions.py b/solutionbox/structured_data/datalab_solutions/structured_data/test/e2e_functions.py index 5a9aaae9d..724bdad0a 100644 --- a/solutionbox/structured_data/datalab_solutions/structured_data/test/e2e_functions.py +++ b/solutionbox/structured_data/datalab_solutions/structured_data/test/e2e_functions.py @@ -13,9 +13,11 @@ # limitations under the License. # ============================================================================== + +import os import random import subprocess -import os + def make_csv_data(filename, num_rows, problem_type): random.seed(12321) @@ -29,9 +31,9 @@ def make_csv_data(filename, num_rows, problem_type): str2 = random.choice(['abc', 'def', 'ghi', 'jkl', 'mno', 'pqr']) str3 = random.choice(['car', 'truck', 'van', 'bike', 'train', 'drone']) - map1 = {'red':2, 'blue':6, 'green':4, 'pink':-5, 'yellow':-6, 'brown':-1, 'black':7} - map2 = {'abc':10, 'def':1, 'ghi':1, 'jkl':1, 'mno':1, 'pqr':1} - map3 = {'car':5, 'truck':10, 'van':15, 'bike':20, 'train':25, 'drone': 30} + map1 = {'red': 2, 'blue': 6, 'green': 4, 'pink': -5, 'yellow': -6, 'brown': -1, 'black': 7} + map2 = {'abc': 10, 'def': 1, 'ghi': 1, 'jkl': 1, 'mno': 1, 'pqr': 1} + map3 = {'car': 5, 'truck': 10, 'van': 15, 'bike': 20, 'train': 25, 'drone': 30} # Build some model. 
t = 0.5 + 0.5*num1 -2.5*num2 + num3 @@ -56,52 +58,72 @@ def make_csv_data(filename, num_rows, problem_type): str3=str3) f1.write(csv_line) - config = {'column_names': ['key', 'target', 'num1', 'num2', 'num3', + schema = {'column_names': ['key', 'target', 'num1', 'num2', 'num3', 'str1', 'str2', 'str3'], 'key_column': 'key', 'target_column': 'target', - 'problem_type': problem_type, - 'model_type': '', - 'numerical': {'num1': {'transform': 'identity'}, - 'num2': {'transform': 'identity'}, - 'num3': {'transform': 'identity'}}, - 'categorical': {'str1': {'transform': 'one_hot'}, - 'str2': {'transform': 'one_hot'}, - 'str3': {'transform': 'one_hot'}} + 'numerical_columns': ['num1', 'num2', 'num3'], + 'categorical_columns': ['str1', 'str2', 'str3'] } - return config - + if problem_type == 'classification': + schema['categorical_columns'] += ['target'] + else: + schema['numerical_columns'] += ['target'] + # use defaults for num3 and str3 + transforms = {'num1': {'transform': 'identity'}, + 'num2': {'transform': 'identity'}, + # 'num3': {'transform': 'identity'}, + 'str1': {'transform': 'one_hot'}, + 'str2': {'transform': 'one_hot'}, + # 'str3': {'transform': 'one_hot'} + } + return schema, transforms -def run_preprocess(output_dir, csv_filename, config_filename, +def run_preprocess(output_dir, csv_filename, schema_filename, train_percent='80', eval_percent='10', test_percent='10'): - cmd = ['python', './preprocess/preprocess.py', + preprocess_script = os.path.abspath( + os.path.join(os.path.dirname(__file__), '../preprocess/preprocess.py')) + cmd = ['python', preprocess_script, '--output_dir', output_dir, - '--input_file_path', csv_filename, - '--transforms_config_file', config_filename, + '--input_file_path', csv_filename, + '--schema_file', schema_filename, '--train_percent', train_percent, '--eval_percent', eval_percent, '--test_percent', test_percent, ] - print('Current working directoyr: %s' % os.getcwd()) print('Going to run command: %s' % ' '.join(cmd)) subprocess.check_call(cmd, stderr=open(os.devnull, 'wb')) -def run_training(output_dir, input_dir, config_filename, extra_args=[]): - """Runs Training via gcloud alpha ml local train. + +def run_training(output_dir, input_dir, schema_filename, transforms_filename, + max_steps, extra_args=[]): + """Runs Training via gcloud beta ml local train. Args: output_dir: the trainer's output folder - input_folder: should contain features_train*, features_eval*, and + input_dir: should contain features_train*, features_eval*, and mmetadata.json. - config_filename: path to the config file + schema_filename: path to the schema file + transforms_filename: path to the transforms file. + max_steps: int. max training steps. extra_args: array of strings, passed to the trainer. + + Returns: + The stderr of training as one string. TF writes to stderr, so basically, the + output of training. """ train_filename = os.path.join(input_dir, 'features_train*') eval_filename = os.path.join(input_dir, 'features_eval*') metadata_filename = os.path.join(input_dir, 'metadata.json') - cmd = ['gcloud alpha ml local train', + + # Gcloud has the fun bug that you have to be in the parent folder of task.py + # when you call it. So cd there first. 
+ task_parent_folder = os.path.abspath( + os.path.join(os.path.dirname(__file__), '..')) + cmd = ['cd %s &&' % task_parent_folder, + 'gcloud beta ml local train', '--module-name=trainer.task', '--package-path=trainer', '--', @@ -109,14 +131,10 @@ def run_training(output_dir, input_dir, config_filename, extra_args=[]): '--eval_data_paths=%s' % eval_filename, '--metadata_path=%s' % metadata_filename, '--output_path=%s' % output_dir, - '--transforms_config_file=%s' % config_filename, - '--max_steps=2500'] + extra_args - print('Current working directoyr: %s' % os.getcwd()) + '--schema_file=%s' % schema_filename, + '--transforms_file=%s' % transforms_filename, + '--max_steps=%s' % max_steps] + extra_args print('Going to run command: %s' % ' '.join(cmd)) - sp = subprocess.Popen(' '.join(cmd), shell=True, stderr=subprocess.PIPE) #open(os.devnull, 'wb')) + sp = subprocess.Popen(' '.join(cmd), shell=True, stderr=subprocess.PIPE) _, err = sp.communicate() - err = err.splitlines() - print 'last line' - print err[len(err)-1] - - stderr=subprocess.PIPE + return err \ No newline at end of file diff --git a/solutionbox/structured_data/datalab_solutions/structured_data/test/test_preprocess.py b/solutionbox/structured_data/datalab_solutions/structured_data/test/test_preprocess.py index fd78e319a..ed4341f7d 100644 --- a/solutionbox/structured_data/datalab_solutions/structured_data/test/test_preprocess.py +++ b/solutionbox/structured_data/datalab_solutions/structured_data/test/test_preprocess.py @@ -13,14 +13,15 @@ # limitations under the License. # ============================================================================== -import unittest -import tempfile -import os -import sys -import json import glob -import subprocess +import json +import os import shutil +import subprocess +import tempfile +import unittest + +import tensorflow as tf import google.cloud.ml as ml @@ -28,59 +29,86 @@ class TestPreprocess(unittest.TestCase): + def setUp(self): self._test_dir = tempfile.mkdtemp() self._csv_filename = os.path.join(self._test_dir, 'raw_csv_data.csv') - self._config_filename = os.path.join(self._test_dir, 'config.json') + self._schema_filename = os.path.join(self._test_dir, 'schema.json') def tearDown(self): + print('TestPreprocess: removing test dir: ' + self._test_dir) shutil.rmtree(self._test_dir) def testRegression(self): - config = e2e_functions.make_csv_data(self._csv_filename, 100, 'regression') - config['categorical']['str1']['transform'] = 'embedding' - config['categorical']['str1']['dimension'] = '3' + (schema, _) = e2e_functions.make_csv_data(self._csv_filename, 100, + 'regression') + + with open(self._schema_filename, 'w') as f: + f.write(json.dumps(schema, indent=2, separators=(',', ': '))) - with open(self._config_filename, 'w') as f: - f.write(json.dumps(config, indent=2, separators=(',', ': '))) - - e2e_functions.run_preprocess(self._test_dir, self._csv_filename, self._config_filename) + e2e_functions.run_preprocess(output_dir=self._test_dir, + csv_filename=self._csv_filename, + schema_filename=self._schema_filename) metadata_path = os.path.join(self._test_dir, 'metadata.json') metadata = ml.features.FeatureMetadata.get_metadata(metadata_path) expected_features = { - 'num1': {'dtype': 'float', 'type': 'dense', 'name': 'num1', - 'columns': ['num1'], 'size': 1}, - 'num2': {'dtype': 'float', 'type': 'dense', 'name': 'num2', - 'columns': ['num2'], 'size': 1}, - 'num3': {'dtype': 'float', 'type': 'dense', 'name': 'num3', - 'columns': ['num3'], 'size': 1}, - 'str3': {'dtype': 'int64', 'type': 
'sparse', 'name': 'str3', - 'columns': ['str3'], 'size': 7}, - 'str2': {'dtype': 'int64', 'type': 'sparse', 'name': 'str2', - 'columns': ['str2'], 'size': 7}, - 'str1': {'dtype': 'int64', 'type': 'sparse', 'name': 'str1', - 'columns': ['str1'], 'size': 8}, - 'key': {'dtype': 'bytes', 'type': 'dense', 'name': 'key', - 'columns': ['key'], 'size': 1}, - 'target': {'dtype': 'float', 'type': 'dense', 'name': 'target', + 'num1': {'dtype': 'float', 'type': 'dense', 'name': 'num1', + 'columns': ['num1'], 'size': 1}, + 'num2': {'dtype': 'float', 'type': 'dense', 'name': 'num2', + 'columns': ['num2'], 'size': 1}, + 'num3': {'dtype': 'float', 'type': 'dense', 'name': 'num3', + 'columns': ['num3'], 'size': 1}, + 'str3': {'dtype': 'int64', 'type': 'sparse', 'name': 'str3', + 'columns': ['str3'], 'size': 7}, + 'str2': {'dtype': 'int64', 'type': 'sparse', 'name': 'str2', + 'columns': ['str2'], 'size': 7}, + 'str1': {'dtype': 'int64', 'type': 'sparse', 'name': 'str1', + 'columns': ['str1'], 'size': 8}, + 'key': {'dtype': 'bytes', 'type': 'dense', 'name': 'key', + 'columns': ['key'], 'size': 1}, + 'target': {'dtype': 'float', 'type': 'dense', 'name': 'target', 'columns': ['target'], 'size': 1}} self.assertEqual(metadata.features, expected_features) self.assertEqual(metadata.columns['target']['scenario'], 'continuous') - self.assertTrue(glob.glob(os.path.join(self._test_dir, 'features_train*'))) - + train_files = glob.glob(os.path.join(self._test_dir, 'features_train*')) + self.assertTrue(train_files) + self.assertTrue(os.path.isfile(os.path.join(self._test_dir, 'schema.json'))) + + # Inspect the first TF record. + for line in tf.python_io.tf_record_iterator(train_files[0], + options=tf.python_io.TFRecordOptions( + tf.python_io.TFRecordCompressionType.GZIP)): + ex = tf.train.Example() + ex.ParseFromString(line) + self.assertTrue('num1' in ex.features.feature) + self.assertTrue('num2' in ex.features.feature) + self.assertTrue('num3' in ex.features.feature) + self.assertTrue('key' in ex.features.feature) + self.assertTrue('target' in ex.features.feature) + self.assertTrue('str1@0' in ex.features.feature) + self.assertTrue('str1@1' in ex.features.feature) + self.assertTrue('str2@0' in ex.features.feature) + self.assertTrue('str2@1' in ex.features.feature) + self.assertTrue('str3@0' in ex.features.feature) + self.assertTrue('str3@1' in ex.features.feature) + break def testClassification(self): - config = e2e_functions.make_csv_data(self._csv_filename, 100, 'classification') - - with open(self._config_filename, 'w') as f: - f.write(json.dumps(config, indent=2, separators=(',', ': '))) - - e2e_functions.run_preprocess(self._test_dir, self._csv_filename, self._config_filename, - '90', '10', '0') + (schema, _) = e2e_functions.make_csv_data(self._csv_filename, 100, + 'classification') + with open(self._schema_filename, 'w') as f: + f.write(json.dumps(schema, indent=2, separators=(',', ': '))) + + e2e_functions.run_preprocess(output_dir=self._test_dir, + csv_filename=self._csv_filename, + schema_filename=self._schema_filename, + train_percent='90', + eval_percent='10', + test_percent='0') metadata_path = os.path.join(self._test_dir, 'metadata.json') metadata = ml.features.FeatureMetadata.get_metadata(metadata_path) @@ -89,3 +117,6 @@ def testClassification(self): self.assertTrue(glob.glob(os.path.join(self._test_dir, 'features_train*'))) self.assertTrue(glob.glob(os.path.join(self._test_dir, 'features_eval*'))) self.assertFalse(glob.glob(os.path.join(self._test_dir, 'features_test*'))) + +if __name__ == '__main__': + 
unittest.main() \ No newline at end of file diff --git a/solutionbox/structured_data/datalab_solutions/structured_data/test/test_trainer.py b/solutionbox/structured_data/datalab_solutions/structured_data/test/test_trainer.py index fefc2e539..dcff0e9b2 100644 --- a/solutionbox/structured_data/datalab_solutions/structured_data/test/test_trainer.py +++ b/solutionbox/structured_data/datalab_solutions/structured_data/test/test_trainer.py @@ -12,22 +12,18 @@ # See the License for the specific language governing permissions and # limitations under the License. # ============================================================================== - -import unittest -import tempfile -import os -import sys import json -import glob -import subprocess +import os +import re import shutil - -import google.cloud.ml as ml +import tempfile +import unittest import e2e_functions class TestTrainer(unittest.TestCase): + def setUp(self): self._test_dir = tempfile.mkdtemp() self._preprocess_dir = os.path.join(self._test_dir, 'pre') @@ -36,20 +32,85 @@ def setUp(self): os.mkdir(self._preprocess_dir) os.mkdir(self._train_dir) - self._config_filename = os.path.join(self._preprocess_dir, 'config.json') self._csv_filename = os.path.join(self._preprocess_dir, 'raw_csv_data.csv') + self._schema_filename = os.path.join(self._test_dir, 'schema.json') + self._transforms_filename = os.path.join(self._test_dir, 'transforms.json') - def tearDown(self): + print('TestTrainer: removing test dir ' + self._test_dir) shutil.rmtree(self._test_dir) + def _run_training(self, schema, transforms, extra_args): + with open(self._schema_filename, 'w') as f: + f.write(json.dumps(schema, indent=2, separators=(',', ': '))) + + with open(self._transforms_filename, 'w') as f: + f.write(json.dumps(transforms, indent=2, separators=(',', ': '))) + + + e2e_functions.run_preprocess(output_dir=self._preprocess_dir, + csv_filename=self._csv_filename, + schema_filename=self._schema_filename) + output = e2e_functions.run_training(output_dir=self._train_dir, + input_dir=self._preprocess_dir, + schema_filename=self._schema_filename, + transforms_filename=self._transforms_filename, + max_steps=2500, + extra_args=extra_args) + self._training_screen_output = output + + def _check_training_screen_output(self, accuracy=None, loss=None): + """Should be called after _run_training. + + Inspects self._training_screen_output for correct output. + + Args: + eval_metrics: dict in the form {key: expected_number}. Will inspect the + last line of the training output for the line "KEY = NUMBER" and + check that NUMBER < expected_number. + """ + # Print the last line of training output which has the loss value. + lines = self._training_screen_output.splitlines() + last_line = lines[len(lines)-1] + print(last_line) + + # supports positive numbers (int, real) with exponential form support. 
+ positive_number_re = re.compile('[+]?\d+(?:\.\d+)?(?:[eE][+-]?\d+)?') + + # Check it made it to step 2500 + saving_num_re = re.compile('Saving evaluation summary for step \d+') + saving_num = saving_num_re.findall(last_line) + # saving_num == ['Saving evaluation summary for step NUM'] + self.assertEqual(len(saving_num), 1) + step_num = positive_number_re.findall(saving_num[0]) + # step_num == ['2500'] + self.assertEqual(len(step_num), 1) + self.assertEqual(int(step_num[0]), 2500) + + + # Check the accuracy + if accuracy is not None: + accuracy_eq_num_re = re.compile('accuracy = [+]?\d+(?:\.\d+)?(?:[eE][+-]?\d+)?') + accuracy_eq_num = accuracy_eq_num_re.findall(last_line) + # accuracy_eq_num == ['accuracy = NUM'] + self.assertEqual(len(accuracy_eq_num), 1) + accuracy_num = positive_number_re.findall(accuracy_eq_num[0]) + # accuracy_num == ['X.XXX'] + self.assertEqual(len(accuracy_num), 1) + self.assertGreater(float(accuracy_num[0]), accuracy) + + if loss is not None: + loss_eq_num_re = re.compile('loss = [+]?\d+(?:\.\d+)?(?:[eE][+-]?\d+)?') + loss_eq_num = loss_eq_num_re.findall(last_line) + # loss_eq_num == ['loss = NUM'] + self.assertEqual(len(loss_eq_num), 1) + loss_num = positive_number_re.findall(loss_eq_num[0]) + # loss_num == ['X.XXX'] + self.assertEqual(len(loss_num), 1) + self.assertLess(float(loss_num[0]), loss) + + - def _run_training(self, config): - with open(self._config_filename, 'w') as f: - f.write(json.dumps(config, indent=2, separators=(',', ': '))) - - e2e_functions.run_preprocess(self._preprocess_dir, self._csv_filename, self._config_filename) - e2e_functions.run_training(self._train_dir, self._preprocess_dir, self._config_filename, ['--layer_sizes 20 10 5']) def _check_train_files(self): model_folder = os.path.join(self._train_dir, 'model') @@ -57,45 +118,53 @@ def _check_train_files(self): self.assertTrue(os.path.isfile(os.path.join(model_folder, 'export'))) self.assertTrue(os.path.isfile(os.path.join(model_folder, 'export.meta'))) self.assertTrue(os.path.isfile(os.path.join(model_folder, 'metadata.json'))) - + self.assertTrue(os.path.isfile(os.path.join(model_folder, 'schema.json'))) + self.assertTrue(os.path.isfile(os.path.join(model_folder, 'transforms.json'))) def testRegressionDnn(self): print('\n\nTesting Regression DNN') - config = e2e_functions.make_csv_data(self._csv_filename, 5000, 'regression') - config['categorical']['str1']['transform'] = 'embedding' - config['categorical']['str1']['dimension'] = '3' - config['model_type'] = 'dnn' + (schema, transforms) = e2e_functions.make_csv_data(self._csv_filename, 5000, + 'regression') + transforms['str1']['transform'] = 'embedding' + transforms['str1']['dimension'] = '3' - self._run_training(config) - self._check_train_files() + flags = ['--layer_sizes 10 10 5', + '--model_type=dnn_regression'] + self._run_training(schema, transforms, flags) + self._check_training_screen_output(loss=10) + self._check_train_files() def testRegressionLinear(self): print('\n\nTesting Regression Linear') - config = e2e_functions.make_csv_data(self._csv_filename, 5000, 'regression') - config['model_type'] = 'linear' + (schema, transforms) = e2e_functions.make_csv_data(self._csv_filename, 5000, + 'regression') + flags = ['--model_type=linear_regression'] - self._run_training(config) + self._run_training(schema, transforms, flags) + self._check_training_screen_output(loss=1) self._check_train_files() - def testClassificationDnn(self): print('\n\nTesting classification DNN') - config = e2e_functions.make_csv_data(self._csv_filename, 5000, 
- 'classification') - config['categorical']['str1']['transform'] = 'embedding' - config['categorical']['str1']['dimension'] = '3' - config['model_type'] = 'dnn' + (schema, transforms) = e2e_functions.make_csv_data(self._csv_filename, 5000, + 'classification') + transforms['str1']['transform'] = 'embedding' + transforms['str1']['dimension'] = '3' - self._run_training(config) - self._check_train_files() + flags = ['--layer_sizes 10 10 5', + '--model_type=dnn_classification'] + self._run_training(schema, transforms, flags) + self._check_training_screen_output(accuracy=0.95, loss=0.09) + self._check_train_files() def testClassificationLinear(self): print('\n\nTesting classification Linear') - config = e2e_functions.make_csv_data(self._csv_filename, 5000, - 'classification') - config['model_type'] = 'linear' + (schema, transforms) = e2e_functions.make_csv_data(self._csv_filename, 5000, + 'classification') + flags = ['--model_type=linear_classification'] - self._run_training(config) - self._check_train_files() \ No newline at end of file + self._run_training(schema, transforms, flags) + self._check_training_screen_output(accuracy=0.95, loss=0.15) + self._check_train_files() diff --git a/solutionbox/structured_data/datalab_solutions/structured_data/trainer/task.py b/solutionbox/structured_data/datalab_solutions/structured_data/trainer/task.py index 91edb45a3..1cfedd66b 100755 --- a/solutionbox/structured_data/datalab_solutions/structured_data/trainer/task.py +++ b/solutionbox/structured_data/datalab_solutions/structured_data/trainer/task.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +# TODO(brnaondutra): raise some excpetion vs print/sys.exit. from __future__ import absolute_import from __future__ import division @@ -45,7 +46,19 @@ OUTPUT_COLLECTION_NAME = 'outputs' -def get_placeholder_input_fn(metadata, transform_config): +def is_linear_model(model_type): + return model_type.startswith('linear_') + +def is_dnn_model(model_type): + return model_type.startswith('dnn_') + +def is_regression_model(model_type): + return model_type.endswith('_regression') + +def is_classification_model(model_type): + return model_type.endswith('_classification') + +def get_placeholder_input_fn(metadata, schema_config): """Input layer for the exported graph.""" def get_input_features(): @@ -59,9 +72,9 @@ def get_input_features(): # keep_target=False) features = util.parse_example_tensor(examples=examples, - mode='prediction', - metadata=metadata, - transform_config=transform_config) + mode='prediction', + metadata=metadata, + schema_config=schema_config) if FEATURES_EXAMPLE_DICT_KEY in features: print('ERROR: %s is a reserved feature name, please use a different' @@ -78,7 +91,7 @@ def get_input_features(): return get_input_features -def get_reader_input_fn(metadata, transform_config, data_paths, batch_size, +def get_reader_input_fn(metadata, schema_config, data_paths, batch_size, shuffle): """Builds input layer for training.""" @@ -86,12 +99,12 @@ def get_input_features(): """Read the input features from the given data paths.""" _, examples = util.read_examples(data_paths, batch_size, shuffle) features = util.parse_example_tensor(examples=examples, - mode='training', - metadata=metadata, - transform_config=transform_config) + mode='training', + metadata=metadata, + schema_config=schema_config) # Retrieve the target feature column. 
- target_name = transform_config['target_column'] + target_name = schema_config['target_column'] target = features.pop(target_name) return features, target @@ -100,7 +113,7 @@ def get_input_features(): def get_vocabulary(class_index_dict): """Get vocab from metadata file. - + THe dict's keys are sorted by the values. Example: @@ -113,8 +126,7 @@ def get_vocabulary(class_index_dict): return [class_name for (class_name, class_index) in sorted(class_index_dict.iteritems(), key=lambda (class_name, class_index): class_index)] - -def get_export_signature_fn(metadata, transform_config): +def get_export_signature_fn(metadata, schema_config, args): """Builds the output layer in the exported graph. Also sets up the tensor names when calling session.run @@ -122,12 +134,12 @@ def get_export_signature_fn(metadata, transform_config): def get_export_signature(examples, features, predictions): """Create an export signature with named input and output signatures.""" - target_name = transform_config['target_column'] - key_name = transform_config['key_column'] + target_name = schema_config['target_column'] + key_name = schema_config['key_column'] outputs = {TARGET_SCORE_TENSOR_NAME: predictions.name, key_name: tf.squeeze(features[key_name]).name} - if transform_config['problem_type'] == 'classification': + if is_classification_model(args.model_type): target_labels = get_vocabulary(metadata.columns[target_name]['vocab']) prediction = tf.argmax(predictions, 1) labels = tf.contrib.lookup.index_to_string( @@ -158,72 +170,91 @@ def get_export_signature(examples, features, predictions): return get_export_signature -def get_estimator(output_dir, feature_columns, metadata, transform_config, - args): +def get_estimator(output_dir, metadata, transform_config, schema_config, args): """Returns a tf learn estimator. We only support {DNN, Linear}Regressor and {DNN, Linear}Classifier. This is - controlled by the values of problem_type and model_type in the config file. + controlled by the values of model_type in the args. Args: output_dir: Modes are saved into outputdir/train - feature_columns: dict of tf layers feature columns - metadata: metadata.json object - transform_config: transforms config object args: parseargs object """ - train_dir = os.path.join(output_dir, 'train') - problem_type = transform_config['problem_type'] - if problem_type != 'regression' and problem_type != 'classification': - print('ERROR: problem_type from the transform file should be regression' - ' or classification.') + # Check the requested mode fits the preprocessed data. + target_name = schema_config['target_column'] + if (is_classification_model(args.model_type) + and target_name not in schema_config.get('categorical_columns', [])): + print('ERROR: when using a classification model, the target must be a ' + 'categorical variable.') + sys.exit(1) + if (is_regression_model(args.model_type) + and target_name not in schema_config.get('numerical_columns', [])): + print('ERROR: when using a regression model, the target must be a ' + 'numerical variable.') sys.exit(1) - model_type = transform_config['model_type'] - - if model_type != 'dnn' and model_type != 'linear': - print('ERROR: model_type from the transform file should be dnn or linear') + # Check layers used for dnn models. 
+ if is_dnn_model(args.model_type) and not args.layer_sizes: + print('ERROR: --layer_sizes must be used with DNN models') sys.exit(1) + elif is_linear_model(args.model_type) and args.layer_sizes: + print('ERROR: --layer_sizes cannot be used with linear models') + sys.exit(1) + + # Build tf.learn features + feature_columns = util.produce_feature_columns( + metadata, transform_config, schema_config, + is_linear_model(args.model_type)) + feature_engineering_fn = util.produce_feature_engineering_fn(metadata, + transform_config, schema_config) # Set how often to run checkpointing in terms of time. config = tf.contrib.learn.RunConfig( save_checkpoints_secs=args.save_checkpoints_secs) - # TODO(brandondutra) check layer_sizes pass in if needed. - - if problem_type == 'regression' and model_type == 'dnn': + train_dir = os.path.join(output_dir, 'train') + if args.model_type == 'dnn_regression': estimator = tf.contrib.learn.DNNRegressor( feature_columns=feature_columns, hidden_units=args.layer_sizes, config=config, model_dir=train_dir, + feature_engineering_fn=feature_engineering_fn, optimizer=tf.train.AdamOptimizer( args.learning_rate, epsilon=args.epsilon)) - elif problem_type == 'regression' and model_type == 'linear': + elif args.model_type == 'linear_regression': estimator = tf.contrib.learn.LinearRegressor( feature_columns=feature_columns, config=config, model_dir=train_dir, + feature_engineering_fn=feature_engineering_fn, optimizer=tf.train.AdamOptimizer( args.learning_rate, epsilon=args.epsilon)) - elif problem_type == 'classification' and model_type == 'dnn': + elif args.model_type == 'dnn_classification': + n_classes = max(metadata.columns[target_name]['vocab'].values()) + 1 estimator = tf.contrib.learn.DNNClassifier( feature_columns=feature_columns, hidden_units=args.layer_sizes, - n_classes=metadata.stats['labels'], + n_classes=n_classes, config=config, model_dir=train_dir, + feature_engineering_fn=feature_engineering_fn, optimizer=tf.train.AdamOptimizer( args.learning_rate, epsilon=args.epsilon)) - elif problem_type == 'classification' and model_type == 'linear': + elif args.model_type == 'linear_classification': + n_classes = max(metadata.columns[target_name]['vocab'].values()) + 1 estimator = tf.contrib.learn.LinearClassifier( feature_columns=feature_columns, - n_classes=metadata.stats['labels'], + n_classes=n_classes, config=config, model_dir=train_dir, + feature_engineering_fn=feature_engineering_fn, optimizer=tf.train.AdamOptimizer( args.learning_rate, epsilon=args.epsilon)) + else: + print('ERROR: bad --model_type value') + sys.exit(1) return estimator @@ -232,43 +263,34 @@ def get_experiment_fn(args): """Builds the experiment function for learn_runner.run""" def get_experiment(output_dir): - - # Load the metadata. - metadata = ml.features.FeatureMetadata.get_metadata( - args.metadata_path) - - # Load the config file. - transform_config = json.loads( - ml.util._file.load_file(args.transforms_config_file)) - - # Build tf.learn features - feature_columns = util.produce_feature_columns(metadata, transform_config) + # Load the metadata file, transforms file, and schema file. + metadata = ml.features.FeatureMetadata.get_metadata(args.metadata_path) + transform_config = json.loads(ml.util._file.load_file(args.transforms_file)) + schema_config = json.loads(ml.util._file.load_file(args.schema_file)) # Get the model to train. 
- estimator = get_estimator(output_dir, feature_columns, metadata, - transform_config, args) + estimator = get_estimator(output_dir, metadata, transform_config, schema_config, args) - input_placeholder_for_prediction = get_placeholder_input_fn(metadata, - transform_config) + input_placeholder_for_prediction = get_placeholder_input_fn(metadata, + schema_config) # Save the finished model to output_dir/model export_monitor = util.ExportLastModelMonitor( output_dir=output_dir, final_model_location='model', # Relative to the output_dir. - additional_assets=[args.metadata_path], + additional_assets=[args.metadata_path, args.schema_file, args.transforms_file], input_fn=input_placeholder_for_prediction, input_feature_key=FEATURES_EXAMPLE_DICT_KEY, - signature_fn=get_export_signature_fn(metadata, transform_config)) + signature_fn=get_export_signature_fn(metadata, schema_config, args)) - target_name = transform_config['target_column'] input_reader_for_train = get_reader_input_fn( - metadata, transform_config, args.train_data_paths, args.batch_size, shuffle=True) + metadata, schema_config, args.train_data_paths, args.batch_size, shuffle=True) input_reader_for_eval = get_reader_input_fn( - metadata, transform_config, args.eval_data_paths, args.eval_batch_size, shuffle=False) + metadata, schema_config, args.eval_data_paths, args.eval_batch_size, shuffle=False) # Set the eval metrics. # todo(brandondutra): make this work with HP tuning. - if transform_config['problem_type'] == 'classification': + if is_classification_model(args.model_type): streaming_accuracy = metrics_lib.streaming_accuracy eval_metrics = { ('accuracy', 'classes'): streaming_accuracy, @@ -302,16 +324,26 @@ def parse_arguments(argv): required=True) parser.add_argument('--metadata_path', type=str, required=True) parser.add_argument('--output_path', type=str, required=True) - parser.add_argument('--transforms_config_file', + parser.add_argument('--schema_file', + type=str, + required=True, + help=('File describing the schema of each column used ' + 'during preprocessing.')) + parser.add_argument('--transforms_file', type=str, required=True, - help=('File describing the schema and transforms of ' - 'each column in the csv data files.')) + help=('File describing the the transforms to apply on ' + 'each column')) # HP parameters parser.add_argument('--learning_rate', type=float, default=0.01) parser.add_argument('--epsilon', type=float, default=0.0005) - + + # Model problems + parser.add_argument('--model_type', + choices=['linear_classification', 'linear_regression', + 'dnn_classification', 'dnn_regression'], + required=True) # Training input parameters parser.add_argument('--layer_sizes', type=int, nargs='*') parser.add_argument('--max_steps', type=int, default=5000, diff --git a/solutionbox/structured_data/datalab_solutions/structured_data/trainer/util.py b/solutionbox/structured_data/datalab_solutions/structured_data/trainer/util.py index a2ce761fb..74edfcd5e 100755 --- a/solutionbox/structured_data/datalab_solutions/structured_data/trainer/util.py +++ b/solutionbox/structured_data/datalab_solutions/structured_data/trainer/util.py @@ -19,7 +19,7 @@ import tensorflow as tf from tensorflow.python.lib.io import file_io - +import google.cloud.ml as ml def _copy_all(src_files, dest_dir): # file_io.copy does not copy files into folders directly. 
@@ -140,9 +140,7 @@ def read_examples(input_files, batch_size, shuffle, num_epochs=None): num_threads=thread_count) - - -def produce_feature_columns(metadata, config): +def produce_feature_columns(metadata, transform_config, schema_config, is_linear_model): """Produces a list of Tensorflow columns. Args: @@ -154,86 +152,139 @@ def produce_feature_columns(metadata, config): """ feature_columns = [] + target_column = schema_config['target_column'] + key_column = schema_config['key_column'] # Extract the numerical features. - if 'numerical' in config: - for name, transform_config in config['numerical'].iteritems(): - # There is no other TF transfroms for numerical columns. - feature_columns.append( - tf.contrib.layers.real_valued_column( - name, - dimension=metadata.features[name]['size'])) - # TODO(brandondutra) allow real_valued vectors? For now force only scalars. - assert 1 == metadata.features[name]['size'] - - # Extrace the categorical features - if 'categorical' in config: - for name, transform_config in config['categorical'].iteritems(): - transform = transform_config['transform'] - if config['model_type'] == 'linear' and transform != 'one_hot': - print('ERROR: only one_hot transfroms are supported in linear models') - sys.exit(1) - - if transform == 'one_hot': - # Preprocessing built a vocab using 0, 1, ..., N as the new classes. - N = metadata.features[name]['size'] - sparse_column = tf.contrib.layers.sparse_column_with_integerized_feature( - column_name=name, - bucket_size=N+1) - if config['model_type'] == 'linear': - feature_columns.append(sparse_column) - else: - feature_columns.append( - tf.contrib.layers.one_hot_column(sparse_column)) - elif transform == 'embedding': - # Preprocessing built a vocab using 0, 1, ..., N as the new classes. - N = metadata.features[name]['size'] - dim = transform_config['dimension'] - sparse_column = tf.contrib.layers.sparse_column_with_integerized_feature( + for name in schema_config.get('numerical_columns', []): + if name == key_column or name == target_column: + continue + # Numerical transforms happen in produce_feature_engineering_fn + feature_columns.append( + tf.contrib.layers.real_valued_column( + name, + dimension=metadata.features[name]['size'])) + # TODO(brandondutra) allow real_valued vectors? For now force only scalars. + assert 1 == metadata.features[name]['size'] + + # Extract the categorical features + for name in schema_config.get('categorical_columns', []): + if name == key_column or name == target_column: + continue + transform_dict = transform_config.get(name, {}) + transform_type = transform_dict.get('transform', 'one_hot') + + if transform_type == 'one_hot': + # Preprocessing built a vocab using 0, 1, ..., N as the new classes. N + # means 'unknown/missing'. + N = metadata.features[name]['size'] + sparse_column = tf.contrib.layers.sparse_column_with_integerized_feature( + column_name=name, + bucket_size=N+1) + if is_linear_model: + feature_columns.append(sparse_column) + else: + feature_columns.append(tf.contrib.layers.one_hot_column(sparse_column)) + + elif transform_type == 'embedding': + # Preprocessing built a vocab using 0, 1, ..., N as the new classes. 
+ N = metadata.features[name]['size'] + dim = transform_dict['dimension'] + sparse_column = tf.contrib.layers.sparse_column_with_integerized_feature( column_name=name, bucket_size=N+1) - feature_columns.append( - tf.contrib.layers.embedding_column(sparse_column, dim)) - else: - print('ERROR: unkown categorical transform name %s in %s' % (transform, str(transform_config))) - sys.exit(1) + feature_columns.append(tf.contrib.layers.embedding_column(sparse_column, + dim)) + # TODO(brandon): check model type is dnn. + else: + print('ERROR: unkown categorical transform name %s in %s' % + (name, str(transform_type))) + sys.exit(1) return feature_columns +def _scale_tensor(tensor, range_min, range_max, scale_min, scale_max): + if range_min == range_max: + return tensor -def parse_example_tensor(examples, mode, metadata, transform_config): - - dtype_mapping = { - 'bytes': tf.string, - 'float': tf.float32, - 'int64': tf.int64 - } - - example_schema = {} - if 'numerical' in transform_config: - for name, _ in transform_config['numerical'].iteritems(): - size = 1 #metadata.features[name]['size'] - dtype = dtype_mapping[metadata.features[name]['dtype']] - example_schema[name] = tf.FixedLenFeature(shape=[size], dtype=dtype) - - if 'categorical' in transform_config: - for name, _ in transform_config['categorical'].iteritems(): - size = 1 #metadata.features[name]['size'] - dtype = dtype_mapping[metadata.features[name]['dtype']] - example_schema[name] = tf.FixedLenFeature(shape=[size], dtype=dtype) - - - if mode == 'training': - target_name = transform_config['target_column'] - size = 1 #metadata.features[target_name]['size'] - dtype = dtype_mapping[metadata.features[target_name]['dtype']] - example_schema[target_name] = tf.FixedLenFeature(shape=[size], dtype=dtype) - elif mode == 'prediction': - key_name = transform_config['key_column'] - size = 1 #metadata.features[key_name]['size'] - dtype = dtype_mapping[metadata.features[key_name]['dtype']] - example_schema[key_name] = tf.FixedLenFeature(shape=[size], dtype=dtype) - else: - print('ERROR: unknown mode type') - sys.exit(1) + float_tensor = tf.to_float(tensor) + scaled_tensor = tf.div( + tf.sub(float_tensor, range_min) * tf.constant(float(scale_max - scale_min)), + tf.constant(float(range_max - range_min))) + shifted_tensor = scaled_tensor + tf.constant(float(scale_min)) + + return shifted_tensor + + +def produce_feature_engineering_fn(metadata, transform_config, schema_config): + """Makes a feature_engineering_fn for transforming the numerical types. + + This is called with the output of the 'input_fn' function, and the output of + this function is given to tf.learn to further process. This function extracts + the ids tensors from ml.features.FeatureMetadata.parse_features and throws + away the values tensor. 
+ """ + + def _feature_engineering_fn(features, target): + target_column = schema_config['target_column'] + key_column = schema_config['key_column'] + + with tf.name_scope('numerical_feature_engineering') as scope: + new_features = {} + for name in schema_config.get('numerical_columns', []): + if name == key_column or name == target_column: + continue + transform_dict = transform_config.get(name, {}) + transform_type = transform_dict.get('transform', 'identity') + if transform_type == 'scale': + range_min = metadata.columns[name]['min'] + range_max = metadata.columns[name]['max'] + new_features[name] = _scale_tensor(features[name], + range_min=range_min, + range_max=range_max, + scale_min=-1, + scale_max=1) + elif transform_type == 'max_abs_scale': + value = transform_dict['value'] + range_min = metadata.columns[name]['min'] + range_max = metadata.columns[name]['max'] + new_features[name] = _scale_tensor(features[name], + range_min=range_min, + range_max=range_max, + scale_min=-value, + scale_max=value) + + elif transform_type == 'identity': + # Don't need to do anything + pass + else: + print('ERROR: Unknown numerical transform %s for feature %s' % + (transform_type, name)) + sys.exit(1) + features.update(new_features) + return features, target + + return _feature_engineering_fn + + +def parse_example_tensor(examples, mode, metadata, schema_config): + if mode == 'training': + features = ml.features.FeatureMetadata.parse_features(metadata, examples, + keep_target=True) + elif mode == 'prediction': + features = ml.features.FeatureMetadata.parse_features(metadata, examples, + keep_target=False) + else: + print('ERROR: unknown mode') + sys.exit(1) + + # FeatureMetadata.parse_features splits categorical columns into two. + # undo that here. + new_features = {} + for name in schema_config.get('categorical_columns', []): + if name == schema_config['key_column'] or name == schema_config['target_column']: + continue + new_features[name] = features[name]['ids'] + + features.update(new_features) - return tf.parse_example(examples, example_schema) + return features