
Commit

Remove old feature-slicing pipeline implementation (is replaced by BigQuery). Add Confusion matrix magic. (#129)

* Remove old feature-slicing pipeline implementation (is replaced by BigQuery).
Add Confusion matrix magic.

* Follow up on code review comments. Also fix an Inception issue where eval loss is NaN when the eval set is smaller than the batch size.

* Fix set union.
qimingj committed Feb 13, 2017
1 parent 9f1f974 commit 0492e6f
Showing 4 changed files with 118 additions and 197 deletions.
3 changes: 1 addition & 2 deletions datalab/mlalpha/__init__.py
@@ -26,8 +26,7 @@
from ._package import Packager
from ._cloud_models import CloudModels, CloudModelVersions
from ._confusion_matrix import ConfusionMatrix
-from ._analysis import CsvEvalResults, CsvEvalSource, EvalResultsCsvCoder, \
-                       AccuracyFn, FeatureSlicingPipeline
+from ._analysis import csv_to_dataframe
from ._package_runner import PackageRunner

from plotly.offline import init_notebook_mode
221 changes: 27 additions & 194 deletions datalab/mlalpha/_analysis.py
@@ -10,206 +10,39 @@
# or implied. See the License for the specific language governing permissions and limitations under
# the License.

"""Implements Cloud ML Eval Results Analysis"""
"""Implements Cloud ML Analysis Helpers"""

import apache_beam as beam
from collections import namedtuple

"""Prepresents an eval results CSV file. For example, the content is like:
107,Iris-versicolor,1.64827824278e-07,0.999999880791,6.27104979056e-10
100,Iris-versicolor,3.5338824091e-05,0.99996471405,1.32811195375e-09
...
"""
CsvEvalResults = namedtuple('CsvEvalResults', 'source, key_index predicted_index score_index_start num_scores')
import google.cloud.ml as ml
import numpy as np
import pandas as pd
import yaml

"""Prepresents an eval source CSV file. For example, the content is like:
107,Iris-virginica,4.9,2.5,4.5,1.7
100,Iris-versicolor,5.7,2.8,4.1,1.3
...
The metadata is generated in the preprocessing pipeline. It is used to describe the CSV file,
including schema, headers, etc.
"""
CsvEvalSource = namedtuple('CsvEvalSource', 'source metadata')
import datalab.bigquery as bq


class EvalResultsCsvCoder(beam.coders.Coder):
"""A coder to read from Eval results CSV file. Note encode() is only needed in cloud run.
"""
def __init__(self, eval_results):
self._eval_results = eval_results

def decode(self, csv_line):
import csv
source_elem = next(csv.reader([csv_line]))
key = source_elem[self._eval_results.key_index]
element = {
'predicted': source_elem[self._eval_results.predicted_index],
'scores': source_elem[self._eval_results.score_index_start: \
self._eval_results.score_index_start+self._eval_results.num_scores]
}
return (key, element)

def encode(self, element):
return str(element)


class AccuracyFn(beam.CombineFn):
"""A transform to compute accuracy for feature slices.
"""
def __init__(self, target_column_name):
self._target_column_name = target_column_name

def create_accumulator(self):
return (0.0, 0)

def add_input(self, (sum, count), input):
new_sum = sum
if (input['predicted'] == input[self._target_column_name]):
new_sum += 1
return new_sum, count + 1

def merge_accumulators(self, accumulators):
sums, counts = zip(*accumulators)
return sum(sums), sum(counts)

def extract_output(self, (sum, count)):
accuracy = float(sum) / count if count else float('NaN')
return {'accuracy': accuracy, 'totalWeightedExamples': count}


class FeatureSlicingPipeline(object):
"""The pipeline to generate feature slicing stats. For example, accuracy values given
"species = Iris-versicolor", "education = graduate", etc.
It is implemented with DataFlow.
"""
@staticmethod
def _pair_source_with_key(element):
key = element['key']
del element['key']
return (key, element)

@staticmethod
def _join_info((key, info)):
value = info['source'][0]
value.update(info['results'][0])
return (key, value)

def _pipeline_def(self, p, eval_source, eval_results, features_to_slice, metrics, output_file,
shard_name_template=None):
import datalab.mlalpha as mlalpha
import google.cloud.ml.io as io
import json

metadata = mlalpha.Metadata(eval_source.metadata)
target_name, _ = metadata.get_target_name_and_scenario()

# Load eval source.
eval_source_coder = io.CsvCoder(metadata.get_csv_headers(), metadata.get_numeric_columns())
eval_source_data = p | beam.io.ReadFromText(eval_source.source, coder=eval_source_coder) | \
beam.Map('pair_source_with_key', FeatureSlicingPipeline._pair_source_with_key)

# Load eval results.
eval_results_data = p | \
beam.Read('ReadEvalResults', beam.io.TextFileSource(eval_results.source,
coder=EvalResultsCsvCoder(eval_results)))

# Join source with results by key.
joined_results = {'source': eval_source_data, 'results': eval_results_data} | \
beam.CoGroupByKey() | beam.Map('join by key', FeatureSlicingPipeline._join_info)

feature_metrics_list = []
for feature_to_slice in features_to_slice:
feature_metrics = joined_results | \
beam.Map('slice_get_key_%s' % feature_to_slice,
lambda (k,v),f=feature_to_slice: (v[f], v)) | \
beam.CombinePerKey('slice_combine_%s' % feature_to_slice,
AccuracyFn(target_name)) | \
beam.Map('slice_prepend_feature_name_%s' % feature_to_slice,
lambda (k,v),f=feature_to_slice: ('%s:%s' % (f, k), v))
feature_metrics_list.append(feature_metrics)

feature_metrics_list | beam.Flatten() | \
beam.Map('ToJsonFormat', lambda (k,v): json.dumps({'feature': k, 'metricValues': v})) | \
beam.io.WriteToText(output_file, shard_name_template=shard_name_template)
return p


def run_local(self, eval_source, eval_results, features_to_slice, metrics, output_file):
"""Run the pipeline locally. Blocks execution until it finishes.
Args:
eval_source: The only supported format is CsvEvalResults now while we may add more.
Note the source can be either a GCS path or a local path.
eval_results: The only supported format is CsvEvalSource now while we may add more.
Note the source can be either a GCS path or a local path.
features_to_slice: A list of features to slice on. The features must exist in
eval_source, and can be numeric, categorical, or target.
metrics: A list of metrics to compute. For classification, it supports "accuracy",
"logloss". For regression, it supports "RMSE".
output_file: The path to a local file holding the aggregated results.
"""
p = beam.Pipeline('DirectPipelineRunner')
self._pipeline_def(p, eval_source, eval_results, features_to_slice, metrics, output_file,
shard_name_template='')
p.run()


def default_pipeline_options(self, output_dir):
"""Get default DataFlow options. Users can customize it further on top of it and then
send the option to run_cloud().
def csv_to_dataframe(csv_path, schema_path):
"""Given a CSV file together with its BigQuery schema file in yaml, load
content into a dataframe.
Args:
output_dir: A GCS path which will be used as base path for tmp and staging dir.
csv_path: Input CSV path. Can be local or GCS.
schema_path: Input schema path. Can be local or GCS.
Returns:
A dictionary of options.
"""
import datalab.context as context
import datetime
import google.cloud.ml as ml
import os

options = {
'staging_location': os.path.join(output_dir, 'tmp', 'staging'),
'temp_location': os.path.join(output_dir, 'tmp'),
'job_name': 'feature-slicing-pipeline' + '-' + \
datetime.datetime.now().strftime('%y%m%d-%H%M%S'),
'project': context.Context.default().project_id,
'extra_packages': ['gs://cloud-datalab/dataflow/datalab.tar.gz', ml.sdk_location],
'teardown_policy': 'TEARDOWN_ALWAYS',
'no_save_main_session': True
}
return options

def run_cloud(self, eval_source, eval_results, features_to_slice, metrics, output_file,
pipeline_option=None):
"""Run the pipeline in cloud. Returns when the job is submitted.
Calling of this function may incur some cost since it runs a DataFlow job in Google Cloud.
If pipeline_option is not specified, make sure you are signed in (through Datalab)
and a default project is set so it can get credentials and projects from global context.
Args:
eval_source: The only supported format is CsvEvalResults now while we may add more.
The source needs to be a GCS path and is readable to current signed in user.
eval_results: The only supported format is CsvEvalSource now while we may add more.
The source needs to be a GCS path and is readable to current signed in user.
features_to_slice: A list of features to slice on. The features must exist in
eval_source, and can be numeric, categorical, or target.
metrics: A list of metrics to compute. For classification, it supports "accuracy",
"logloss". For regression, it supports "RMSE".
pipeline_option: If not specified, use default options. Recommend customizing your options
based on default one obtained from default_pipeline_options(). For example,
options = fsp.default_pipeline_options()
options['num_workers'] = 10
...
output_file: A GCS file prefix holding the aggregated results.
"""
import os
if pipeline_option is None:
output_dir = os.path.dirname(output_file)
pipeline_option = self.default_pipeline_options(output_dir)
opts = beam.pipeline.PipelineOptions(flags=[], **pipeline_option)
p = beam.Pipeline('DataflowPipelineRunner', options=opts)
self._pipeline_def(p, eval_source, eval_results, features_to_slice, metrics, output_file)
p.run()

Loaded pandas dataframe.
"""
with ml.util._file.open_local_or_gcs(schema_path, mode='r') as f:
schema = yaml.safe_load(f)
_MAPPINGS = {
'FLOAT': np.float64,
'INTEGER': np.int64,
'TIMESTAMP': np.datetime64,
'BOOLEAN': np.bool,
}
for item in schema:
item['type'] = _MAPPINGS.get(item['type'], object)
names = [x['name'] for x in schema]
dtype = {x['name']: x['type'] for x in schema}
with ml.util._file.open_local_or_gcs(csv_path, mode='r') as f:
return pd.read_csv(f, names=names, dtype=dtype)
86 changes: 86 additions & 0 deletions datalab/mlalpha/commands/_ml.py
@@ -17,9 +17,15 @@
raise Exception('This module can only be loaded in ipython.')

import collections
+import google.cloud.ml as cloudml
+import matplotlib.pyplot as plt
+import numpy as np
import os
+import pandas as pd
+from sklearn.metrics import confusion_matrix
+import yaml


import datalab.context
import datalab.mlalpha
import datalab.utils.commands
@@ -87,6 +93,23 @@ def ml(line, cell=None):
required=True)
batch_predict_parser.set_defaults(func=_batch_predict)

+  confusion_matrix_parser = parser.subcommand('confusion_matrix',
+      'Plot confusion matrix. The source is provided ' +
+      'in one of "csv", "bqtable", and "sql" params.')
+  confusion_matrix_parser.add_argument('--csv',
+      help='GCS or local path of CSV file which contains ' +
+           '"target", "predicted" columns at least. The CSV ' +
+           'either comes with a schema file in the same dir, ' +
+           'or specify "headers: name1, name2..." in cell.')
+  confusion_matrix_parser.add_argument('--bqtable',
+      help='name of the BigQuery table in the form of ' +
+           'dataset.table.')
+  confusion_matrix_parser.add_argument('--sql',
+      help='name of the sql module defined in previous cell ' +
+           'which should return "target", "predicted", ' +
+           'and "count" columns at least in results.')
+  confusion_matrix_parser.set_defaults(func=_confusion_matrix)

namespace = datalab.utils.commands.notebook_environment()
return datalab.utils.commands.handle_magic_line(line, cell, parser, namespace=namespace)

@@ -158,3 +181,66 @@ def _predict(args, cell):

def _batch_predict(args, cell):
return _run_package(args, cell, 'batch_predict')


+def _plot_confusion_matrix(cm, labels):
+  plt.imshow(cm, interpolation='nearest', cmap=plt.cm.Blues)
+  plt.title('Confusion matrix')
+  plt.colorbar()
+  tick_marks = np.arange(len(labels))
+  plt.xticks(tick_marks, labels, rotation=45)
+  plt.yticks(tick_marks, labels)
+  plt.tight_layout()
+  plt.ylabel('True label')
+  plt.xlabel('Predicted label')
+
+
+def _confusion_matrix_from_csv(input_csv, cell):
+  schema_file = input_csv + '.schema.yaml'
+  headers = None
+  if cell is not None:
+    env = datalab.utils.commands.notebook_environment()
+    config = datalab.utils.commands.parse_config(cell, env)
+    headers_str = config.get('headers', None)
+    if headers_str is not None:
+      headers = [x.strip() for x in headers_str.split(',')]
+  if headers is not None:
+    with cloudml.util._file.open_local_or_gcs(input_csv, mode='r') as f:
+      df = pd.read_csv(f, names=headers)
+  elif cloudml.util._file.file_exists(schema_file):
+    df = datalab.mlalpha.csv_to_dataframe(input_csv, schema_file)
+  else:
+    raise Exception('headers is missing from cell, ' +
+                    'and there is no schema file in the same dir as csv')
+  labels = sorted(set(df['target']) | set(df['predicted']))
+  cm = confusion_matrix(df['target'], df['predicted'], labels=labels)
+  return cm, labels
+
+
+def _confusion_matrix_from_query(sql_module_name, bq_table):
+  if sql_module_name is not None:
+    item = datalab.utils.commands.get_notebook_item(sql_module_name)
+    query, _ = datalab.data.SqlModule.get_sql_statement_with_environment(item, {})
+  else:
+    query = ('select target, predicted, count(*) as count from %s group by target, predicted'
+             % bq_table)
+  dfbq = datalab.bigquery.Query(query).results().to_dataframe()
+  labels = sorted(set(dfbq['target']) | set(dfbq['predicted']))
+  labels_count = len(labels)
+  dfbq['target'] = [labels.index(x) for x in dfbq['target']]
+  dfbq['predicted'] = [labels.index(x) for x in dfbq['predicted']]
+  cm = [[0] * labels_count for i in range(labels_count)]
+  for index, row in dfbq.iterrows():
+    cm[row['target']][row['predicted']] = row['count']
+  return cm, labels
+
+
+def _confusion_matrix(args, cell):
+  if args['csv'] is not None:
+    # TODO: Maybe add cloud run for large CSVs with federated table.
+    cm, labels = _confusion_matrix_from_csv(args['csv'], cell)
+  elif args['sql'] is not None or args['bqtable'] is not None:
+    cm, labels = _confusion_matrix_from_query(args['sql'], args['bqtable'])
+  else:
+    raise Exception('One of "csv", "bqtable", and "sql" param is needed.')
+  _plot_confusion_matrix(cm, labels)
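Taken together, the new subcommand can be driven from a notebook. A hypothetical invocation, assuming the ml(line, cell) handler above is registered as the %%ml cell magic (the CSV path and header names are made up):

%%ml confusion_matrix --csv gs://my-bucket/eval.csv
headers: key, target, predicted

Or, reading aggregated counts from BigQuery instead of a CSV:

%%ml confusion_matrix --bqtable mydataset.eval_results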
@@ -41,7 +41,10 @@ class Evaluator(object):
"""Loads variables from latest checkpoint and performs model evaluation."""

def __init__(self, model, data_paths, batch_size, output_path, dataset='eval'):
self.num_eval_batches = self._data_size(data_paths) // batch_size
data_size = self._data_size(data_paths)
if data_size <= batch_size:
raise Exception('Data size is smaller than batch size.')
self.num_eval_batches = data_size // batch_size
self.batch_of_examples = []
self.checkpoint_path = os.path.join(output_path, 'train')
self.output_path = os.path.join(output_path, dataset)
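For context on the NaN fix: num_eval_batches is computed with floor division, so an eval set smaller than one batch used to yield zero eval batches, and averaging an empty set of batch losses produces NaN. An illustrative sketch (numbers made up):

import numpy as np

data_size, batch_size = 50, 100             # eval set smaller than one batch
num_eval_batches = data_size // batch_size  # floor division -> 0 batches
batch_losses = []                           # no batch losses are ever collected
print(np.mean(batch_losses))                # nan: the reported eval loss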
