This repository has been archived by the owner on Sep 3, 2022. It is now read-only.

Inception package improvements #155

Merged: 2 commits, Feb 2, 2017
3 changes: 2 additions & 1 deletion datalab/mlalpha/_dataset.py
@@ -48,7 +48,8 @@ def __init__(self, files, schema=None, schema_file=None):
self._schema = json.load(f)
self._files = []
for file in files:
self._files += ml.util._file.glob_files(file)
# glob_files() returns unicode strings which doesn't make DataFlow happy. So str().
self._files += [str(x) for x in ml.util._file.glob_files(file)]

@property
def files(self):
17 changes: 12 additions & 5 deletions solutionbox/inception/datalab_solutions/inception/_cloud.py
@@ -20,18 +20,18 @@
import base64
import collections
import datetime
from googleapiclient import discovery
import google.cloud.ml as ml
import logging
import os


from . import _model
from . import _preprocess
from . import _trainer
from . import _util


_TF_GS_URL= 'gs://cloud-datalab/deploy/tf/tensorflow-0.12.0rc0-cp27-none-linux_x86_64.whl'
_TF_GS_URL= 'gs://cloud-datalab/deploy/tf/tensorflow-0.12.0rc1-cp27-none-linux_x86_64.whl'
Contributor


Why aren't we using an official TF release url here?

Contributor Author


We could. But I am not sure how long they will keep these releases there. It seems better to store dependencies in our own bucket, as much as possible?
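
For context on how such a wheel gets used: a GCS-pinned TensorFlow wheel is typically shipped to the Dataflow workers through the extra_packages pipeline option. A minimal sketch of that idea; the option values and project name below are illustrative assumptions, and only _TF_GS_URL itself comes from this diff:

```python
import os
import apache_beam as beam

# Value taken from the diff above; everything else here is a placeholder.
_TF_GS_URL = 'gs://cloud-datalab/deploy/tf/tensorflow-0.12.0rc1-cp27-none-linux_x86_64.whl'
output_dir = 'gs://my-bucket/inception/preprocess'

options = {
    'staging_location': os.path.join(output_dir, 'tmp', 'staging'),
    'temp_location': os.path.join(output_dir, 'tmp'),
    'project': 'my-gcp-project',
    # Ship the pinned TensorFlow wheel to every worker. Depending on the SDK
    # version, extra_packages may require a local path rather than a GCS URL.
    'extra_packages': [_TF_GS_URL],
}
opts = beam.pipeline.PipelineOptions(flags=[], **options)
```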



class Cloud(object):
@@ -42,9 +42,10 @@ def __init__(self, checkpoint=None):
if self._checkpoint is None:
self._checkpoint = _util._DEFAULT_CHECKPOINT_GSURL

def preprocess(self, input_csvs, output_dir, pipeline_option=None):
def preprocess(self, dataset, output_dir, pipeline_option=None):
"""Cloud preprocessing with Cloud DataFlow."""

import datalab.mlalpha as mlalpha
job_name = 'preprocess-inception-' + datetime.datetime.now().strftime('%y%m%d-%H%M%S')
options = {
'staging_location': os.path.join(output_dir, 'tmp', 'staging'),
Expand All @@ -59,9 +60,15 @@ def preprocess(self, input_csvs, output_dir, pipeline_option=None):
options.update(pipeline_option)

opts = beam.pipeline.PipelineOptions(flags=[], **options)
p = beam.Pipeline('DataflowPipelineRunner', options=opts)
_preprocess.configure_pipeline(p, self._checkpoint, input_csvs, output_dir, job_name)
p = beam.Pipeline('DataflowRunner', options=opts)
if type(dataset) is mlalpha.CsvDataSet:
_preprocess.configure_pipeline_csv(p, self._checkpoint, dataset.files, output_dir, job_name)
elif type(dataset) is mlalpha.BigQueryDataSet:
_preprocess.configure_pipeline_bigquery(p, self._checkpoint, dataset.sql, output_dir, job_name)
else:
raise ValueError('preprocess takes CsvDataSet or BigQueryDataset only.')
p.run()
return job_name

def train(self, input_dir, batch_size, max_steps, output_path,
region, scale_tier):
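
The preprocess signature change above replaces a list of CSV paths with a dataset object. A usage sketch of the new entry point; the import path and the BigQueryDataSet constructor argument are assumptions, while the CsvDataSet constructor follows the _dataset.py hunk at the top of this diff:

```python
import datalab.mlalpha as mlalpha
from datalab_solutions.inception import _cloud  # assumed import path

# CSV input: files are globbed and each row is read as image_url,label.
csv_data = mlalpha.CsvDataSet(['gs://my-bucket/images/input*.csv'])
job_name = _cloud.Cloud().preprocess(
    csv_data,
    'gs://my-bucket/inception/preprocessed',
    pipeline_option={'project': 'my-gcp-project'})
print(job_name)  # e.g. 'preprocess-inception-170202-123456'

# BigQuery input: a table name or a SQL query producing image_url, label columns.
bq_data = mlalpha.BigQueryDataSet('SELECT image_url, label FROM mydataset.images')  # assumed constructor
_cloud.Cloud().preprocess(bq_data, 'gs://my-bucket/inception/preprocessed')
```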
21 changes: 16 additions & 5 deletions solutionbox/inception/datalab_solutions/inception/_local.py
@@ -42,13 +42,24 @@ def __init__(self, checkpoint=None):
if self._checkpoint is None:
self._checkpoint = _util._DEFAULT_CHECKPOINT_GSURL

def preprocess(self, input_csvs, output_dir):
def preprocess(self, dataset, output_dir):
"""Local preprocessing with local DataFlow."""


import datalab.mlalpha as mlalpha
job_id = 'inception_preprocessed_' + datetime.datetime.now().strftime('%y%m%d_%H%M%S')
p = beam.Pipeline('DirectPipelineRunner')
_preprocess.configure_pipeline(p, self._checkpoint, input_csvs, output_dir, job_id)
p.run()
# Project is needed for bigquery data source, even in local run.
options = {
'project': _util.default_project(),
}
opts = beam.pipeline.PipelineOptions(flags=[], **options)
p = beam.Pipeline('DirectRunner', options=opts)
if type(dataset) is mlalpha.CsvDataSet:
_preprocess.configure_pipeline_csv(p, self._checkpoint, dataset.files, output_dir, job_id)
elif type(dataset) is mlalpha.BigQueryDataSet:
_preprocess.configure_pipeline_bigquery(p, self._checkpoint, dataset.sql, output_dir, job_id)
else:
raise ValueError('preprocess takes CsvDataSet or BigQueryDataset only.')
p.run().wait_until_finish()

def train(self, input_dir, batch_size, max_steps, output_dir):
"""Local training."""
@@ -389,7 +389,7 @@ def loss(logits, labels):
"""
labels = tf.to_int64(labels)
cross_entropy = tf.nn.sparse_softmax_cross_entropy_with_logits(
logits, labels, name='xentropy')
logits=logits, labels=labels, name='xentropy')
return tf.reduce_mean(cross_entropy, name='xentropy_mean')


23 changes: 10 additions & 13 deletions solutionbox/inception/datalab_solutions/inception/_package.py
@@ -36,12 +36,11 @@
from . import _util


def local_preprocess(input_csvs, output_dir, checkpoint=None):
def local_preprocess(dataset, output_dir, checkpoint=None):
"""Preprocess data locally. Produce output that can be used by training efficiently.
Args:
input_csvs: A list of CSV files which include two columns only: image_gs_url, label.
Preprocessing will concatenate the data inside all files and split them into
train/eval dataset. Can be local or GCS path.
dataset: data source to preprocess. Can be either datalab.mlalpha.CsvDataset, or
datalab.mlalpha.BigQueryDataSet.
output_dir: The output directory to use. Preprocessing will create a sub directory under
it for each run, and also update "latest" file which points to the latest preprocessed
directory. Users are responsible for cleanup. Can be local or GCS path.
@@ -51,17 +50,16 @@ def local_preprocess(input_csvs, output_dir, checkpoint=None):
print 'Local preprocessing...'
# TODO: Move this to a new process to avoid pickling issues
# TODO: Expose train/eval split ratio
_local.Local(checkpoint).preprocess(input_csvs, output_dir)
_local.Local(checkpoint).preprocess(dataset, output_dir)
print 'Done'


def cloud_preprocess(input_csvs, output_dir, checkpoint=None, pipeline_option=None):
def cloud_preprocess(dataset, output_dir, checkpoint=None, pipeline_option=None):
"""Preprocess data in Cloud with DataFlow.
Produce output that can be used by training efficiently.
Args:
input_csvs: A list of CSV files which include two columns only: image_gs_url, label.
Preprocessing will concatenate the data inside all files and split them into
train/eval dataset. GCS paths only.
dataset: data source to preprocess. Can be either datalab.mlalpha.CsvDataset, or
datalab.mlalpha.BigQueryDataSet. For CsvDataSet, all files need to be in GCS.
output_dir: The output directory to use. Preprocessing will create a sub directory under
it for each run, and also update "latest" file which points to the latest preprocessed
directory. Users are responsible for cleanup. GCS path only.
@@ -70,14 +68,13 @@ def cloud_preprocess(input_csvs, output_dir, checkpoint=None, pipeline_option=None):

# TODO: Move this to a new process to avoid pickling issues
# TODO: Expose train/eval split ratio
# TODO: Consider exposing explicit train/eval datasets
_cloud.Cloud(checkpoint=checkpoint).preprocess(input_csvs, output_dir, pipeline_option)
job_name = _cloud.Cloud(checkpoint=checkpoint).preprocess(dataset, output_dir, pipeline_option)
if (_util.is_in_IPython()):
import IPython

dataflow_url = 'https://console.developers.google.com/dataflow?project=%s' % \
_util.default_project()
html = 'Job submitted.'
html = 'Job "%s" submitted.' % job_name
html += '<p>Click <a href="%s" target="_blank">here</a> to track preprocessing job. <br/>' \
% dataflow_url
IPython.display.display_html(html, raw=True)
@@ -126,7 +123,7 @@ def cloud_train(input_dir, batch_size, max_steps, output_dir,
}
log_url = 'https://console.developers.google.com/logs/viewer?' + \
urllib.urlencode(log_url_query_strings)
html = 'Job submitted.'
html = 'Job "%s" submitted.' % job_info['jobId']
html += '<p>Click <a href="%s" target="_blank">here</a> to view cloud log. <br/>' % log_url
IPython.display.display_html(html, raw=True)
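
Both preprocessing docstrings above lean on the "latest" pointer file written into output_dir (see the WriteLatest step in _preprocess.py below). A sketch of how a caller might resolve the newest run from it; this helper is illustrative and assumes each run's outputs live under a sub-directory named after the job id:

```python
import os

def latest_preprocessed_dir(output_dir):
    # Read the job id recorded by the 'WriteLatest' step and map it to a run directory.
    with open(os.path.join(output_dir, 'latest')) as f:
        job_id = f.read().strip()
    return os.path.join(output_dir, job_id)

# Local example; a GCS path would need a storage client instead of open().
# train_input = latest_preprocessed_dir('/tmp/inception/preprocessed')
```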

81 changes: 48 additions & 33 deletions solutionbox/inception/datalab_solutions/inception/_preprocess.py
@@ -18,7 +18,7 @@


import apache_beam as beam
from apache_beam.utils.options import PipelineOptions
from apache_beam.utils.pipeline_options import PipelineOptions
import cStringIO
import csv
import google.cloud.ml as ml
@@ -35,8 +35,7 @@
slim = tf.contrib.slim

error_count = beam.Aggregator('errorCount')
csv_rows_count = beam.Aggregator('csvRowsCount')
labels_count = beam.Aggregator('labelsCount')
rows_count = beam.Aggregator('RowsCount')
skipped_empty_line = beam.Aggregator('skippedEmptyLine')
embedding_good = beam.Aggregator('embedding_good')
embedding_bad = beam.Aggregator('embedding_bad')
@@ -66,27 +65,23 @@ def process(self, context, all_labels):
self.label_to_id_map[label] = i

# Row format is:
# image_uri(,label_ids)*
row = context.element
if not row:
# image_uri,label_id
element = context.element
if not element:
context.aggregate_to(skipped_empty_line, 1)
return

context.aggregate_to(csv_rows_count, 1)
uri = row[0]
context.aggregate_to(rows_count, 1)
uri = element['image_url']
if not uri or not uri.startswith('gs://'):
context.aggregate_to(invalid_uri, 1)
return

# In a real-world system, you may want to provide a default id for labels
# that were not in the dictionary. In this sample, we will throw an error.
# This code already supports multi-label problems if you want to use it.
label_ids = [self.label_to_id_map[label.strip()] for label in row[1:]]
context.aggregate_to(labels_count, len(label_ids))

if not label_ids:
try:
label_id = self.label_to_id_map[element['label'].strip()]
except KeyError:
context.aggregate_to(ignored_unlabeled_image, 1)
yield row[0], label_ids
yield uri, label_id


class ReadImageAndConvertToJpegDoFn(beam.DoFn):
@@ -97,7 +92,7 @@ class ReadImageAndConvertToJpegDoFn(beam.DoFn):
"""

def process(self, context):
uri, label_ids = context.element
uri, label_id = context.element

try:
with ml.util._file.open_local_or_gcs(uri, mode='r') as f:
Expand All @@ -114,7 +109,7 @@ def process(self, context):
output = cStringIO.StringIO()
img.save(output, 'jpeg')
image_bytes = output.getvalue()
yield uri, label_ids, image_bytes
yield uri, label_id, image_bytes


class EmbeddingsGraph(object):
@@ -250,7 +245,7 @@ def _bytes_feature(value):
def _float_feature(value):
return tf.train.Feature(float_list=tf.train.FloatList(value=value))

uri, label_ids, image_bytes = context.element
uri, label_id, image_bytes = context.element

try:
embedding = self.preprocess_graph.calculate_embedding(image_bytes)
@@ -265,13 +260,11 @@ def _float_feature(value):
context.aggregate_to(embedding_bad, 1)

example = tf.train.Example(features=tf.train.Features(feature={
'image_uri': _bytes_feature([uri]),
'image_uri': _bytes_feature([str(uri)]),
'embedding': _float_feature(embedding.ravel().tolist()),
}))

if label_ids:
label_ids.sort()
example.features.feature['label'].int64_list.value.extend(label_ids)
example.features.feature['label'].int64_list.value.append(label_id)

yield example

@@ -283,20 +276,31 @@ def partition_for(self, context, num_partitions):
return 1 if random.random() > 0.7 else 0


def configure_pipeline(p, checkpoint_path, input_paths, output_dir, job_id):
"""Specify PCollection and transformations in pipeline."""
output_latest_file = os.path.join(output_dir, 'latest')
def _get_sources_from_csvs(p, input_paths):
source_list = []
for ii, input_path in enumerate(input_paths):
input_source = beam.io.TextFileSource(input_path, strip_trailing_newlines=True)
source_list.append(p | 'Read input %d' % ii >> beam.Read(input_source))
all_sources = source_list | 'Flatten Sources' >> beam.Flatten()
labels = (all_sources
| 'Parse input for labels' >> beam.Map(lambda line: csv.reader([line]).next()[1])
source_list.append(p | 'Read from Csv %d' % ii >>
beam.io.ReadFromText(input_path, strip_trailing_newlines=True))
all_sources = (source_list | 'Flatten Sources' >> beam.Flatten()
| beam.Map(lambda line: csv.DictReader([line], fieldnames=['image_url', 'label']).next()))
return all_sources


def _get_sources_from_bigquery(p, query):
if len(query.split()) == 1:
bq_source = beam.io.BigQuerySource(table=query)
else:
bq_source = beam.io.BigQuerySource(query=query)
query_results = p | 'Read from BigQuery' >> beam.io.Read(bq_source)
return query_results


def _configure_pipeline_from_source(source, checkpoint_path, output_dir, job_id):
labels = (source
| 'Parse input for labels' >> beam.Map(lambda x: x['label'])
| 'Combine labels' >> beam.transforms.combiners.Count.PerElement()
| 'Get labels' >> beam.Map(lambda label_count: label_count[0]))
all_preprocessed = (all_sources
| 'Parse input' >> beam.Map(lambda line: csv.reader([line]).next())
all_preprocessed = (source
| 'Extract label ids' >> beam.ParDo(ExtractLabelIdsDoFn(),
beam.pvalue.AsIter(labels))
| 'Read and convert to JPEG' >> beam.ParDo(ReadImageAndConvertToJpegDoFn())
@@ -311,8 +315,19 @@ def configure_pipeline(p, checkpoint_path, input_paths, output_dir, job_id):
eval_save = train_eval[1] | 'Save eval to disk' >> SaveFeatures(preprocessed_eval)
train_save = train_eval[0] | 'Save train to disk' >> SaveFeatures(preprocessed_train)
# Make sure we write "latest" file after train and eval data are successfully written.
output_latest_file = os.path.join(output_dir, 'latest')
([eval_save, train_save, labels_save] | 'Wait for train eval saving' >> beam.Flatten() |
beam.transforms.combiners.Sample.FixedSizeGlobally('Fixed One', 1) |
beam.Map(lambda path: job_id) |
'WriteLatest' >> beam.io.textio.WriteToText(output_latest_file, shard_name_template=''))


def configure_pipeline_csv(p, checkpoint_path, input_paths, output_dir, job_id):
all_sources = _get_sources_from_csvs(p, input_paths)
_configure_pipeline_from_source(all_sources, checkpoint_path, output_dir, job_id)


def configure_pipeline_bigquery(p, checkpoint_path, query, output_dir, job_id):
all_sources = _get_sources_from_bigquery(p, query)
_configure_pipeline_from_source(all_sources, checkpoint_path, output_dir, job_id)
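
The BigQuery helper above picks between a table read and a query read purely on whether the string contains whitespace. A short sketch of that selection logic in isolation; the table and query names are made up:

```python
import apache_beam as beam

def _bigquery_source_for(query):
    # Mirrors _get_sources_from_bigquery: a single token is treated as a table
    # reference, anything containing whitespace as a SQL query.
    if len(query.split()) == 1:
        return beam.io.BigQuerySource(table=query)
    return beam.io.BigQuerySource(query=query)

table_source = _bigquery_source_for('mydataset.image_labels')
query_source = _bigquery_source_for('SELECT image_url, label FROM mydataset.image_labels')
```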

6 changes: 3 additions & 3 deletions solutionbox/inception/datalab_solutions/inception/_trainer.py
@@ -69,10 +69,10 @@ def evaluate(self, num_eval_batches=None):
with tf.Graph().as_default() as graph:
self.tensors = self.model.build_eval_graph(self.eval_data_paths,
self.batch_size)
self.summary = tf.merge_all_summaries()
self.summary = tf.summary.merge_all()
self.saver = tf.train.Saver()

self.summary_writer = tf.train.SummaryWriter(self.output_path)
self.summary_writer = tf.summary.FileWriter(self.output_path)
self.sv = tf.train.Supervisor(
graph=graph,
logdir=self.output_path,
@@ -163,7 +163,7 @@ def run_training(self):
self.saver = tf.train.Saver()

# Build the summary operation based on the TF collection of Summaries.
self.summary_op = tf.merge_all_summaries()
self.summary_op = tf.summary.merge_all()

# Create a "supervisor", which oversees the training process.
self.sv = tf.train.Supervisor(
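
The _trainer.py edits track the TensorFlow 0.12 rename of the summary APIs. A minimal sketch of the updated pattern, written against that TF 1.x-era API with a placeholder graph and log directory:

```python
import tensorflow as tf

graph = tf.Graph()
with graph.as_default():
    loss = tf.constant(1.0, name='loss')
    tf.summary.scalar('loss', loss)                  # was tf.scalar_summary
    summary_op = tf.summary.merge_all()              # was tf.merge_all_summaries

    writer = tf.summary.FileWriter('/tmp/inception_logs', graph=graph)  # was tf.train.SummaryWriter
    with tf.Session() as sess:
        summary = sess.run(summary_op)
        writer.add_summary(summary, global_step=0)
    writer.close()
```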