This repository has been archived by the owner on Sep 3, 2022. It is now read-only.

removed the feature type file #199

Merged 8 commits on Feb 15, 2017
@@ -13,3 +13,6 @@

from ._package import local_preprocess, cloud_preprocess, local_train, cloud_train, local_predict, \
cloud_predict, local_batch_predict, cloud_batch_predict

# Source of truth for the version of this package.
__version__ = '0.0.1'
@@ -39,19 +39,20 @@
import json
import glob
import StringIO
import subprocess

import pandas as pd
import tensorflow as tf
import yaml

from tensorflow.python.lib.io import file_io

from . import preprocess
from . import trainer
from . import predict

_TF_GS_URL = 'gs://cloud-datalab/deploy/tf/tensorflow-0.12.0rc0-cp27-none-linux_x86_64.whl'

# TODO(brandondutra): move this url someplace else.
_SD_GS_URL = 'gs://cloud-ml-dev_bdt/structured_data-0.1.tar.gz'
#_SETUP_PY = '/datalab/packages_setup/structured_data/setup.py'
#_TF_VERSION = 'tensorflow-0.12.0rc0-cp27-none-linux_x86_64.whl'
#_TF_WHL = '/datalab/packages_setup/structured_data'


def _default_project():
@@ -80,24 +81,36 @@ def _assert_gcs_files(files):
raise ValueError('File %s is not a gcs path' % f)


def _run_cmd(cmd):
output = subprocess.Popen(cmd, shell=True, stderr=subprocess.STDOUT, stdout=subprocess.PIPE)
def _package_to_staging(staging_package_url):
"""Repackage this package from local installed location and copy it to GCS.

Args:
staging_package_url: GCS path.
"""
import datalab.mlalpha as mlalpha

while True:
line = output.stdout.readline().rstrip()
print(line)
if line == '' and output.poll() != None:
break
# Find the package root. __file__ is under [package_root]/datalab_solutions/structured_data.
package_root = os.path.abspath(
os.path.join(os.path.dirname(__file__), '../../'))
setup_path = os.path.abspath(
os.path.join(os.path.dirname(__file__), 'setup.py'))
tar_gz_path = os.path.join(staging_package_url, 'staging', 'sd.tar.gz')

print('Building package in %s and uploading to %s' %
(package_root, tar_gz_path))
mlalpha.package_and_copy(package_root, setup_path, tar_gz_path)

def local_preprocess(output_dir, input_feature_file, input_file_pattern, schema_file):

return tar_gz_path
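
# Illustration of the returned path (the bucket and directory below are
# placeholders, not values used anywhere in this package):
#
#   _package_to_staging('gs://my-bucket/my-run')
#   # -> 'gs://my-bucket/my-run/staging/sd.tar.gz'
#
# The staged tarball is what later gets handed to the Cloud ML service as a
# package uri for training and as an --extra_package for batch prediction.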


def local_preprocess(output_dir, input_file_pattern, schema_file):
"""Preprocess data locally with Pandas

Produce analysis used by training.

Args:
output_dir: The output directory to use.
input_feature_file: Describes defaults and column types.
input_file_pattern: String. File pattern that will expand into a list of csv
files.
schema_file: File path to the schema file.
@@ -106,22 +119,20 @@ def local_preprocess(output_dir, input_feature_file, input_file_pattern, schema_
args = ['local_preprocess',
'--input_file_pattern=%s' % input_file_pattern,
'--output_dir=%s' % output_dir,
'--schema_file=%s' % schema_file,
'--input_feature_file=%s' % input_feature_file]
'--schema_file=%s' % schema_file]

print('Starting local preprocessing.')
preprocess.local_preprocess.main(args)
print('Local preprocessing done.')
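
# A minimal usage sketch (hypothetical: the paths below are placeholders and
# this helper is never called; it only illustrates the signature above).
def _example_local_preprocess():
  local_preprocess(
      output_dir='/tmp/sd_analysis',
      input_file_pattern='/tmp/data/train-*.csv',
      schema_file='/tmp/data/schema.json')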

def cloud_preprocess(output_dir, input_feature_file, input_file_pattern=None, schema_file=None, bigquery_table=None, project_id=None):
def cloud_preprocess(output_dir, input_file_pattern=None, schema_file=None, bigquery_table=None, project_id=None):
"""Preprocess data in the cloud with BigQuery.

Produce analysis used by training. This can take a while, even for small
datasets. For small datasets, it may be faster to use local_preprocess.

Args:
output_dir: The output directory to use.
input_feature_file: Describes defaults and column types.
input_file_pattern: String. File pattern that will expand into a list of csv
files.
schema_file: File path to the schema file.
@@ -131,8 +142,7 @@ def cloud_preprocess(output_dir, input_feature_file, input_file_pattern=None, sc
_assert_gcs_files([output_dir, input_file_pattern])

args = ['cloud_preprocess',
'--output_dir=%s' % output_dir,
'--input_feature_file=%s' % input_feature_file]
'--output_dir=%s' % output_dir]

if input_file_pattern:
args.append('--input_file_pattern=%s' % input_file_pattern)
@@ -155,9 +165,10 @@ def local_train(train_file_pattern,
eval_file_pattern,
preprocess_output_dir,
output_dir,
transforms_file,
model_type,
max_steps,
transforms_file=None,
key_column=None,
top_n=None,
layer_sizes=None):
"""Train model locally.
@@ -166,9 +177,55 @@ def local_train(train_file_pattern,
eval_file_pattern: eval csv file
preprocess_output_dir: The output directory from preprocessing
output_dir: Output directory of training.
transforms_file: File path to the transforms file.
model_type: model type
max_steps: Int. Number of training steps to perform.
model_type: One of linear_classification, linear_regression,
dnn_classification, dnn_regression.
max_steps: Int. Number of training steps to perform.
transforms_file: File path to the transforms file (see the usage sketch below). Example:
{
"col_A": {"transform": "scale", "default": 0.0},
"col_B": {"transform": "scale","value": 4},
# Note col_C is missing, so default transform used.
"col_D": {"transform": "hash_one_hot", "hash_bucket_size": 4},
"col_target": {"transform": "target"},
"col_key": {"transform": "key"}
}
The keys correspond to the columns in the input files as defined by the
schema file during preprocessing. Some notes:
1) The "key" transform is required, but the "target" transform is
optional, as the target column must be the first column in the input
data, and all other transforms are optional.
2) Default values are optional. These are used if the input data has
missing values during training and prediction. If not supplied for a
column, the default value for a numerical column is that column's
mean value, and for a categorical column the empty string is used.
3) For numerical columns, the following transforms are supported:
i) {"transform": "identity"}: does nothing to the number. (default)
ii) {"transform": "scale"}: scales the column values to the range [-1, 1].
iii) {"transform": "scale", "value": a}: scales the column values
to the range [-a, a].

For categorical columns, the supported transforms depend on whether the
model is a linear or DNN model, because tf.layers is used.
For a linear model, the transforms supported are:
i) {"transform": "sparse"}: Makes a sparse vector using the full
vocabulary associated with the column (default).
ii) {"transform": "hash_sparse", "hash_bucket_size": n}: First each
string is hashed to an integer in the range [0, n), and then a
sparse vector is used.

For a DNN model, the categorical transforms that are supported are:
i) {"transform": "one_hot"}: A one-hot vector using the full
vocabulary is used. (default)
ii) {"transform": "embedding", "embedding_dim": d}: Each label is
embedded into a d-dimensional space.
iii) {"transform": "hash_one_hot", "hash_bucket_size": n}: The label
is first hashed into the range [0, n) and then a one-hot encoding
is made.
iv) {"transform": "hash_embedding", "hash_bucket_size": n,
"embedding_dim": d}: First each label is hashed to [0, n), and
then each integer is embedded into a d-dimensional space.
key_column: key column name. If None, this information is read from the
transforms_file.
top_n: Int. For classification problems, the output graph will contain the
labels and scores for the top n classes with a default of n=1. Use
None for regression problems.
@@ -179,7 +236,19 @@ def local_train(train_file_pattern,
nodes.
"""
#TODO(brandondutra): allow other flags to be set like batch size/learner rate
#TODO(brandondutra): doc someplace that TF>=0.12 and cloudml >-1.7 are needed.

if key_column and not transforms_file:
# Make a transforms file.
transforms_file = os.path.join(output_dir, 'transforms_file.json')
file_io.write_string_to_file(
transforms_file,
json.dumps({key_column: {"transform": "key"}}, indent=2))
elif not key_column and transforms_file:
pass
else:
raise ValueError('Exactly one of key_column or transforms_file should be '
'not None')


args = ['local_train',
'--train_data_paths=%s' % train_file_pattern,
@@ -189,8 +258,8 @@ def local_train(train_file_pattern,
'--transforms_file=%s' % transforms_file,
'--model_type=%s' % model_type,
'--max_steps=%s' % str(max_steps)]
if layer_sizes:
args.extend(['--layer_sizes'] + [str(x) for x in layer_sizes])
for i in range(len(layer_sizes)):
args.append('--layer_size%s=%s' % (i+1, str(layer_sizes[i])))
if top_n:
args.append('--top_n=%s' % str(top_n))
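
# A minimal usage sketch (hypothetical: the paths, column names, and sizes
# below are placeholders, and this helper is never called). It writes a small
# transforms file by hand, in the format described in the docstring above,
# and then launches local training.
def _example_local_train():
  transforms = {
      'target_col': {'transform': 'target'},
      'key_col': {'transform': 'key'},
      'num_col': {'transform': 'scale'},
      'cat_col': {'transform': 'one_hot'},
  }
  file_io.write_string_to_file('/tmp/transforms.json',
                               json.dumps(transforms, indent=2))
  local_train(
      train_file_pattern='/tmp/data/train-*.csv',
      eval_file_pattern='/tmp/data/eval-*.csv',
      preprocess_output_dir='/tmp/sd_analysis',
      output_dir='/tmp/sd_training',
      model_type='dnn_classification',
      max_steps=2000,
      transforms_file='/tmp/transforms.json',
      top_n=3,
      layer_sizes=[10, 8, 5])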

@@ -202,12 +271,12 @@ def cloud_train(train_file_pattern,
eval_file_pattern,
preprocess_output_dir,
output_dir,
transforms_file,
model_type,
max_steps,
transforms_file=None,
key_column=None,
top_n=None,
layer_sizes=None,
staging_bucket=None,
project_id=None,
job_name=None,
scale_tier='STANDARD_1',
@@ -219,9 +288,13 @@ def cloud_train(train_file_pattern,
eval_file_pattern: eval csv file
preprocess_output_dir: The output directory from preprocessing
output_dir: Output directory of training.
transforms_file: File path to the transforms file.
model_type: model type
model_type: One of linear_classification, linear_regression,
dnn_classification, dnn_regression.
max_steps: Int. Number of training steps to perform.
transforms_file: File path to the transforms file. See local_train for
a long description of this file. Must include the key transform.
key_column: key column name. If None, this information is read from the
transforms_file.
top_n: Int. For classification problems, the output graph will contain the
labels and scores for the top n classes with a default of n=1.
Use None for regression problems.
@@ -230,8 +303,6 @@ def cloud_train(train_file_pattern,
will create three DNN layers where the first layer will have 10 nodes,
the middle layer will have 3 nodes, and the last layer will have 2
nodes.

staging_bucket: GCS bucket.
project_id: String. The GCE project to use. Defaults to the notebook's
default project id.
job_name: String. Job name as listed on the Dataflow service. If None, a
@@ -240,11 +311,22 @@ def cloud_train(train_file_pattern,
in this package. See https://cloud.google.com/ml/reference/rest/v1beta1/projects.jobs#ScaleTier
"""
#TODO(brandondutra): allow other flags to be set like batch size,
# learner rate, custom scale tiers, etc
#TODO(brandondutra): doc someplace that TF>=0.12 and cloudml >-1.7 are needed.
# learner rate, etc

if key_column and not transforms_file:
# Make a transforms file.
transforms_file = os.path.join(output_dir, 'transforms_file.json')
file_io.write_string_to_file(
transforms_file,
json.dumps({key_column: {"transform": "key"}}, indent=2))
elif not key_column and transforms_file:
pass
else:
raise ValueError('Exactly one of key_column or transforms_file should be '
'not None')

_assert_gcs_files([train_file_pattern, eval_file_pattern,
preprocess_output_dir, transforms_file])
preprocess_output_dir, transforms_file, output_dir])

# TODO: Convert args to a dictionary so we can use datalab's cloudml trainer.
args = ['--train_data_paths=%s' % train_file_pattern,
@@ -254,23 +336,21 @@ def cloud_train(train_file_pattern,
'--transforms_file=%s' % transforms_file,
'--model_type=%s' % model_type,
'--max_steps=%s' % str(max_steps)]
if layer_sizes:
args.extend(['--layer_sizes'] + [str(x) for x in layer_sizes])
for i in range(len(layer_sizes)):
args.append('--layer_size%s=%s' % (i+1, str(layer_sizes[i])))
if top_n:
args.append('--top_n=%s' % str(top_n))

# TODO(brandondutra): move these package uris locally, ask for a staging
# and copy them there. This package should work without cloudml having to
# maintain gs files!!!
job_request = {
'package_uris': [_TF_GS_URL, _SD_GS_URL],
'package_uris': [_package_to_staging(output_dir)],
'python_module': 'datalab_solutions.structured_data.trainer.task',
'scale_tier': scale_tier,
'region': region,
'args': args
}
# Local import because cloudml service does not have datalab
import datalab.mlalpha
import datalab
cloud_runner = datalab.mlalpha.CloudRunner(job_request)
if not job_name:
job_name = 'structured_data_train_' + datetime.datetime.now().strftime('%y%m%d_%H%M%S')
job = datalab.mlalpha.Job.submit_training(job_request, job_name)
@@ -331,7 +411,8 @@ def local_predict(model_dir, data):
print('Local prediction done.')

# Read the header file.
with open(os.path.join(tmp_dir, 'csv_header.txt'), 'r') as f:
header_file = os.path.join(tmp_dir, 'csv_header.txt')
with open(header_file, 'r') as f:
header = f.readline()

# Print any errors to the screen.
Expand Down Expand Up @@ -467,7 +548,9 @@ def cloud_batch_predict(model_dir, prediction_input_file, output_dir,
'--trained_model_dir=%s' % model_dir,
'--output_dir=%s' % output_dir,
'--output_format=%s' % output_format,
'--batch_size=%s' % str(batch_size)]
'--batch_size=%s' % str(batch_size),
'--extra_package=%s' % _package_to_staging(output_dir)]
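# The staged sd.tar.gz is passed as --extra_package, presumably so the batch
# prediction workers can install this package at runtime instead of relying
# on a copy maintained at a fixed GCS location.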
print(cmd)

if shard_files:
cmd.append('--shard_files')