This repository has been archived by the owner on Dec 4, 2019. It is now read-only.

[WIP] Converts dataframe to/from named numpy arrays #4

Open · wants to merge 5 commits into master
Changes from 2 commits
83 changes: 83 additions & 0 deletions python/pdspark/converter.py
@@ -15,6 +15,7 @@
from pyspark.ml.regression import LinearRegressionModel
from pyspark.mllib.linalg import DenseVector, SparseVector, Vectors, VectorUDT
from pyspark.sql.functions import udf
from pyspark.sql import SQLContext

from udt import CSRVectorUDT
from util import _new_java_obj, _randomUID
@@ -36,6 +37,7 @@ def __init__(self, sc):
        :param sc: SparkContext
        """
        self.sc = sc
        self.sqlContext = SQLContext(self.sc)
        # For conversions sklearn -> Spark
        self._skl2spark_classes = {
            SKL_LogisticRegression :
@@ -161,3 +163,84 @@ def toScipy(self, X):
        else:
            raise TypeError("Converter.toScipy expected numpy.ndarray of"
                            " scipy.sparse.csr.csr_matrix instances, but found: %s" % type(X))

    @staticmethod
    def _analyze_element(x):
Contributor: I assume this will be very slow for larger data? That's OK for now.

Contributor Author: Yes it will; we can always improve it later.

        if type(x) is float:
            return (x, np.double)
        if type(x) is int:
            return (x, np.int)
        if type(x) is long:
            return (x, np.long)
        if type(x) is DenseVector:
            return (x.toArray(), (np.double, len(x.toArray())))
        # TODO(tjh) support sparse arrays
        raise ValueError("The type %s could not be understood. Element was %s" % (type(x), x))
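
For reference, a quick sketch of the (value, numpy type) pairs this mapping produces; it assumes pyspark is importable and that Converter is exposed as pdspark.Converter, as in the tests below:

from pyspark.mllib.linalg import Vectors
from pdspark import Converter

# Scalars map to (value, numpy scalar type); a DenseVector maps to
# (ndarray, (np.double, length)), i.e. a fixed-length subarray field.
print(Converter._analyze_element(1.5))                      # -> (1.5, np.double)
print(Converter._analyze_element(3))                        # -> (3, np.int)
print(Converter._analyze_element(Vectors.dense([1., 2.])))  # -> (array([1., 2.]), (np.double, 2))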

    @staticmethod
    def _analyze_df(df):
        """ Converts a dataframe into a numpy array.
        """
        rows = df.collect()
        conversions = [[Converter._analyze_element(x) for x in row] for row in rows]
        types = [t for d, t in conversions[0]]
        data = [tuple([d for d, t in labeled_elts]) for labeled_elts in conversions]
        names = list(df.columns)
        dt = np.dtype({'names': names, 'formats': types})
        arr = np.array(data, dtype=dt)
        return arr
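
For reference, a minimal numpy-only sketch of the structured array that the names/formats pair above builds (the column names and shapes are illustrative; no Spark is needed here):

import numpy as np

# One scalar column 'y' and one fixed-length vector column 'x', as produced for a
# DataFrame with a double column and a column of 4-element dense vectors.
names = ['y', 'x']
types = [np.double, (np.double, 4)]
data = [(1.0, np.zeros(4)), (0.0, np.ones(4))]

dt = np.dtype({'names': names, 'formats': types})
arr = np.array(data, dtype=dt)
print(arr['y'].shape)   # (2,)
print(arr['x'].shape)   # (2, 4)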

    def pack_DataFrame(self, **kwargs):
        """ Converts a set of numpy arrays into a single dataframe.
Contributor: The docs should list the supported input types and how they are handled: lists of common types, or lists of vector or numerical array types.

Contributor Author: Done. I also added that we support a subset of numpy types (there are so many) and sql types.


        Each keyword argument name becomes the name of the corresponding column. The columns
        may not appear in the order they are provided. Each column must have the same number
        of elements.

        Example:

        >>> X = np.zeros((10, 4))
        >>> y = np.ones(10)
        >>> df = conv.pack_DataFrame(x=X, y=y)
        >>> df.printSchema()
        root
         |-- y: double (nullable = true)
         |-- x: vector (nullable = true)
        """
        def convert(z):
            # 1-D arrays become scalar columns; 2-D arrays become columns of dense vectors.
            if len(z.shape) == 1:
                return z.tolist()
            if len(z.shape) == 2:
                return [Vectors.dense(row) for row in z.tolist()]
            assert False, (z.shape)
        pairs = [(name, convert(data)) for (name, data) in kwargs.items()]
        vecs = zip(*[data for (_, data) in pairs])
        names = [name for (name, _) in pairs]
        return self.sqlContext.createDataFrame(vecs, names)
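
A small sketch of the column-to-row transpose performed by the zip(*...) call above; the column contents are illustrative:

from pyspark.mllib.linalg import Vectors

# Each keyword argument contributes one column list; zip(*) turns the column
# lists into per-row tuples that createDataFrame can consume.
x_col = [Vectors.dense([0.0, 0.0]), Vectors.dense([1.0, 1.0])]
y_col = [1.0, 2.0]

rows = list(zip(*[x_col, y_col]))
# rows == [(DenseVector([0.0, 0.0]), 1.0), (DenseVector([1.0, 1.0]), 2.0)]
# sqlContext.createDataFrame(rows, ['x', 'y']) then yields a two-column DataFrame.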

    @staticmethod
    def df_to_numpy(df, *args):
Contributor: Should we have this and pack_DataFrame use clearly matching names? E.g., df_to_numpy and numpy_to_df. Or pack_DataFrame and unpack_DataFrame?

Contributor Author: Oh yes, good point. I am going to follow numpy_to_df because packing and unpacking sounds a bit less intuitive.

""" Converts a dataframe into a (local) numpy array. Each column is named after the same
column name in the data frame.

The varargs provide (in order) the list of columns to extract from the dataframe.
If none are provided, all the columns from the dataframe are extracted.

This method only handles basic numerical types, or dense vectors with the same length.

Note: it is not particularly optimized, do not push it too hard.

Example:
>>> z = conv.df_to_numpy(df)
>>> z['x'].dtype, z['x'].shape
>>> z = conv.df_to_numpy(df, 'y')
>>> z['y'].dtype, z['y'].shape
"""
        column_names = df.columns
        if not args:
            args = column_names
        column_nameset = set(column_names)
        for name in args:
            assert name in column_nameset, (name, column_names)
        # Just get the interesting columns
        projected = df.select(*args)
        return Converter._analyze_df(projected)
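
A hedged usage sketch of the round trip through pack_DataFrame and df_to_numpy; it assumes a running SparkContext named sc:

import numpy as np
from pdspark import Converter

conv = Converter(sc)                          # sc: an existing SparkContext (assumed)
df = conv.pack_DataFrame(x=np.zeros((10, 4)), y=np.ones(10))

z = Converter.df_to_numpy(df, 'x')            # extract only the 'x' column
print(z['x'].shape)                           # (10, 4): one row per DataFrame row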
50 changes: 50 additions & 0 deletions python/pdspark/converter_tests.py
@@ -0,0 +1,50 @@
import numpy as np
import numpy.random as rd
import unittest

from pdspark import Converter
from pdspark.test_common import get_context

sc = get_context()

n = 5
A = rd.rand(n,4)
B = rd.rand(n)
C = rd.randint(10, size=n)

class MLlibTestCase(unittest.TestCase):

    def setUp(self):
Contributor: call super setUp

Contributor Author: Done

        super(MLlibTestCase, self).setUp()
        self.sc = sc
        self.conv = Converter(self.sc)

    def test_pack(self):
        df = self.conv.pack_DataFrame(a=A, b=B, c=C)
        dt = dict(df.dtypes)
        assert dt == {'a': 'vector', 'b': 'double', 'c': 'bigint'}, dt
        z = df.collect()
        assert len(z) == n
        for row in z:
            assert len(row) == 3, row
            assert row['a'] is not None, row
            assert row['b'] is not None, row
            assert row['c'] is not None, row

    def test_unpack(self):
        df = self.conv.pack_DataFrame(a=A, b=B, c=C)
        Z = Converter.df_to_numpy(df)

        assert np.all(Z['a'] == A), (Z['a'], A)
        assert np.all(Z['b'] == B), (Z['b'], B)
        assert np.all(Z['c'] == C), (Z['c'], C)
        assert Z['c'].dtype == C.dtype

    def test_unpack_select(self):
        df = self.conv.pack_DataFrame(a=A, b=B, c=C)
        Z = Converter.df_to_numpy(df, 'a', 'c')

        assert np.all(Z['a'] == A), (Z['a'], A)
        assert np.all(Z['c'] == C), (Z['c'], C)
        assert 'b' not in Z.dtype.fields

15 changes: 15 additions & 0 deletions python/pdspark/test_common.py
@@ -0,0 +1,15 @@
"""
Common variables for all tests.
"""
from pyspark import SparkContext

__all__ = ['get_context']

_sc = None

def get_context():
    global _sc
    if not _sc:
        _sc = SparkContext('local[4]', "spark-sklearn tests")
    return _sc

5 changes: 3 additions & 2 deletions python/pdspark/tests.py
@@ -23,7 +23,7 @@
from sklearn.grid_search import GridSearchCV as SKL_GridSearchCV
from sklearn.pipeline import Pipeline as SKL_Pipeline

-from pyspark import SparkContext

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel
from pyspark.ml.evaluation import RegressionEvaluator
@@ -35,8 +35,9 @@

from pdspark.converter import Converter
from pdspark.grid_search import GridSearchCV
+from .test_common import get_context

-sc = SparkContext('local[4]', "spark-sklearn tests")
+sc = get_context()

class MLlibTestCase(unittest.TestCase):
    def setUp(self):