diff --git a/datalab/mlalpha/__init__.py b/datalab/mlalpha/__init__.py index 1621129bc..00eae7478 100644 --- a/datalab/mlalpha/__init__.py +++ b/datalab/mlalpha/__init__.py @@ -28,6 +28,7 @@ from ._confusion_matrix import ConfusionMatrix from ._analysis import CsvEvalResults, CsvEvalSource, EvalResultsCsvCoder, \ AccuracyFn, FeatureSlicingPipeline +from ._package_runner import PackageRunner from plotly.offline import init_notebook_mode diff --git a/datalab/mlalpha/_package_runner.py b/datalab/mlalpha/_package_runner.py new file mode 100644 index 000000000..98de1f3b0 --- /dev/null +++ b/datalab/mlalpha/_package_runner.py @@ -0,0 +1,87 @@ +# Copyright 2017 Google Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +# in compliance with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing permissions and limitations under +# the License. + +"""Implements running Datalab ML Solution Packages.""" + +import inspect +import google.cloud.ml as ml +import os +import shutil +import subprocess +import sys +import tempfile + + +PACKAGE_NAMESPACE = 'datalab_solutions' + +class PackageRunner(object): + """A Helper class to run Datalab ML solution packages.""" + + def __init__(self, package_uri): + """ + Args: + package_uri: The uri of the package. The file base name needs to be in the form of + "name-version", such as "inception-0.1". The first part split by "-" will be used + as the last part of the namespace. In the example above, + "datalab_solutions.inception" will be the namespace. + """ + self._package_uri = package_uri + self._name = os.path.basename(package_uri).split('-')[0] + self._install_dir = None + + def _install_to_temp(self): + install_dir = tempfile.mkdtemp() + tar_path = self._package_uri + if tar_path.startswith('gs://'): + tar_path = os.path.join(install_dir, os.path.basename(tar_path)) + ml.util._file.copy_file(self._package_uri, tar_path) + subprocess.check_call(['pip', 'install', tar_path, '--target', install_dir, + '--upgrade', '--force-reinstall']) + sys.path.insert(0, install_dir) + self._install_dir = install_dir + + def __enter__(self): + self._install_to_temp() + return self + + def __exit__(self, exc_type, exc_value, traceback): + self._cleanup_installation() + + def _cleanup_installation(self): + if self._install_dir is None: + return + if sys.path[0] == self._install_dir: + del sys.path[0] + shutil.rmtree(self._install_dir) + + def get_func_args_and_docstring(self, func_name): + """Get function args and docstrings. + Args: + func_name: name of the function. + Returns: + A tuple of function argspec, function docstring. + """ + func = getattr(__import__(PACKAGE_NAMESPACE + '.' + self._name, fromlist=[func_name]), + func_name) + return inspect.getargspec(func), func.__doc__ + + def run_func(self, func_name, args): + """Run a function. + Args: + func_name: name of the function. + args: args supplied to the functions. + Returns: + function return values. + """ + func = getattr(__import__(PACKAGE_NAMESPACE + '.' 
+ self._name, fromlist=[func_name]), + func_name) + return func(**args) diff --git a/datalab/mlalpha/commands/__init__.py b/datalab/mlalpha/commands/__init__.py index 3a5c1044b..765f20a3e 100644 --- a/datalab/mlalpha/commands/__init__.py +++ b/datalab/mlalpha/commands/__init__.py @@ -13,5 +13,6 @@ from __future__ import absolute_import +from . import _ml from . import _mlalpha from . import _tensorboard diff --git a/datalab/mlalpha/commands/_ml.py b/datalab/mlalpha/commands/_ml.py new file mode 100644 index 000000000..4ce46f5bd --- /dev/null +++ b/datalab/mlalpha/commands/_ml.py @@ -0,0 +1,160 @@ +# Copyright 2017 Google Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +# in compliance with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing permissions and limitations under +# the License. + +try: + import IPython + import IPython.core.magic +except ImportError: + raise Exception('This module can only be loaded in ipython.') + +import collections +import os +import yaml + +import datalab.context +import datalab.mlalpha +import datalab.utils.commands + + +@IPython.core.magic.register_line_cell_magic +def ml(line, cell=None): + """Implements the ml line cell magic. + + Args: + line: the contents of the ml line. + cell: the contents of the ml cell. + + Returns: + The results of executing the cell. + """ + parser = datalab.utils.commands.CommandParser(prog="ml", description=""" +Execute various ml-related operations. Use "%%ml -h" for help on a specific command. +""") + preprocess_parser = parser.subcommand('preprocess', 'Run a preprocess job.') + preprocess_parser.add_argument('--usage', + help='Show usage from the specified preprocess package.', + action='store_true', default=False) + preprocess_parser.add_argument('--cloud', + help='Whether to run the preprocessing job in the cloud.', + action='store_true', default=False) + preprocess_parser.add_argument('--package', + help='The preprocess package to use. Can be a gs or local path.', + required=True) + preprocess_parser.set_defaults(func=_preprocess) + + train_parser = parser.subcommand('train', 'Train an ML model.') + train_parser.add_argument('--usage', + help='Show usage from the specified trainer package', + action='store_true', default=False) + train_parser.add_argument('--cloud', + help='Whether to run the training job in the cloud.', + action='store_true', default=False) + train_parser.add_argument('--package', + help='The trainer package to use. Can be a gs or local path.', + required=True) + train_parser.set_defaults(func=_train) + + predict_parser = parser.subcommand('predict', 'Predict with an ML model.') + predict_parser.add_argument('--usage', + help='Show usage from the specified prediction package', + action='store_true', default=False) + predict_parser.add_argument('--cloud', + help='Whether to run prediction in the cloud.', + action='store_true', default=False) + predict_parser.add_argument('--package', + help='The prediction package to use. 
Can be a gs or local path.', + required=True) + predict_parser.set_defaults(func=_predict) + + batch_predict_parser = parser.subcommand('batch_predict', 'Batch predict with an ML model.') + batch_predict_parser.add_argument('--usage', + help='Show usage from the specified prediction package', + action='store_true', default=False) + batch_predict_parser.add_argument('--cloud', + help='Whether to run prediction in the cloud.', + action='store_true', default=False) + batch_predict_parser.add_argument('--package', + help='The prediction package to use. Can be a gs or local path.', + required=True) + batch_predict_parser.set_defaults(func=_batch_predict) + + namespace = datalab.utils.commands.notebook_environment() + return datalab.utils.commands.handle_magic_line(line, cell, parser, namespace=namespace) + + +def _command_template(pr, func_name): + """Return (args_list, docstring). + args_list is in the form of: + arg1: + arg2: + arg3: (optional) + """ + argspec, docstring = pr.get_func_args_and_docstring(func_name) + num_defaults = len(argspec.defaults) if argspec.defaults is not None else 0 + # Need to fill in a keyword (here '(NOT_OP)') for non optional args. + # Later we will replace '(NOT_OP)' with empty string. + optionals = ['(NOT_OP)'] * (len(argspec.args) - num_defaults) + \ + ['(optional)'] * num_defaults + args = dict(zip(argspec.args, optionals)) + args_dump = yaml.safe_dump(args, default_flow_style=False).replace('(NOT_OP)', '') + return args_dump, docstring + + +def _run_package(args, cell, mode): + local_func_name = 'local_' + mode + cloud_func_name = 'cloud_' + mode + with datalab.mlalpha.PackageRunner(args['package']) as pr: + if args['usage'] is True: + #TODO Consider calling _command_template once to save one pip installation + command_local = """%%ml %s --package %s""" % (mode, args['package']) + args_local, docstring_local = _command_template(pr, local_func_name) + command_cloud = """%%ml %s --package %s --cloud""" % (mode, args['package']) + args_cloud, docstring_cloud = _command_template(pr, cloud_func_name) + output = """ +Local Run Command: + +%s +%s +[Description]: +%s + +Cloud Run Command: + +%s +%s +[Description]: +%s +""" % (command_local, args_local, docstring_local, command_cloud, args_cloud, docstring_cloud) + return datalab.utils.commands.render_text(output, preformatted=True) + + env = datalab.utils.commands.notebook_environment() + func_args = datalab.utils.commands.parse_config(cell, env) + if args['cloud'] is True: + return pr.run_func(cloud_func_name, func_args) + else: + return pr.run_func(local_func_name, func_args) + + +def _preprocess(args, cell): + return _run_package(args, cell, 'preprocess') + + +def _train(args, cell): + return _run_package(args, cell, 'train') + + +def _predict(args, cell): + return _run_package(args, cell, 'predict') + + +def _batch_predict(args, cell): + return _run_package(args, cell, 'batch_predict') diff --git a/datalab/mlalpha/commands/_mlalpha.py b/datalab/mlalpha/commands/_mlalpha.py index ee9256088..6500aeb60 100644 --- a/datalab/mlalpha/commands/_mlalpha.py +++ b/datalab/mlalpha/commands/_mlalpha.py @@ -29,6 +29,7 @@ import urllib import yaml +import datalab.bigquery as bq import datalab.context import datalab.data import datalab.mlalpha @@ -159,16 +160,19 @@ def mlalpha(line, cell=None): package_parser.add_argument('--output', help='the output dir of the package.', required=True) package_parser.set_defaults(func=_package) - package_parser = parser.subcommand('feature-slice-view','View results of a ' + + 
feature_slice_parser = parser.subcommand('feature-slice-view','View results of a ' + 'FeatureSlicingPipeline, some eval metrics grouped by ' + 'specified feature column values') - package_parser.add_argument('--file', help='The results file from FeatureSlicingPipeline', - required=True) - package_parser.add_argument('--feature', - help='Which feature to view. The feature must be specified ' + - 'in the FeatureSlicingPipeline. If not specified, all ' + - 'features will be listed.') - package_parser.set_defaults(func=_feature_slice_view) + feature_slice_parser.add_argument('--file', help='The results file from FeatureSlicingPipeline') + feature_slice_parser.add_argument('--sql', + help='The sql module which should return "feature",' + + '"count" columns, plus at least one metric column ' + + 'with any names') + feature_slice_parser.add_argument('--feature', + help='Which feature to view. The feature must be specified ' + + 'in the FeatureSlicingPipeline. If not specified, all ' + + 'features will be listed.') + feature_slice_parser.set_defaults(func=_feature_slice_view) namespace = datalab.utils.commands.notebook_environment() return datalab.utils.commands.handle_magic_line(line, cell, parser, namespace=namespace) @@ -986,6 +990,20 @@ def _package(args, cell): print 'Package created at %s.' % dest +def _get_lantern_format(df): + if ('count' not in df) or ('feature' not in df): + raise Exception('No "count" or "feature" found in data.') + metric_names = list(set(df) - set(['feature'])) + data = [] + for ii, row in df.iterrows(): + metric_values = dict(row) + metric_values['totalWeightedExamples'] = metric_values['count'] + del metric_values['feature'] + del metric_values['count'] + data.append({'feature': row['feature'], 'metricValues': metric_values}) + return data + + def _feature_slice_view(args, cell): HTML_TEMPLATE = """ @@ -997,10 +1015,17 @@ def _feature_slice_view(args, cell): browser.weightedExamplesColumn = 'totalWeightedExamples'; browser.calibrationPlotUriFn = function(s) { return '/' + s; } """ - with open(args['file']) as f: - data = map(json.loads, f) - if args['feature']: - data = [e for e in data if e['feature'].split(':')[0] == args['feature']] + if args['sql'] is not None: + item = datalab.utils.commands.get_notebook_item(args['sql']) + item, _ = datalab.data.SqlModule.get_sql_statement_with_environment(item, {}) + query = datalab.bigquery.Query(item) + df = query.results().to_dataframe() + data = _get_lantern_format(df) + elif args['dataframe'] is not None: + item = datalab.utils.commands.get_notebook_item(args['dataframe']) + data = _get_lantern_format(item) + else: + raise Exception('either --sql or --dataframe is needed.') metrics_str = str(map(str, data[0]['metricValues'].keys())) data_str = str([{str(k): json.dumps(v) for k,v in elem.iteritems()} for elem in data]) html_id = 'l' + datalab.utils.commands.Html.next_id() diff --git a/solutionbox/inception/datalab_solutions/inception/__init__.py b/solutionbox/inception/datalab_solutions/inception/__init__.py index 8396f6435..12407cb30 100644 --- a/solutionbox/inception/datalab_solutions/inception/__init__.py +++ b/solutionbox/inception/datalab_solutions/inception/__init__.py @@ -11,4 +11,5 @@ # the License. 
-from ._package import local_preprocess, cloud_preprocess, local_train, cloud_train, local_predict, cloud_predict +from ._package import local_preprocess, cloud_preprocess, local_train, cloud_train, local_predict, \ + cloud_predict, local_batch_predict, cloud_batch_predict diff --git a/solutionbox/inception/datalab_solutions/inception/_cloud.py b/solutionbox/inception/datalab_solutions/inception/_cloud.py index 55d790852..2cc26bc02 100644 --- a/solutionbox/inception/datalab_solutions/inception/_cloud.py +++ b/solutionbox/inception/datalab_solutions/inception/_cloud.py @@ -31,16 +31,13 @@ from . import _util -_CLOUDML_DISCOVERY_URL = 'https://storage.googleapis.com/cloud-ml/discovery/' \ - 'ml_v1beta1_discovery.json' _TF_GS_URL= 'gs://cloud-datalab/deploy/tf/tensorflow-0.12.0rc0-cp27-none-linux_x86_64.whl' class Cloud(object): """Class for cloud training, preprocessing and prediction.""" - def __init__(self, project, checkpoint=None): - self._project = project + def __init__(self, checkpoint=None): self._checkpoint = checkpoint if self._checkpoint is None: self._checkpoint = _util._DEFAULT_CHECKPOINT_GSURL @@ -53,7 +50,7 @@ def preprocess(self, input_csvs, labels_file, output_dir, pipeline_option=None): 'staging_location': os.path.join(output_dir, 'tmp', 'staging'), 'temp_location': os.path.join(output_dir, 'tmp'), 'job_name': job_name, - 'project': self._project, + 'project': _util.default_project(), 'extra_packages': [ml.sdk_location, _util._PACKAGE_GS_URL, _TF_GS_URL], 'teardown_policy': 'TEARDOWN_ALWAYS', 'no_save_main_session': True @@ -67,13 +64,13 @@ def preprocess(self, input_csvs, labels_file, output_dir, pipeline_option=None): p, self._checkpoint, input_csvs, labels_file, output_dir, job_name) p.run() - def train(self, labels_file, input_dir, batch_size, max_steps, output_path, credentials, + def train(self, labels_file, input_dir, batch_size, max_steps, output_path, region, scale_tier): """Cloud training with CloudML trainer service.""" + import datalab.mlalpha as mlalpha num_classes = len(_util.get_labels(labels_file)) - job_id = 'inception_train_' + datetime.datetime.now().strftime('%y%m%d_%H%M%S') - job_args_dict = { + job_args = { 'input_dir': input_dir, 'output_path': output_path, 'max_steps': max_steps, @@ -81,18 +78,6 @@ def train(self, labels_file, input_dir, batch_size, max_steps, output_path, cred 'num_classes': num_classes, 'checkpoint': self._checkpoint } - # convert job_args from dict to list as service required. 
- job_args = [] - for k,v in job_args_dict.iteritems(): - if isinstance(v, list): - for item in v: - - job_args.append('--' + k) - job_args.append(str(item)) - else: - job_args.append('--' + k) - job_args.append(str(v)) - job_request = { 'package_uris': _util._PACKAGE_GS_URL, 'python_module': 'datalab_solutions.inception.task', @@ -100,22 +85,20 @@ def train(self, labels_file, input_dir, batch_size, max_steps, output_path, cred 'region': region, 'args': job_args } - job = { - 'job_id': job_id, - 'training_input': job_request, - } - cloudml = discovery.build('ml', 'v1beta1', discoveryServiceUrl=_CLOUDML_DISCOVERY_URL, - credentials=credentials) - request = cloudml.projects().jobs().create(body=job, - parent='projects/' + self._project) - request.headers['user-agent'] = 'GoogleCloudDataLab/1.0' - job_info = request.execute() - return job_info - - def predict(self, model_id, image_files, labels_file, credentials): + cloud_runner = mlalpha.CloudRunner(job_request) + job_id = 'inception_train_' + datetime.datetime.now().strftime('%y%m%d_%H%M%S') + return cloud_runner.run(job_id) + + def predict(self, model_id, image_files, labels_file): """Cloud prediction with CloudML prediction service.""" + import datalab.mlalpha as mlalpha + parts = model_id.split('.') + if len(parts) != 2: + raise Exception('Invalid model name for cloud prediction. Use "model.version".') + labels = _util.get_labels(labels_file) + labels.append('UNKNOWN') data = [] for ii, img_file in enumerate(image_files): with ml.util._file.open_local_or_gcs(img_file, 'rb') as f: @@ -124,17 +107,9 @@ def predict(self, model_id, image_files, labels_file, credentials): 'key': str(ii), 'image_bytes': {'b64': img} }) - parts = model_id.split('.') - if len(parts) != 2: - raise Exception('Invalid model name for cloud prediction. Use "model.version".') - full_version_name = ('projects/%s/models/%s/versions/%s' % (self._project, parts[0], parts[1])) - api = discovery.build('ml', 'v1beta1', credentials=credentials, - discoveryServiceUrl=_CLOUDML_DISCOVERY_URL) - request = api.projects().predict(body={'instances': data}, name=full_version_name) - job_results = request.execute() - if 'predictions' not in job_results: - raise Exception('Invalid response from service. Cannot find "predictions" in response.') - predictions = job_results['predictions'] + + cloud_predictor = mlalpha.CloudPredictor(parts[0], parts[1]) + predictions = cloud_predictor.predict(data) labels_and_scores = [(labels[x['prediction']], x['scores'][x['prediction']]) for x in predictions] return labels_and_scores diff --git a/solutionbox/inception/datalab_solutions/inception/_local.py b/solutionbox/inception/datalab_solutions/inception/_local.py index 3353844d3..a57cdbc98 100644 --- a/solutionbox/inception/datalab_solutions/inception/_local.py +++ b/solutionbox/inception/datalab_solutions/inception/_local.py @@ -18,13 +18,18 @@ import apache_beam as beam import collections +import csv import datetime +import google.cloud.ml as ml import json import os import tensorflow as tf +import yaml + -from . import _preprocess from . import _model +from . import _predictor +from . import _preprocess from . import _trainer from . 
import _util @@ -46,34 +51,23 @@ def preprocess(self, input_csvs, labels_file, output_dir): output_dir, job_id) p.run() - def train(self, labels_file, input_dir, batch_size, max_steps, output_path): + def train(self, labels_file, input_dir, batch_size, max_steps, output_dir): """Local training.""" num_classes = len(_util.get_labels(labels_file)) model = _model.Model(num_classes, 0.5, self._checkpoint) task_data = {'type': 'master', 'index': 0} task = type('TaskSpec', (object,), task_data) - _trainer.Trainer(input_dir, batch_size, max_steps, output_path, + _trainer.Trainer(input_dir, batch_size, max_steps, output_dir, model, None, task).run_training() def predict(self, model_dir, image_files, labels_file): """Local prediction.""" - labels = _util.get_labels(labels_file) - model_dir = os.path.join(model_dir, 'model') - with tf.Session() as sess: - new_saver = tf.train.import_meta_graph(os.path.join(model_dir, 'export.meta')) - new_saver.restore(sess, os.path.join(model_dir, 'export')) - inputs = json.loads(tf.get_collection('inputs')[0]) - outputs = json.loads(tf.get_collection('outputs')[0]) - feed_dict = collections.defaultdict(list) - for ii, image_filename in enumerate(image_files): - with open(image_filename) as ff: - image_bytes = ff.read() - feed_dict[inputs['image_bytes']].append(image_bytes) - feed_dict[inputs['key']].append(str(ii)) - predictions, scores = sess.run([outputs['prediction'], outputs['scores']], - feed_dict=feed_dict) - - labels_and_scores = [(labels[predicted_index], class_scores[predicted_index]) - for predicted_index, class_scores in zip(predictions, scores)] - return labels_and_scores + + return _predictor.predict(model_dir, image_files, labels_file) + + + def batch_predict(self, model_dir, input_csv, labels_file, output_file, output_bq_table): + """Local batch prediction.""" + + return _predictor.batch_predict(model_dir, input_csv, labels_file, output_file, output_bq_table) diff --git a/solutionbox/inception/datalab_solutions/inception/_package.py b/solutionbox/inception/datalab_solutions/inception/_package.py index 0bbecdae8..9e949987c 100644 --- a/solutionbox/inception/datalab_solutions/inception/_package.py +++ b/solutionbox/inception/datalab_solutions/inception/_package.py @@ -36,51 +36,106 @@ def local_preprocess(input_csvs, labels_file, output_dir, checkpoint=None): + """Preprocess data locally. Produce output that can be used by training efficiently. + Args: + input_csvs: A list of CSV files which include two columns only: image_gs_url, label. + Preprocessing will concatenate the data inside all files and split them into + train/eval dataset. Can be local or GCS path. + labels_file: The path to the labels file which lists all labels, each in a separate line. + It can be a local or a GCS path. + output_dir: The output directory to use. Preprocessing will create a sub directory under + it for each run, and also update "latest" file which points to the latest preprocessed + directory. Users are responsible for cleanup. Can be local or GCS path. + checkpoint: the Inception checkpoint to use. + """ + print 'Local preprocessing...' # TODO: Move this to a new process to avoid pickling issues + # TODO: Expose train/eval split ratio _local.Local(checkpoint).preprocess(input_csvs, labels_file, output_dir) print 'Done' -def cloud_preprocess(input_csvs, labels_file, output_dir, project, checkpoint=None, +def cloud_preprocess(input_csvs, labels_file, output_dir, checkpoint=None, pipeline_option=None): + """Preprocess data in Cloud with DataFlow. 
+ Produce output that can be used by training efficiently. + Args: + input_csvs: A list of CSV files which include two columns only: image_gs_url, label. + Preprocessing will concatenate the data inside all files and split them into + train/eval dataset. GCS paths only. + labels_file: The GCS path to the labels file which lists all labels, each in a separate line. + output_dir: The output directory to use. Preprocessing will create a sub directory under + it for each run, and also update "latest" file which points to the latest preprocessed + directory. Users are responsible for cleanup. GCS path only. + checkpoint: the Inception checkpoint to use. + """ + # TODO: Move this to a new process to avoid pickling issues - _cloud.Cloud(project, checkpoint).preprocess(input_csvs, labels_file, output_dir, + # TODO: Expose train/eval split ratio + # TODO: Consider exposing explicit train/eval datasets + _cloud.Cloud(checkpoint=checkpoint).preprocess(input_csvs, labels_file, output_dir, pipeline_option) if (_util.is_in_IPython()): import IPython - dataflow_url = 'https://console.developers.google.com/dataflow?project=%s' % project + + dataflow_url = 'https://console.developers.google.com/dataflow?project=%s' % \ + _util.default_project() html = 'Job submitted.' html += '
<p>Click <a href="%s" target="_blank">here</a> to track preprocessing job.</p>
' \ % dataflow_url IPython.display.display_html(html, raw=True) -def local_train(labels_file, input_dir, batch_size, max_steps, output_path, checkpoint=None): +def local_train(labels_file, input_dir, batch_size, max_steps, output_dir, checkpoint=None): + """Train model locally. The output can be used for local prediction or for online deployment. + Args: + labels_file: The path to the labels file which lists all labels, each in a separate line. + It can be a local or a GCS path. + input_dir: A directory path containing preprocessed results. Can be local or GCS path. + batch_size: size of batch used for training. + max_steps: number of steps to train. + output_dir: The output directory to use. Can be local or GCS path. + checkpoint: the Inception checkpoint to use. + """ + logger = logging.getLogger() original_level = logger.getEffectiveLevel() logger.setLevel(logging.INFO) print 'Local training...' try: - _local.Local(checkpoint).train(labels_file, input_dir, batch_size, max_steps, output_path) + _local.Local(checkpoint).train(labels_file, input_dir, batch_size, max_steps, output_dir) finally: logger.setLevel(original_level) print 'Done' -def cloud_train(labels_file, input_dir, batch_size, max_steps, output_path, - project, credentials, region, scale_tier='BASIC', checkpoint=None): - job_info = _cloud.Cloud(project, checkpoint).train(labels_file, input_dir, batch_size, - max_steps, output_path, credentials, region, scale_tier) +def cloud_train(labels_file, input_dir, batch_size, max_steps, output_dir, + region, scale_tier='BASIC', checkpoint=None): + """Train model in the cloud with CloudML trainer service. + The output can be used for local prediction or for online deployment. + Args: + labels_file: The path to the labels file which lists all labels, each in a separate line. + GCS path only. + input_dir: A directory path containing preprocessed results. GCS path only. + batch_size: size of batch used for training. + max_steps: number of steps to train. + output_dir: The output directory to use. GCS path only. + checkpoint: the Inception checkpoint to use. + """ + + job_info = _cloud.Cloud(checkpoint=checkpoint).train(labels_file, input_dir, batch_size, + max_steps, output_dir, region, scale_tier) if (_util.is_in_IPython()): import IPython log_url_query_strings = { - 'project': project, + 'project': _util.default_project(), 'resource': 'ml.googleapis.com/job_id/' + job_info['jobId'] } log_url = 'https://console.developers.google.com/logs/viewer?' + \ urllib.urlencode(log_url_query_strings) - html = '
<p>Click <a href="%s" target="_blank">here</a> to view cloud log.</p>
' % log_url + html = 'Job submitted.' + html += '
<p>Click <a href="%s" target="_blank">here</a> to view cloud log.</p>
' % log_url IPython.display.display_html(html, raw=True) @@ -100,13 +155,49 @@ def _display_predict_results(results, show_image): def local_predict(model_dir, image_files, labels_file, show_image=True): + """Predict using an offline model. + Args: + model_dir: The directory of a trained inception model. Can be local or GCS paths. + image_files: The paths to the image files to predict labels. Can be local or GCS paths. + labels_file: The path to the labels file which lists all labels, each in a separate line. + Can be local or GCS paths. + show_image: Whether to show images in the results. + """ + labels_and_scores = _local.Local().predict(model_dir, image_files, labels_file) results = zip(image_files, labels_and_scores) _display_predict_results(results, show_image) -def cloud_predict(model_id, image_files, labels_file, project, credentials, show_image=True): - labels_and_scores = _cloud.Cloud(project).predict(model_id, image_files, labels_file, - credentials) +def cloud_predict(model_id, image_files, labels_file, show_image=True): + """Predict using a deployed (online) model. + Args: + model_id: The deployed model id in the form of "model.version". + image_files: The paths to the image files to predict labels. GCS paths only. + labels_file: The path to the labels file which lists all labels, each in a separate line. + GCS paths only. + show_image: Whether to show images in the results. + """ + + labels_and_scores = _cloud.Cloud().predict(model_id, image_files, labels_file) results = zip(image_files, labels_and_scores) _display_predict_results(results, show_image) + + +def local_batch_predict(model_dir, input_csv, labels_file, output_file, output_bq_table=None): + """Batch predict using an offline model. + Args: + model_dir: The directory of a trained inception model. Can be local or GCS paths. + input_csv: The input csv which include two columns only: image_gs_url, label. + Can be local or GCS paths. + labels_file: The path to the labels file which lists all labels, each in a separate line. + Can be local or GCS paths. + output_file: The output csv file containing prediction results. + output_bq_table: If provided, will also save the results to BigQuery table. + """ + _local.Local().batch_predict(model_dir, input_csv, labels_file, output_file, output_bq_table) + + +def cloud_batch_predict(model_dir, image_files, labels_file, show_image=True, output_file=None): + """Not Implemented Yet""" + pass diff --git a/solutionbox/inception/datalab_solutions/inception/_predictor.py b/solutionbox/inception/datalab_solutions/inception/_predictor.py new file mode 100644 index 000000000..c6f513d19 --- /dev/null +++ b/solutionbox/inception/datalab_solutions/inception/_predictor.py @@ -0,0 +1,98 @@ +# Copyright 2017 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +"""Local implementation for preprocessing, training and prediction for inception model. 
+""" + +import collections +import csv +import google.cloud.ml as ml +import json +import os +import tensorflow as tf +import yaml + +from . import _util + + +def _tf_predict(model_dir, image_files): + model_dir = os.path.join(model_dir, 'model') + with tf.Session() as sess: + new_saver = tf.train.import_meta_graph(os.path.join(model_dir, 'export.meta')) + new_saver.restore(sess, os.path.join(model_dir, 'export')) + inputs = json.loads(tf.get_collection('inputs')[0]) + outputs = json.loads(tf.get_collection('outputs')[0]) + feed_dict = collections.defaultdict(list) + for ii, image_filename in enumerate(image_files): + with ml.util._file.open_local_or_gcs(image_filename, 'r') as ff: + image_bytes = ff.read() + feed_dict[inputs['image_bytes']].append(image_bytes) + feed_dict[inputs['key']].append(str(ii)) + predictions, scores = sess.run([outputs['prediction'], outputs['scores']], + feed_dict=feed_dict) + return predictions, scores + + +def predict(model_dir, image_files, labels_file): + """Local prediction.""" + + predictions, scores = _tf_predict(model_dir, image_files) + labels = _util.get_labels(labels_file) + labels.append('UNKNOWN') + labels_and_scores = [(labels[predicted_index], class_scores[predicted_index]) + for predicted_index, class_scores in zip(predictions, scores)] + return labels_and_scores + + +def batch_predict(model_dir, input_csv, labels_file, output_file, output_bq_table): + """Local batch prediction.""" + + input_csv_f = ml.util._file.read_file_stream(input_csv) + reader = csv.reader(input_csv_f) + image_files = [x[0] for x in reader] + predictions, scores = _tf_predict(model_dir, image_files) + + labels = _util.get_labels(labels_file) + labels.append('UNKNOWN') + input_csv_f = ml.util._file.read_file_stream(input_csv) + reader = csv.reader(input_csv_f) + with ml.util._file.open_local_or_gcs(output_file, mode='w') as f_out: + writer = csv.writer(f_out) + for input, predicted_index, class_scores in zip(reader, predictions, scores): + target_index = labels.index(input[1]) + target_prob = class_scores[target_index] + predicted_prob = class_scores[predicted_index] + writer.writerow(input + [labels[predicted_index]] + [str(target_prob)] + + [str(predicted_prob)] + map(str, class_scores)) + schema = [ + {'name': 'image_url', 'type': 'STRING'}, + {'name': 'target', 'type': 'STRING'}, + {'name': 'predicted', 'type': 'STRING'}, + {'name': 'target_prob', 'type': 'FLOAT'}, + {'name': 'predicted_prob', 'type': 'FLOAT'}, + ] + for l in labels: + schema.append({'name': 'prob_' + l, 'type': 'FLOAT'}) + schema_file = output_file + '.schema.yaml' + with ml.util._file.open_local_or_gcs(schema_file, 'w') as yaml_file: + yaml.dump(schema, yaml_file, default_flow_style=False) + + if output_bq_table is not None: + import datalab.bigquery as bq + dataset_name, table_name = output_bq_table.split('.') + bq.Dataset(dataset_name).create() + eval_results_table = bq.Table(output_bq_table).create(schema, overwrite = True) + eval_results_table.load(output_file, mode='append', source_format='csv') + return output_file, schema_file diff --git a/solutionbox/inception/datalab_solutions/inception/_util.py b/solutionbox/inception/datalab_solutions/inception/_util.py index 65c2a4d57..e14fcee50 100644 --- a/solutionbox/inception/datalab_solutions/inception/_util.py +++ b/solutionbox/inception/datalab_solutions/inception/_util.py @@ -36,6 +36,12 @@ def is_in_IPython(): return False +def default_project(): + import datalab.context + context = datalab.context.Context.default() + return context.project_id + + 
def get_train_eval_files(input_dir): latest_file = os.path.join(input_dir, 'latest') with ml.util._file.open_local_or_gcs(latest_file, 'r') as f: diff --git a/solutionbox/inception/release.sh b/solutionbox/inception/release.sh index c4c41acbb..adb27aa21 100755 --- a/solutionbox/inception/release.sh +++ b/solutionbox/inception/release.sh @@ -1,6 +1,6 @@ #!/bin/sh -# Copyright 2016 Google Inc. All rights reserved. +# Copyright 2017 Google Inc. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except # in compliance with the License. You may obtain a copy of the License at @@ -14,6 +14,8 @@ # Build a distribution package python setup.py sdist --formats=gztar + +# Release to gs gsutil cp ./dist/inception-0.1.tar.gz gs://cloud-datalab/packages/inception-0.1.tar.gz diff --git a/solutionbox/inception/setup.py b/solutionbox/inception/setup.py index 443ff7327..30bd6a8e0 100644 --- a/solutionbox/inception/setup.py +++ b/solutionbox/inception/setup.py @@ -1,4 +1,4 @@ -# Copyright 2015 Google Inc. All rights reserved. +# Copyright 2017 Google Inc. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except # in compliance with the License. You may obtain a copy of the License at
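Taken together, the new %%ml magic and PackageRunner let a notebook drive a solution package end to end: --usage prints an argument template derived from the package function's argspec and docstring, and otherwise the cell body is parsed as a mapping of argument names to values and passed to the package's local_<mode> or cloud_<mode> function. A rough sketch of a training cell, assuming the inception package published by release.sh; the bucket paths and argument values are hypothetical:

    %%ml train --package gs://cloud-datalab/packages/inception-0.1.tar.gz --cloud
    labels_file: gs://my-bucket/flowers/labels.txt
    input_dir: gs://my-bucket/flowers/preprocessed
    batch_size: 32
    max_steps: 2000
    output_dir: gs://my-bucket/flowers/model
    region: us-central1

The same command with --usage (and no cell body) prints the local and cloud argument templates and docstrings instead of starting a job.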
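PackageRunner can also be used directly, outside the magic; it pip-installs the package tarball into a temporary directory on __enter__ and removes it again on __exit__. A minimal sketch, assuming the same package URI; the model, image, and label paths are placeholders:

    import datalab.mlalpha as mlalpha

    # Base name "inception-0.1" maps to the datalab_solutions.inception namespace.
    package_uri = 'gs://cloud-datalab/packages/inception-0.1.tar.gz'
    with mlalpha.PackageRunner(package_uri) as pr:
      # Inspect a package function before calling it.
      argspec, docstring = pr.get_func_args_and_docstring('local_predict')
      print(docstring)
      # Call the function with keyword arguments; paths here are hypothetical.
      pr.run_func('local_predict', {
          'model_dir': '/tmp/inception/model_out',
          'image_files': ['/tmp/images/flower1.jpg'],
          'labels_file': '/tmp/labels.txt',
          'show_image': False,
      })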