diff --git a/datalab/mlalpha/__init__.py b/datalab/mlalpha/__init__.py
index 00eae7478..3f24d50eb 100644
--- a/datalab/mlalpha/__init__.py
+++ b/datalab/mlalpha/__init__.py
@@ -26,8 +26,7 @@
 from ._package import Packager
 from ._cloud_models import CloudModels, CloudModelVersions
 from ._confusion_matrix import ConfusionMatrix
-from ._analysis import CsvEvalResults, CsvEvalSource, EvalResultsCsvCoder, \
-    AccuracyFn, FeatureSlicingPipeline
+from ._analysis import csv_to_dataframe
 from ._package_runner import PackageRunner
 
 from plotly.offline import init_notebook_mode
diff --git a/datalab/mlalpha/_analysis.py b/datalab/mlalpha/_analysis.py
index 06356af78..57d1d5e3b 100644
--- a/datalab/mlalpha/_analysis.py
+++ b/datalab/mlalpha/_analysis.py
@@ -10,206 +10,39 @@
 # or implied. See the License for the specific language governing permissions and limitations under
 # the License.
 
-"""Implements Cloud ML Eval Results Analysis"""
+"""Implements Cloud ML Analysis Helpers"""
 
-import apache_beam as beam
-from collections import namedtuple
-"""Prepresents an eval results CSV file. For example, the content is like:
-  107,Iris-versicolor,1.64827824278e-07,0.999999880791,6.27104979056e-10
-  100,Iris-versicolor,3.5338824091e-05,0.99996471405,1.32811195375e-09
-  ...
-"""
-CsvEvalResults = namedtuple('CsvEvalResults', 'source, key_index predicted_index score_index_start num_scores')
+import google.cloud.ml as ml
+import numpy as np
+import pandas as pd
+import yaml
 
-"""Prepresents an eval source CSV file. For example, the content is like:
-  107,Iris-virginica,4.9,2.5,4.5,1.7
-  100,Iris-versicolor,5.7,2.8,4.1,1.3
-  ...
-  The metadata is generated in the preprocessing pipeline. It is used to describe the CSV file,
-  including schema, headers, etc.
-"""
-CsvEvalSource = namedtuple('CsvEvalSource', 'source metadata')
+import datalab.bigquery as bq
 
 
-class EvalResultsCsvCoder(beam.coders.Coder):
-  """A coder to read from Eval results CSV file. Note encode() is only needed in cloud run.
-  """
-  def __init__(self, eval_results):
-    self._eval_results = eval_results
-
-  def decode(self, csv_line):
-    import csv
-    source_elem = next(csv.reader([csv_line]))
-    key = source_elem[self._eval_results.key_index]
-    element = {
-      'predicted': source_elem[self._eval_results.predicted_index],
-      'scores': source_elem[self._eval_results.score_index_start: \
-                            self._eval_results.score_index_start+self._eval_results.num_scores]
-    }
-    return (key, element)
-
-  def encode(self, element):
-    return str(element)
-
-
-class AccuracyFn(beam.CombineFn):
-  """A transform to compute accuracy for feature slices.
-  """
-  def __init__(self, target_column_name):
-    self._target_column_name = target_column_name
-
-  def create_accumulator(self):
-    return (0.0, 0)
-
-  def add_input(self, (sum, count), input):
-    new_sum = sum
-    if (input['predicted'] == input[self._target_column_name]):
-      new_sum += 1
-    return new_sum, count + 1
-
-  def merge_accumulators(self, accumulators):
-    sums, counts = zip(*accumulators)
-    return sum(sums), sum(counts)
-
-  def extract_output(self, (sum, count)):
-    accuracy = float(sum) / count if count else float('NaN')
-    return {'accuracy': accuracy, 'totalWeightedExamples': count}
-
-
-class FeatureSlicingPipeline(object):
-  """The pipeline to generate feature slicing stats. For example, accuracy values given
-     "species = Iris-versicolor", "education = graduate", etc.
-     It is implemented with DataFlow.
- """ - @staticmethod - def _pair_source_with_key(element): - key = element['key'] - del element['key'] - return (key, element) - - @staticmethod - def _join_info((key, info)): - value = info['source'][0] - value.update(info['results'][0]) - return (key, value) - - def _pipeline_def(self, p, eval_source, eval_results, features_to_slice, metrics, output_file, - shard_name_template=None): - import datalab.mlalpha as mlalpha - import google.cloud.ml.io as io - import json - - metadata = mlalpha.Metadata(eval_source.metadata) - target_name, _ = metadata.get_target_name_and_scenario() - - # Load eval source. - eval_source_coder = io.CsvCoder(metadata.get_csv_headers(), metadata.get_numeric_columns()) - eval_source_data = p | beam.io.ReadFromText(eval_source.source, coder=eval_source_coder) | \ - beam.Map('pair_source_with_key', FeatureSlicingPipeline._pair_source_with_key) - - # Load eval results. - eval_results_data = p | \ - beam.Read('ReadEvalResults', beam.io.TextFileSource(eval_results.source, - coder=EvalResultsCsvCoder(eval_results))) - - # Join source with results by key. - joined_results = {'source': eval_source_data, 'results': eval_results_data} | \ - beam.CoGroupByKey() | beam.Map('join by key', FeatureSlicingPipeline._join_info) - - feature_metrics_list = [] - for feature_to_slice in features_to_slice: - feature_metrics = joined_results | \ - beam.Map('slice_get_key_%s' % feature_to_slice, - lambda (k,v),f=feature_to_slice: (v[f], v)) | \ - beam.CombinePerKey('slice_combine_%s' % feature_to_slice, - AccuracyFn(target_name)) | \ - beam.Map('slice_prepend_feature_name_%s' % feature_to_slice, - lambda (k,v),f=feature_to_slice: ('%s:%s' % (f, k), v)) - feature_metrics_list.append(feature_metrics) - - feature_metrics_list | beam.Flatten() | \ - beam.Map('ToJsonFormat', lambda (k,v): json.dumps({'feature': k, 'metricValues': v})) | \ - beam.io.WriteToText(output_file, shard_name_template=shard_name_template) - return p - - - def run_local(self, eval_source, eval_results, features_to_slice, metrics, output_file): - """Run the pipeline locally. Blocks execution until it finishes. - - Args: - eval_source: The only supported format is CsvEvalResults now while we may add more. - Note the source can be either a GCS path or a local path. - eval_results: The only supported format is CsvEvalSource now while we may add more. - Note the source can be either a GCS path or a local path. - features_to_slice: A list of features to slice on. The features must exist in - eval_source, and can be numeric, categorical, or target. - metrics: A list of metrics to compute. For classification, it supports "accuracy", - "logloss". For regression, it supports "RMSE". - output_file: The path to a local file holding the aggregated results. - """ - p = beam.Pipeline('DirectPipelineRunner') - self._pipeline_def(p, eval_source, eval_results, features_to_slice, metrics, output_file, - shard_name_template='') - p.run() - - - def default_pipeline_options(self, output_dir): - """Get default DataFlow options. Users can customize it further on top of it and then - send the option to run_cloud(). +def csv_to_dataframe(csv_path, schema_path): + """Given a CSV file together with its BigQuery schema file in yaml, load + content into a dataframe. Args: - output_dir: A GCS path which will be used as base path for tmp and staging dir. + csv_path: Input CSV path. Can be local or GCS. + schema_path: Input schema path. Can be local or GCS. Returns: - A dictionary of options. 
- """ - import datalab.context as context - import datetime - import google.cloud.ml as ml - import os - - options = { - 'staging_location': os.path.join(output_dir, 'tmp', 'staging'), - 'temp_location': os.path.join(output_dir, 'tmp'), - 'job_name': 'feature-slicing-pipeline' + '-' + \ - datetime.datetime.now().strftime('%y%m%d-%H%M%S'), - 'project': context.Context.default().project_id, - 'extra_packages': ['gs://cloud-datalab/dataflow/datalab.tar.gz', ml.sdk_location], - 'teardown_policy': 'TEARDOWN_ALWAYS', - 'no_save_main_session': True - } - return options - - def run_cloud(self, eval_source, eval_results, features_to_slice, metrics, output_file, - pipeline_option=None): - """Run the pipeline in cloud. Returns when the job is submitted. - Calling of this function may incur some cost since it runs a DataFlow job in Google Cloud. - If pipeline_option is not specified, make sure you are signed in (through Datalab) - and a default project is set so it can get credentials and projects from global context. - - Args: - eval_source: The only supported format is CsvEvalResults now while we may add more. - The source needs to be a GCS path and is readable to current signed in user. - eval_results: The only supported format is CsvEvalSource now while we may add more. - The source needs to be a GCS path and is readable to current signed in user. - features_to_slice: A list of features to slice on. The features must exist in - eval_source, and can be numeric, categorical, or target. - metrics: A list of metrics to compute. For classification, it supports "accuracy", - "logloss". For regression, it supports "RMSE". - pipeline_option: If not specified, use default options. Recommend customizing your options - based on default one obtained from default_pipeline_options(). For example, - options = fsp.default_pipeline_options() - options['num_workers'] = 10 - ... - output_file: A GCS file prefix holding the aggregated results. - """ - import os - if pipeline_option is None: - output_dir = os.path.dirname(output_file) - pipeline_option = self.default_pipeline_options(output_dir) - opts = beam.pipeline.PipelineOptions(flags=[], **pipeline_option) - p = beam.Pipeline('DataflowPipelineRunner', options=opts) - self._pipeline_def(p, eval_source, eval_results, features_to_slice, metrics, output_file) - p.run() - + Loaded pandas dataframe. 
+ """ + with ml.util._file.open_local_or_gcs(schema_path, mode='r') as f: + schema = yaml.safe_load(f) + _MAPPINGS = { + 'FLOAT': np.float64, + 'INTEGER': np.int64, + 'TIMESTAMP': np.datetime64, + 'BOOLEAN': np.bool, + } + for item in schema: + item['type'] = _MAPPINGS.get(item['type'], object) + names = [x['name'] for x in schema] + dtype = {x['name']: x['type'] for x in schema} + with ml.util._file.open_local_or_gcs(csv_path, mode='r') as f: + return pd.read_csv(f, names=names, dtype=dtype) diff --git a/datalab/mlalpha/commands/_ml.py b/datalab/mlalpha/commands/_ml.py index 4ce46f5bd..0306601c0 100644 --- a/datalab/mlalpha/commands/_ml.py +++ b/datalab/mlalpha/commands/_ml.py @@ -17,9 +17,15 @@ raise Exception('This module can only be loaded in ipython.') import collections +import google.cloud.ml as cloudml +import matplotlib.pyplot as plt +import numpy as np import os +import pandas as pd +from sklearn.metrics import confusion_matrix import yaml + import datalab.context import datalab.mlalpha import datalab.utils.commands @@ -87,6 +93,23 @@ def ml(line, cell=None): required=True) batch_predict_parser.set_defaults(func=_batch_predict) + confusion_matrix_parser = parser.subcommand('confusion_matrix', + 'Plot confusion matrix. The source is provided ' + + 'in one of "csv", "bqtable", and "sql" params.') + confusion_matrix_parser.add_argument('--csv', + help='GCS or local path of CSV file which contains ' + + '"target", "predicted" columns at least. The CSV ' + + 'either comes with a schema file in the same dir, ' + + 'or specify "headers: name1, name2..." in cell.') + confusion_matrix_parser.add_argument('--bqtable', + help='name of the BigQuery table in the form of ' + + 'dataset.table.') + confusion_matrix_parser.add_argument('--sql', + help='name of the sql module defined in previous cell ' + + 'which should return "target", "predicted", ' + + 'and "count" columns at least in results.') + confusion_matrix_parser.set_defaults(func=_confusion_matrix) + namespace = datalab.utils.commands.notebook_environment() return datalab.utils.commands.handle_magic_line(line, cell, parser, namespace=namespace) @@ -158,3 +181,66 @@ def _predict(args, cell): def _batch_predict(args, cell): return _run_package(args, cell, 'batch_predict') + + +def _plot_confusion_matrix(cm, labels): + plt.imshow(cm, interpolation='nearest', cmap=plt.cm.Blues) + plt.title('Confusion matrix') + plt.colorbar() + tick_marks = np.arange(len(labels)) + plt.xticks(tick_marks, labels, rotation=45) + plt.yticks(tick_marks, labels) + plt.tight_layout() + plt.ylabel('True label') + plt.xlabel('Predicted label') + + +def _confusion_matrix_from_csv(input_csv, cell): + schema_file = input_csv + '.schema.yaml' + headers = None + if cell is not None: + env = datalab.utils.commands.notebook_environment() + config = datalab.utils.commands.parse_config(cell, env) + headers_str = config.get('headers', None) + if headers_str is not None: + headers = [x.strip() for x in headers_str.split(',')] + if headers is not None: + with cloudml.util._file.open_local_or_gcs(input_csv, mode='r') as f: + df = pd.read_csv(f, names=headers) + elif cloudml.util._file.file_exists(schema_file): + df = datalab.mlalpha.csv_to_dataframe(input_csv, schema_file) + else: + raise Exception('headers is missing from cell, ' + + 'and there is no schema file in the same dir as csv') + labels = sorted(set(df['target']) | set(df['predicted'])) + cm = confusion_matrix(df['target'], df['predicted'], labels=labels) + return cm, labels + + +def 
_confusion_matrix_from_query(sql_module_name, bq_table): + if sql_module_name is not None: + item = datalab.utils.commands.get_notebook_item(sql_module_name) + query, _ = datalab.data.SqlModule.get_sql_statement_with_environment(item, {}) + else: + query = ('select target, predicted, count(*) as count from %s group by target, predicted' + % bq_table) + dfbq = datalab.bigquery.Query(query).results().to_dataframe() + labels = sorted(set(dfbq['target']) | set(dfbq['predicted'])) + labels_count = len(labels) + dfbq['target'] = [labels.index(x) for x in dfbq['target']] + dfbq['predicted'] = [labels.index(x) for x in dfbq['predicted']] + cm = [[0]*labels_count for i in range(labels_count)] + for index, row in dfbq.iterrows(): + cm[row['target']][row['predicted']] = row['count'] + return cm, labels + + +def _confusion_matrix(args, cell): + if args['csv'] is not None: + #TODO: Maybe add cloud run for large CSVs with federated table. + cm, labels = _confusion_matrix_from_csv(args['csv'], cell) + elif args['sql'] is not None or args['bqtable'] is not None: + cm, labels = _confusion_matrix_from_query(args['sql'], args['bqtable']) + else: + raise Exception('One of "csv", "bqtable", and "sql" param is needed.') + _plot_confusion_matrix(cm, labels) diff --git a/solutionbox/inception/datalab_solutions/inception/_trainer.py b/solutionbox/inception/datalab_solutions/inception/_trainer.py index 3e9d49d6e..f3f4f9b1f 100644 --- a/solutionbox/inception/datalab_solutions/inception/_trainer.py +++ b/solutionbox/inception/datalab_solutions/inception/_trainer.py @@ -41,7 +41,10 @@ class Evaluator(object): """Loads variables from latest checkpoint and performs model evaluation.""" def __init__(self, model, data_paths, batch_size, output_path, dataset='eval'): - self.num_eval_batches = self._data_size(data_paths) // batch_size + data_size = self._data_size(data_paths) + if data_size <= batch_size: + raise Exception('Data size is smaller than batch size.') + self.num_eval_batches = data_size // batch_size self.batch_of_examples = [] self.checkpoint_path = os.path.join(output_path, 'train') self.output_path = os.path.join(output_path, dataset)
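Below is a minimal usage sketch of the csv_to_dataframe helper and the confusion_matrix magic introduced above. It is illustrative only: the bucket, file names, and schema fields are assumed placeholders, not part of this change.

    import datalab.mlalpha as mlalpha

    # Assumed inputs: predictions.csv holds key,target,predicted columns, and
    # predictions.csv.schema.yaml is the BigQuery-style schema list that the
    # helper parses with yaml.safe_load, e.g.
    #   - name: key
    #     type: INTEGER
    #   - name: target
    #     type: STRING
    #   - name: predicted
    #     type: STRING
    df = mlalpha.csv_to_dataframe('gs://my-bucket/eval/predictions.csv',
                                  'gs://my-bucket/eval/predictions.csv.schema.yaml')
    df.head()

The same CSV can then be plotted from a notebook cell with the new magic, e.g. "%%ml confusion_matrix --csv gs://my-bucket/eval/predictions.csv", adding a "headers: target, predicted" line in the cell body when no schema file sits next to the CSV.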