@@ -62,15 +62,15 @@ def parse_arguments(argv):
type=int,
help='Percent of input data for test dataset.')
parser.add_argument('--output_dir',
type=str,
type=str,
required=True,
help=('Google Cloud Storage or Local directory in which '
'to place outputs.'))
parser.add_argument('--transforms_config_file',
type=str,
parser.add_argument('--schema_file',
type=str,
required=True,
help=('File describing the schema and transforms of '
'each column in the csv data files.'))
help=('File describing the schema of each column in the '
'csv data files.'))
parser.add_argument('--job_name',
type=str,
help=('If using --cloud, the job name as listed in'
@@ -93,17 +93,47 @@ def parse_arguments(argv):

# args.job_name will not be used unless --cloud is used.
if not args.job_name:
args.job_name = ('structured-data-' +
args.job_name = ('structured-data-' +
datetime.datetime.now().strftime('%Y%m%d%H%M%S'))

return args


def load_and_check_config(schema_file_path):
"""Checks the sschema file is well formatted."""

try:
json_str = ml.util._file.load_file(schema_file_path)
config = json.loads(json_str)
except:
print('ERROR reading schema file.')
sys.exit(1)

model_columns = (config.get('numerical_columns', [])
+ config.get('categorical_columns', []))
if config['target_column'] not in model_columns:
print('ERROR: target not listed as a numerical or categorical column.')
sys.exit(1)

if set(config['column_names']) != set(model_columns + [config['key_column']]):
print('ERROR: column_names does not match the columns listed in the other fields.')
sys.exit(1)

if set(config['numerical_columns']) & set(config['categorical_columns']):
print('ERROR: numerical_columns and categorical_columns must be disjoint.')
sys.exit(1)

if config['key_column'] in model_columns:
print('ERROR: key_column should not be listed in numerical_columns or categorical_columns.')
sys.exit(1)

return config
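# Illustrative sketch, not part of this change: a minimal schema (shown as the
# Python dict json.loads would produce) that satisfies every check in
# load_and_check_config above. The column names here are made-up examples.
_EXAMPLE_SCHEMA = {
    'column_names': ['key', 'target', 'num1', 'str1'],
    'key_column': 'key',
    'target_column': 'target',
    # target is listed in numerical_columns, so it is treated as continuous.
    'numerical_columns': ['target', 'num1'],
    'categorical_columns': ['str1'],
}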


def preprocessing_features(args):

# Read the config file.
json_str = ml.util._file.load_file(args.transforms_config_file)
config = json.loads(json_str)
config = load_and_check_config(args.schema_file)

column_names = config['column_names']

Expand All @@ -114,48 +144,38 @@ def preprocessing_features(args):

# Extract target feature
target_name = config['target_column']
if config['problem_type'] == 'regression':
key_name = config['key_column']
if target_name in config.get('numerical_columns', []):
feature_set[target_name] = features.target(target_name).continuous()
else:
feature_set[target_name] = features.target(target_name).discrete()


# Extract numeric features
if 'numerical' in config:
for name, transform_config in config['numerical'].iteritems():
transform = transform_config['transform']
default = transform_config.get('default', None)
if transform == 'scale':
feature_set[name] = features.numeric(name, default=default).scale()
elif transform == 'max_abs_scale':
feature_set[name] = features.numeric(name, default=default).max_abs_scale(transform_config['value'])
elif transform == 'identity':
feature_set[name] = features.numeric(name, default=default).identity()
else:
print('Error: unkown numerical transform name %s in %s' % (transform, str(transform_config)))
sys.exit(1)
for name in config.get('numerical_columns', []):
if name == target_name or name == key_name:
continue
# apply identity to all numerical features.
default = config.get('defaults', {}).get(name, None)
feature_set[name] = features.numeric(name, default=default).identity()

# Extract categorical features
if 'categorical' in config:
for name, transform_config in config['categorical'].iteritems():
transform = transform_config['transform']
default = transform_config.get('default', None)
frequency_threshold = transform_config.get('frequency_threshold', 5)
if transform == 'one_hot' or transform == 'embedding':
feature_set[name] = features.categorical(
name,
default=default,
frequency_threshold=frequency_threshold)
else:
print('Error: unkown categorical transform name %s in %s' % (transform, str(transform_config)))
sys.exit(1)
for name in config.get('categorical_columns', []):
if name == target_name or name == key_name:
continue
# apply sparse transform to all categorical features.
default = config.get('defaults', {}).get(name, None)
feature_set[name] = features.categorical(
name,
default=default,
frequency_threshold=1).sparse(use_counts=True)

return feature_set, column_names



def preprocess(pipeline, feature_set, column_names, input_file_path,
train_percent, eval_percent, test_percent, output_dir):
schema_file, train_percent, eval_percent, test_percent,
output_dir):
"""Builds the preprocessing Dataflow pipeline.

The input files are split into a training, eval and test sets, and the SDK
@@ -206,11 +226,14 @@ def _partition_fn(row_unused, num_partitions_unused): # pylint: disable=unused-
>> io.SaveFeatures(
os.path.join(output_dir, 'features_test')))
# pylint: enable=expression-not-assigned
# Put a copy of the schema file in the output folder. Datalab will look for
# it there.
ml.util._file.copy_file(schema_file, os.path.join(output_dir, 'schema.json'))
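# Illustrative sketch, not part of this diff: the body of _partition_fn above
# is elided in this hunk. A percentage-based random split like the one below is
# one plausible implementation; treat it as an assumption, not the actual code.
import random  # needed only for this illustrative sketch

def _example_partition_fn(row_unused, num_partitions_unused,
                          train_percent, eval_percent):
  """Returns partition index 0 (train), 1 (eval), or 2 (test) for a row."""
  draw = random.random() * 100
  if draw < train_percent:
    return 0
  elif draw < train_percent + eval_percent:
    return 1
  return 2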


def run_dataflow(feature_set, column_names, input_file_path, train_percent,
eval_percent, test_percent, output_dir, cloud, project_id,
job_name):
def run_dataflow(feature_set, column_names, input_file_path, schema_file,
train_percent, eval_percent, test_percent, output_dir, cloud,
project_id, job_name):
"""Run Preprocessing as a Dataflow pipeline."""

# Configure the pipeline.
@@ -233,6 +256,7 @@ def run_dataflow(feature_set, column_names, input_file_path, train_percent,
feature_set=feature_set,
column_names=column_names,
input_file_path=input_file_path,
schema_file=schema_file,
train_percent=train_percent,
eval_percent=eval_percent,
test_percent=test_percent,
@@ -251,6 +275,7 @@ def main(argv=None):
feature_set=feature_set,
column_names=column_names,
input_file_path=args.input_file_path,
schema_file=args.schema_file,
train_percent=args.train_percent,
eval_percent=args.eval_percent,
test_percent=args.test_percent,
@@ -24,4 +24,6 @@
version=VERSION,
packages=['trainer', 'preprocess'],
author='Google',
author_email='cloudml-feedback@google.com',)
author_email='cloudml-feedback@google.com',
test_suite='nose.collector',
tests_require=['nose'])
@@ -13,9 +13,11 @@
# limitations under the License.
# ==============================================================================


import os
import random
import subprocess
import os


def make_csv_data(filename, num_rows, problem_type):
random.seed(12321)
@@ -29,9 +31,9 @@ def make_csv_data(filename, num_rows, problem_type):
str2 = random.choice(['abc', 'def', 'ghi', 'jkl', 'mno', 'pqr'])
str3 = random.choice(['car', 'truck', 'van', 'bike', 'train', 'drone'])

map1 = {'red':2, 'blue':6, 'green':4, 'pink':-5, 'yellow':-6, 'brown':-1, 'black':7}
map2 = {'abc':10, 'def':1, 'ghi':1, 'jkl':1, 'mno':1, 'pqr':1}
map3 = {'car':5, 'truck':10, 'van':15, 'bike':20, 'train':25, 'drone': 30}
map1 = {'red': 2, 'blue': 6, 'green': 4, 'pink': -5, 'yellow': -6, 'brown': -1, 'black': 7}
map2 = {'abc': 10, 'def': 1, 'ghi': 1, 'jkl': 1, 'mno': 1, 'pqr': 1}
map3 = {'car': 5, 'truck': 10, 'van': 15, 'bike': 20, 'train': 25, 'drone': 30}

# Build some model.
t = 0.5 + 0.5*num1 -2.5*num2 + num3
@@ -56,67 +58,83 @@ def make_csv_data(filename, num_rows, problem_type):
str3=str3)
f1.write(csv_line)

config = {'column_names': ['key', 'target', 'num1', 'num2', 'num3',
schema = {'column_names': ['key', 'target', 'num1', 'num2', 'num3',
'str1', 'str2', 'str3'],
'key_column': 'key',
'target_column': 'target',
'problem_type': problem_type,
'model_type': '',
'numerical': {'num1': {'transform': 'identity'},
'num2': {'transform': 'identity'},
'num3': {'transform': 'identity'}},
'categorical': {'str1': {'transform': 'one_hot'},
'str2': {'transform': 'one_hot'},
'str3': {'transform': 'one_hot'}}
'numerical_columns': ['num1', 'num2', 'num3'],
'categorical_columns': ['str1', 'str2', 'str3']
}
return config

if problem_type == 'classification':
schema['categorical_columns'] += ['target']
else:
schema['numerical_columns'] += ['target']

# use defaults for num3 and str3
transforms = {'num1': {'transform': 'identity'},
'num2': {'transform': 'identity'},
# 'num3': {'transform': 'identity'},
'str1': {'transform': 'one_hot'},
'str2': {'transform': 'one_hot'},
# 'str3': {'transform': 'one_hot'}
}
return schema, transforms


def run_preprocess(output_dir, csv_filename, config_filename,
def run_preprocess(output_dir, csv_filename, schema_filename,
train_percent='80', eval_percent='10', test_percent='10'):
cmd = ['python', './preprocess/preprocess.py',
preprocess_script = os.path.abspath(
os.path.join(os.path.dirname(__file__), '../preprocess/preprocess.py'))
cmd = ['python', preprocess_script,
'--output_dir', output_dir,
'--input_file_path', csv_filename,
'--transforms_config_file', config_filename,
'--input_file_path', csv_filename,
'--schema_file', schema_filename,
'--train_percent', train_percent,
'--eval_percent', eval_percent,
'--test_percent', test_percent,
]
print('Current working directory: %s' % os.getcwd())
print('Going to run command: %s' % ' '.join(cmd))
subprocess.check_call(cmd, stderr=open(os.devnull, 'wb'))

def run_training(output_dir, input_dir, config_filename, extra_args=[]):
"""Runs Training via gcloud alpha ml local train.

def run_training(output_dir, input_dir, schema_filename, transforms_filename,
max_steps, extra_args=[]):
"""Runs Training via gcloud beta ml local train.

Args:
output_dir: the trainer's output folder
input_folder: should contain features_train*, features_eval*, and
input_dir: should contain features_train*, features_eval*, and
metadata.json.
config_filename: path to the config file
schema_filename: path to the schema file
transforms_filename: path to the transforms file.
max_steps: int. max training steps.
extra_args: array of strings, passed to the trainer.

Returns:
The stderr of training as one string. TensorFlow logs to stderr, so this is
effectively the output of training.
"""
train_filename = os.path.join(input_dir, 'features_train*')
eval_filename = os.path.join(input_dir, 'features_eval*')
metadata_filename = os.path.join(input_dir, 'metadata.json')
cmd = ['gcloud alpha ml local train',

# Gcloud has the fun bug that you have to be in the parent folder of task.py
# when you call it. So cd there first.
task_parent_folder = os.path.abspath(
os.path.join(os.path.dirname(__file__), '..'))
cmd = ['cd %s &&' % task_parent_folder,
'gcloud beta ml local train',
'--module-name=trainer.task',
'--package-path=trainer',
'--',
'--train_data_paths=%s' % train_filename,
'--eval_data_paths=%s' % eval_filename,
'--metadata_path=%s' % metadata_filename,
'--output_path=%s' % output_dir,
'--transforms_config_file=%s' % config_filename,
'--max_steps=2500'] + extra_args
print('Current working directoyr: %s' % os.getcwd())
'--schema_file=%s' % schema_filename,
'--transforms_file=%s' % transforms_filename,
'--max_steps=%s' % max_steps] + extra_args
print('Going to run command: %s' % ' '.join(cmd))
sp = subprocess.Popen(' '.join(cmd), shell=True, stderr=subprocess.PIPE) #open(os.devnull, 'wb'))
sp = subprocess.Popen(' '.join(cmd), shell=True, stderr=subprocess.PIPE)
_, err = sp.communicate()
err = err.splitlines()
print('last line')
print(err[-1])

return err
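# Illustrative usage sketch, not part of this diff: how the helpers above might
# be chained in an end-to-end test. The paths, step count, and json dumping are
# assumptions for demonstration only.
def _example_end_to_end():
  import json  # local import, used only by this sketch
  schema, transforms = make_csv_data('/tmp/data.csv', 100, 'regression')
  with open('/tmp/schema.json', 'w') as f:
    json.dump(schema, f)
  with open('/tmp/transforms.json', 'w') as f:
    json.dump(transforms, f)
  run_preprocess(output_dir='/tmp/preprocessed',
                 csv_filename='/tmp/data.csv',
                 schema_filename='/tmp/schema.json')
  return run_training(output_dir='/tmp/trained',
                      input_dir='/tmp/preprocessed',
                      schema_filename='/tmp/schema.json',
                      transforms_filename='/tmp/transforms.json',
                      max_steps='100')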