Inception Package Improvements (#138)
* Fix an issue where prediction fails right after preprocessing in an inception package local run.

* Remove the "labels_file" parameter from inception preprocess/train/predict. Labels are now obtained from the training data, and the prediction graph returns labels.
Make online prediction work with GCS images.
"%%ml alpha deploy" now also checks the "/model" subdir if needed.
Other minor improvements.

* Make local batch prediction really batched.
Batch prediction input no longer has to include the target column.
Sort labels so they are consistent between preprocessing and training.
Address other code review comments.

* Address code review comments.
qimingj authored Jan 30, 2017
1 parent e309785 commit e92b790
Showing 10 changed files with 197 additions and 133 deletions.
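
For orientation before the per-file diffs, the reworked API can be exercised end to end roughly as follows. This sketch is not part of the commit, the paths are placeholders, and the import path assumes the solutionbox package layout. Note that no labels_file is passed anywhere: labels now travel with the preprocessed data and the model.

    from datalab_solutions.inception import _package as inception

    inception.local_preprocess(['gs://my-bucket/train.csv'], '/tmp/preprocessed')
    # Train on the run directory that preprocessing created under /tmp/preprocessed.
    inception.local_train('/tmp/preprocessed/<run_dir>', batch_size=32,
                          max_steps=2000, output_dir='/tmp/model')
    inception.local_predict('/tmp/model', ['gs://my-bucket/images/flower1.jpg'])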
10 changes: 8 additions & 2 deletions datalab/mlalpha/_cloud_models.py
@@ -151,13 +151,16 @@ def get(self, version_name):
   def _wait_for_long_running_operation(self, response):
     if 'name' not in response:
       raise Exception('Invaid response from service. Cannot find "name" field.')
+    print('Waiting for job "%s"' % response['name'])
     while True:
       response = self._api.projects().operations().get(name=response['name']).execute()
       if 'done' not in response or response['done'] != True:
         time.sleep(3)
       else:
         if 'error' in response:
-          print response['error']
+          print(response['error'])
+        else:
+          print('Done.')
         break
 
   def deploy(self, version_name, path):
@@ -173,7 +176,10 @@ def deploy(self, version_name, path):
     if not path.startswith('gs://'):
       raise Exception('Invalid path. Only Google Cloud Storage path (gs://...) is accepted.')
     if not datalab.storage.Item.from_url(os.path.join(path, 'export.meta')).exists():
-      raise Exception('Cannot find export.meta from given path.')
+      # try appending '/model' sub dir.
+      path = os.path.join(path, 'model')
+      if not datalab.storage.Item.from_url(os.path.join(path, 'export.meta')).exists():
+        raise Exception('Cannot find export.meta from given path.')
 
     body = {'name': self._model_name}
     parent = 'projects/' + self._project_id
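The deploy change reduces to a two-step lookup: accept the given path if it already holds 'export.meta', otherwise retry under its 'model' subdirectory. A minimal sketch of that logic, reusing the same datalab.storage check as the diff above (the helper name is hypothetical):

    import os
    import datalab.storage

    def resolve_model_dir(path):
      # Hypothetical helper mirroring the fallback in deploy() above.
      for candidate in (path, os.path.join(path, 'model')):
        if datalab.storage.Item.from_url(os.path.join(candidate, 'export.meta')).exists():
          return candidate
      raise Exception('Cannot find export.meta from given path.')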
20 changes: 10 additions & 10 deletions solutionbox/inception/datalab_solutions/inception/_cloud.py
@@ -42,7 +42,7 @@ def __init__(self, checkpoint=None):
     if self._checkpoint is None:
       self._checkpoint = _util._DEFAULT_CHECKPOINT_GSURL
 
-  def preprocess(self, input_csvs, labels_file, output_dir, pipeline_option=None):
+  def preprocess(self, input_csvs, output_dir, pipeline_option=None):
     """Cloud preprocessing with Cloud DataFlow."""
 
     job_name = 'preprocess-inception-' + datetime.datetime.now().strftime('%y%m%d-%H%M%S')
@@ -60,22 +60,19 @@ def preprocess(self, input_csvs, labels_file, output_dir, pipeline_option=None):

     opts = beam.pipeline.PipelineOptions(flags=[], **options)
     p = beam.Pipeline('DataflowPipelineRunner', options=opts)
-    _preprocess.configure_pipeline(
-        p, self._checkpoint, input_csvs, labels_file, output_dir, job_name)
+    _preprocess.configure_pipeline(p, self._checkpoint, input_csvs, output_dir, job_name)
     p.run()
 
-  def train(self, labels_file, input_dir, batch_size, max_steps, output_path,
+  def train(self, input_dir, batch_size, max_steps, output_path,
             region, scale_tier):
     """Cloud training with CloudML trainer service."""
 
     import datalab.mlalpha as mlalpha
-    num_classes = len(_util.get_labels(labels_file))
     job_args = {
       'input_dir': input_dir,
       'output_path': output_path,
       'max_steps': max_steps,
       'batch_size': batch_size,
-      'num_classes': num_classes,
       'checkpoint': self._checkpoint
     }
     job_request = {
@@ -89,16 +86,14 @@ def train(self, labels_file, input_dir, batch_size, max_steps, output_path,
     job_id = 'inception_train_' + datetime.datetime.now().strftime('%y%m%d_%H%M%S')
     return cloud_runner.run(job_id)
 
-  def predict(self, model_id, image_files, labels_file):
+  def predict(self, model_id, image_files):
     """Cloud prediction with CloudML prediction service."""
 
     import datalab.mlalpha as mlalpha
     parts = model_id.split('.')
     if len(parts) != 2:
       raise Exception('Invalid model name for cloud prediction. Use "model.version".')
 
-    labels = _util.get_labels(labels_file)
-    labels.append('UNKNOWN')
     data = []
     for ii, img_file in enumerate(image_files):
       with ml.util._file.open_local_or_gcs(img_file, 'rb') as f:
@@ -110,6 +105,11 @@ def predict(self, model_id, image_files, labels_file):

     cloud_predictor = mlalpha.CloudPredictor(parts[0], parts[1])
     predictions = cloud_predictor.predict(data)
-    labels_and_scores = [(labels[x['prediction']], x['scores'][x['prediction']])
+    if len(predictions) == 0:
+      raise Exception('Prediction results are empty.')
+    # Although prediction results contains a labels list in each instance, they are all the same
+    # so taking the first one.
+    labels = predictions[0]['labels']
+    labels_and_scores = [(x['prediction'], x['scores'][labels.index(x['prediction'])])
                          for x in predictions]
     return labels_and_scores
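Because every returned instance now carries the (identical) labels list, a client no longer needs a separate labels file to interpret scores. A small sketch of that lookup, assuming the response shape produced by the prediction graph in this commit (the example values are illustrative):

    # Each instance looks roughly like:
    #   {'prediction': 'daisy',
    #    'labels': ['daisy', 'rose', 'UNKNOWN'],
    #    'scores': [0.91, 0.06, 0.03]}
    def label_and_score(instance):
      # 'scores' is aligned with 'labels', so index by the predicted label.
      idx = instance['labels'].index(instance['prediction'])
      return instance['prediction'], instance['scores'][idx]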
19 changes: 9 additions & 10 deletions solutionbox/inception/datalab_solutions/inception/_local.py
@@ -42,32 +42,31 @@ def __init__(self, checkpoint=None):
     if self._checkpoint is None:
       self._checkpoint = _util._DEFAULT_CHECKPOINT_GSURL
 
-  def preprocess(self, input_csvs, labels_file, output_dir):
+  def preprocess(self, input_csvs, output_dir):
     """Local preprocessing with local DataFlow."""
 
     job_id = 'inception_preprocessed_' + datetime.datetime.now().strftime('%y%m%d_%H%M%S')
     p = beam.Pipeline('DirectPipelineRunner')
-    _preprocess.configure_pipeline(p, self._checkpoint, input_csvs, labels_file,
-                                   output_dir, job_id)
+    _preprocess.configure_pipeline(p, self._checkpoint, input_csvs, output_dir, job_id)
     p.run()
 
-  def train(self, labels_file, input_dir, batch_size, max_steps, output_dir):
+  def train(self, input_dir, batch_size, max_steps, output_dir):
     """Local training."""
 
-    num_classes = len(_util.get_labels(labels_file))
-    model = _model.Model(num_classes, 0.5, self._checkpoint)
+    labels = _util.get_labels(input_dir)
+    model = _model.Model(labels, 0.5, self._checkpoint)
     task_data = {'type': 'master', 'index': 0}
     task = type('TaskSpec', (object,), task_data)
     _trainer.Trainer(input_dir, batch_size, max_steps, output_dir,
                      model, None, task).run_training()
 
-  def predict(self, model_dir, image_files, labels_file):
+  def predict(self, model_dir, image_files):
     """Local prediction."""
 
-    return _predictor.predict(model_dir, image_files, labels_file)
+    return _predictor.predict(model_dir, image_files)
 
 
-  def batch_predict(self, model_dir, input_csv, labels_file, output_file, output_bq_table):
+  def batch_predict(self, model_dir, input_csv, output_file, output_bq_table):
     """Local batch prediction."""
 
-    return _predictor.batch_predict(model_dir, input_csv, labels_file, output_file, output_bq_table)
+    return _predictor.batch_predict(model_dir, input_csv, output_file, output_bq_table)
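Note that train() now derives the label vocabulary from the preprocessed data via _util.get_labels(input_dir) rather than from a user-supplied file. _util.py itself is outside this diff, so the following is only a hedged illustration of the contract:

    from . import _util

    labels = _util.get_labels('/tmp/preprocessed/<run_dir>')  # e.g. ['daisy', 'rose', 'tulip']
    # Model sorts this list (see _model.py below), so preprocessing and
    # training agree on the label order however the labels were collected.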
32 changes: 24 additions & 8 deletions solutionbox/inception/datalab_solutions/inception/_model.py
@@ -59,8 +59,9 @@ def __init__(self):
 class Model(object):
   """TensorFlow model for the flowers problem."""
 
-  def __init__(self, label_count, dropout, inception_checkpoint_file):
-    self.label_count = label_count
+  def __init__(self, labels, dropout, inception_checkpoint_file):
+    self.labels = labels
+    self.labels.sort()
     self.dropout = dropout
     self.inception_checkpoint_file = inception_checkpoint_file

@@ -203,7 +204,7 @@ def build_graph(self, data_paths, batch_size, graph_mod):
         'label':
             tf.FixedLenFeature(
                 shape=[1], dtype=tf.int64,
-                default_value=[self.label_count]),
+                default_value=[len(self.labels)]),
         'embedding':
             tf.FixedLenFeature(
                 shape=[BOTTLENECK_TENSOR_SIZE], dtype=tf.float32)
@@ -215,7 +216,7 @@ def build_graph(self, data_paths, batch_size, graph_mod):

     # We assume a default label, so the total number of labels is equal to
     # label_count+1.
-    all_labels_count = self.label_count + 1
+    all_labels_count = len(self.labels) + 1
     with tf.name_scope('final_ops'):
       softmax, logits = self.add_final_training_ops(
           embeddings,
@@ -316,12 +317,28 @@ def build_prediction_graph(self):

     # To extract the id, we need to add the identity function.
     keys = tf.identity(keys_placeholder)
+    labels = self.labels + ['UNKNOWN']
+    predicted_label = tf.contrib.lookup.index_to_string(tensors.predictions[0],
+                                                        mapping=labels)
+    # Need to duplicate the labels by num_of_instances so the output is one batch
+    # (all output members share the same outer dimension).
+    # The labels are needed for client to match class scores list.
+    labels_tensor = tf.expand_dims(tf.constant(labels), 0)
+    num_instance = tf.shape(keys)
+    labels_tensors_n = tf.tile(labels_tensor, tf.concat(0, [num_instance, [1]]))
 
     outputs = {
       'key': keys.name,
-      'prediction': tensors.predictions[0].name,
-      'scores': tensors.predictions[1].name
+      'prediction': predicted_label.name,
+      'labels': labels_tensors_n.name,
+      'scores': tensors.predictions[1].name,
     }
     tf.add_to_collection('outputs', json.dumps(outputs))
+    # Add table init op to collection so online prediction will load the model and run it.
+    # TODO: initialize_all_tables is going to be deprecated but the replacement
+    # tf.tables_initializer does not exist in 0.12 yet.
+    init_tables_op = tf.initialize_all_tables()
+    tf.add_to_collection(tf.contrib.session_bundle.constants.INIT_OP_KEY, init_tables_op)
 
   def export(self, last_checkpoint, output_dir):
     """Builds a prediction graph and xports the model.
@@ -340,8 +357,7 @@ def export(self, last_checkpoint, output_dir):
         last_checkpoint)
     saver = tf.train.Saver()
     saver.export_meta_graph(filename=os.path.join(output_dir, 'export.meta'))
-    saver.save(
-        sess, os.path.join(output_dir, 'export'), write_meta_graph=False)
+    saver.save(sess, os.path.join(output_dir, 'export'), write_meta_graph=False)
 
   def format_metric_values(self, metric_values):
     """Formats metric values - used for logging purpose."""
57 changes: 23 additions & 34 deletions solutionbox/inception/datalab_solutions/inception/_package.py
@@ -23,6 +23,7 @@
 Datalab will look for functions with the above names.
 """
 
+import google.cloud.ml as ml
 import logging
 import os
 import urllib
@@ -35,14 +36,12 @@
 from . import _util
 
 
-def local_preprocess(input_csvs, labels_file, output_dir, checkpoint=None):
+def local_preprocess(input_csvs, output_dir, checkpoint=None):
   """Preprocess data locally. Produce output that can be used by training efficiently.
   Args:
     input_csvs: A list of CSV files which include two columns only: image_gs_url, label.
         Preprocessing will concatenate the data inside all files and split them into
         train/eval dataset. Can be local or GCS path.
-    labels_file: The path to the labels file which lists all labels, each in a separate line.
-        It can be a local or a GCS path.
     output_dir: The output directory to use. Preprocessing will create a sub directory under
         it for each run, and also update "latest" file which points to the latest preprocessed
         directory. Users are responsible for cleanup. Can be local or GCS path.
@@ -52,19 +51,17 @@ def local_preprocess(input_csvs, labels_file, output_dir, checkpoint=None):
   print 'Local preprocessing...'
   # TODO: Move this to a new process to avoid pickling issues
   # TODO: Expose train/eval split ratio
-  _local.Local(checkpoint).preprocess(input_csvs, labels_file, output_dir)
+  _local.Local(checkpoint).preprocess(input_csvs, output_dir)
   print 'Done'
 
 
-def cloud_preprocess(input_csvs, labels_file, output_dir, checkpoint=None,
-                     pipeline_option=None):
+def cloud_preprocess(input_csvs, output_dir, checkpoint=None, pipeline_option=None):
   """Preprocess data in Cloud with DataFlow.
   Produce output that can be used by training efficiently.
   Args:
     input_csvs: A list of CSV files which include two columns only: image_gs_url, label.
         Preprocessing will concatenate the data inside all files and split them into
         train/eval dataset. GCS paths only.
-    labels_file: The GCS path to the labels file which lists all labels, each in a separate line.
     output_dir: The output directory to use. Preprocessing will create a sub directory under
         it for each run, and also update "latest" file which points to the latest preprocessed
         directory. Users are responsible for cleanup. GCS path only.
@@ -74,8 +71,7 @@ def cloud_preprocess(input_csvs, labels_file, output_dir, checkpoint=None,
   # TODO: Move this to a new process to avoid pickling issues
   # TODO: Expose train/eval split ratio
   # TODO: Consider exposing explicit train/eval datasets
-  _cloud.Cloud(checkpoint=checkpoint).preprocess(input_csvs, labels_file, output_dir,
-                                                 pipeline_option)
+  _cloud.Cloud(checkpoint=checkpoint).preprocess(input_csvs, output_dir, pipeline_option)
   if (_util.is_in_IPython()):
     import IPython
 
@@ -87,11 +83,9 @@ def cloud_preprocess(input_csvs, labels_file, output_dir, checkpoint=None,
     IPython.display.display_html(html, raw=True)
 
 
-def local_train(labels_file, input_dir, batch_size, max_steps, output_dir, checkpoint=None):
+def local_train(input_dir, batch_size, max_steps, output_dir, checkpoint=None):
   """Train model locally. The output can be used for local prediction or for online deployment.
   Args:
-    labels_file: The path to the labels file which lists all labels, each in a separate line.
-        It can be a local or a GCS path.
     input_dir: A directory path containing preprocessed results. Can be local or GCS path.
     batch_size: size of batch used for training.
     max_steps: number of steps to train.
@@ -104,27 +98,25 @@ def local_train(labels_file, input_dir, batch_size, max_steps, output_dir, checkpoint=None):
   logger.setLevel(logging.INFO)
   print 'Local training...'
   try:
-    _local.Local(checkpoint).train(labels_file, input_dir, batch_size, max_steps, output_dir)
+    _local.Local(checkpoint).train(input_dir, batch_size, max_steps, output_dir)
   finally:
     logger.setLevel(original_level)
   print 'Done'
 
 
-def cloud_train(labels_file, input_dir, batch_size, max_steps, output_dir,
+def cloud_train(input_dir, batch_size, max_steps, output_dir,
                 region, scale_tier='BASIC', checkpoint=None):
   """Train model in the cloud with CloudML trainer service.
   The output can be used for local prediction or for online deployment.
   Args:
-    labels_file: The path to the labels file which lists all labels, each in a separate line.
-        GCS path only.
     input_dir: A directory path containing preprocessed results. GCS path only.
     batch_size: size of batch used for training.
     max_steps: number of steps to train.
     output_dir: The output directory to use. GCS path only.
     checkpoint: the Inception checkpoint to use.
   """
 
-  job_info = _cloud.Cloud(checkpoint=checkpoint).train(labels_file, input_dir, batch_size,
+  job_info = _cloud.Cloud(checkpoint=checkpoint).train(input_dir, batch_size,
                                                        max_steps, output_dir, region, scale_tier)
   if (_util.is_in_IPython()):
     import IPython
@@ -146,58 +138,55 @@ def _display_predict_results(results, show_image):
     if show_image is True:
       IPython.display.display_html('<p style="font-size:28px">%s(%.5f)</p>' % label_and_score,
                                    raw=True)
-      IPython.display.display(IPython.display.Image(filename=image_file))
+      with ml.util._file.open_local_or_gcs(image_file, mode='r') as f:
+        IPython.display.display(IPython.display.Image(data=f.read()))
     else:
       IPython.display.display_html(
           '<p>%s&nbsp&nbsp%s(%.5f)</p>' % ((image_file,) + label_and_score), raw=True)
   else:
     print results
 
 
-def local_predict(model_dir, image_files, labels_file, show_image=True):
+def local_predict(model_dir, image_files, show_image=True):
   """Predict using an offline model.
   Args:
     model_dir: The directory of a trained inception model. Can be local or GCS paths.
     image_files: The paths to the image files to predict labels. Can be local or GCS paths.
-    labels_file: The path to the labels file which lists all labels, each in a separate line.
-        Can be local or GCS paths.
     show_image: Whether to show images in the results.
   """
 
-  labels_and_scores = _local.Local().predict(model_dir, image_files, labels_file)
+  print('Predicting...')
+  labels_and_scores = _local.Local().predict(model_dir, image_files)
   results = zip(image_files, labels_and_scores)
   _display_predict_results(results, show_image)
+  print('Done')
 
 
-def cloud_predict(model_id, image_files, labels_file, show_image=True):
+def cloud_predict(model_id, image_files, show_image=True):
   """Predict using a deployed (online) model.
   Args:
     model_id: The deployed model id in the form of "model.version".
     image_files: The paths to the image files to predict labels. GCS paths only.
-    labels_file: The path to the labels file which lists all labels, each in a separate line.
-        GCS paths only.
     show_image: Whether to show images in the results.
   """
 
-  labels_and_scores = _cloud.Cloud().predict(model_id, image_files, labels_file)
+  print('Predicting...')
+  labels_and_scores = _cloud.Cloud().predict(model_id, image_files)
   results = zip(image_files, labels_and_scores)
   _display_predict_results(results, show_image)
 
 
-def local_batch_predict(model_dir, input_csv, labels_file, output_file, output_bq_table=None):
+def local_batch_predict(model_dir, input_csv, output_file, output_bq_table=None):
   """Batch predict using an offline model.
   Args:
     model_dir: The directory of a trained inception model. Can be local or GCS paths.
     input_csv: The input csv which include two columns only: image_gs_url, label.
         Can be local or GCS paths.
-    labels_file: The path to the labels file which lists all labels, each in a separate line.
-        Can be local or GCS paths.
     output_file: The output csv file containing prediction results.
     output_bq_table: If provided, will also save the results to BigQuery table.
   """
-  _local.Local().batch_predict(model_dir, input_csv, labels_file, output_file, output_bq_table)
 
+  print('Predicting...')
+  _local.Local().batch_predict(model_dir, input_csv, output_file, output_bq_table)
+  print('Done')
 
-def cloud_batch_predict(model_dir, image_files, labels_file, show_image=True, output_file=None):
+def cloud_batch_predict(model_dir, image_files, show_image=True, output_file=None):
   """Not Implemented Yet"""
   pass
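
Per the commit message, local batch prediction is now genuinely batched and the input CSV's target column is optional. A call after this change might look like the following (paths and the BigQuery table name are placeholders):

    local_batch_predict('/tmp/model', 'gs://my-bucket/eval.csv',
                        output_file='/tmp/predictions.csv',
                        output_bq_table='mydataset.flower_predictions')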