Skip to content

Commit

Permalink
[DataFrame] Implements filter and dropna (#1959)
Browse files Browse the repository at this point in the history
* implement filter

* begin implementation of dropna

* implement dropna

* docs and tests

* resolving comments

* resolving merge

* add error checking to dropna

* fix update inplace call

* Implement multiple axis for dropna (#13)

* Implement multiple axis for dropna

* Add multiple axis dropna test

* Fix using dummy_frame in dropna

* Clean up dropna multiple axis tests

* remove unnecessary axis modification

* Clean up dropna tests

* resolve comments

* fix lint
  • Loading branch information
kunalgosar authored and devin-petersohn committed May 4, 2018
1 parent 22d4950 commit 4030356
Show file tree
Hide file tree
Showing 3 changed files with 248 additions and 11 deletions.
140 changes: 133 additions & 7 deletions python/ray/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from pandas._libs import lib
from pandas.core.dtypes.cast import maybe_upcast_putmask
from pandas import compat
from pandas.compat import lzip, string_types, cPickle as pkl
from pandas.compat import lzip, to_str, string_types, cPickle as pkl
import pandas.core.common as com
from pandas.core.dtypes.common import (
is_bool_dtype,
Expand Down Expand Up @@ -756,7 +756,8 @@ def transpose(self, *args, **kwargs):

T = property(transpose)

def dropna(self, axis, how, thresh=None, subset=[], inplace=False):
def dropna(self, axis=0, how='any', thresh=None, subset=None,
inplace=False):
"""Create a new DataFrame from the removed NA values from this one.
Args:
Expand All @@ -774,7 +775,94 @@ def dropna(self, axis, how, thresh=None, subset=[], inplace=False):
If inplace is set to True, returns None, otherwise returns a new
DataFrame with the dropna applied.
"""
raise NotImplementedError("Not yet")
inplace = validate_bool_kwarg(inplace, "inplace")

if is_list_like(axis):
axis = [pd.DataFrame()._get_axis_number(ax) for ax in axis]

result = self
# TODO(kunalgosar): this builds an intermediate dataframe,
# which does unnecessary computation
for ax in axis:
result = result.dropna(
axis=ax, how=how, thresh=thresh, subset=subset)
if not inplace:
return result

self._update_inplace(block_partitions=result._block_partitions,
columns=result.columns,
index=result.index)

return None

axis = pd.DataFrame()._get_axis_number(axis)

if how is not None and how not in ['any', 'all']:
raise ValueError('invalid how option: %s' % how)
if how is None and thresh is None:
raise TypeError('must specify how or thresh')

if subset is not None:
subset = set(subset)

if axis == 1:
subset = [item for item in self.index if item in subset]
else:
subset = [item for item in self.columns if item in subset]

def dropna_helper(df):
new_df = df.dropna(axis=axis, how=how, thresh=thresh,
subset=subset, inplace=False)

if axis == 1:
new_index = new_df.columns
new_df.columns = pd.RangeIndex(0, len(new_df.columns))
else:
new_index = new_df.index
new_df.reset_index(drop=True, inplace=True)

return new_df, new_index

parts = self._col_partitions if axis == 1 else self._row_partitions
result = [_deploy_func._submit(args=(dropna_helper, df),
num_return_vals=2) for df in parts]
new_parts, new_vals = [list(t) for t in zip(*result)]

if axis == 1:
new_vals = [self._col_metadata.get_global_indices(i, vals)
for i, vals in enumerate(ray.get(new_vals))]

# This flattens the 2d array to 1d
new_vals = [i for j in new_vals for i in j]
new_cols = self.columns[new_vals]

if not inplace:
return DataFrame(col_partitions=new_parts,
columns=new_cols,
index=self.index)

self._update_inplace(col_partitions=new_parts,
columns=new_cols,
index=self.index)

else:
new_vals = [self._row_metadata.get_global_indices(i, vals)
for i, vals in enumerate(ray.get(new_vals))]

# This flattens the 2d array to 1d
new_vals = [i for j in new_vals for i in j]
new_rows = self.index[new_vals]

if not inplace:
return DataFrame(row_partitions=new_parts,
index=new_rows,
columns=self.columns)

self._update_inplace(row_partitions=new_parts,
index=new_rows,
columns=self.columns)

return None

def add(self, other, axis='columns', level=None, fill_value=None):
"""Add this DataFrame to another or a scalar/list.
Expand Down Expand Up @@ -1797,9 +1885,45 @@ def fillna(self, value=None, method=None, axis=None, inplace=False,
return new_obj

def filter(self, items=None, like=None, regex=None, axis=None):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
"""Subset rows or columns based on their labels
Args:
items (list): list of labels to subset
like (string): retain labels where `arg in label == True`
regex (string): retain labels matching regex input
axis: axis to filter on
Returns:
A new dataframe with the filter applied.
"""
nkw = com._count_not_none(items, like, regex)
if nkw > 1:
raise TypeError('Keyword arguments `items`, `like`, or `regex` '
'are mutually exclusive')
if nkw == 0:
raise TypeError('Must pass either `items`, `like`, or `regex`')

if axis is None:
axis = 'columns' # This is the default info axis for dataframes

axis = pd.DataFrame()._get_axis_number(axis)
labels = self.columns if axis else self.index

if items is not None:
bool_arr = labels.isin(items)
elif like is not None:
def f(x):
return like in to_str(x)
bool_arr = labels.map(f).tolist()
else:
def f(x):
return matcher.search(to_str(x)) is not None
matcher = re.compile(regex)
bool_arr = labels.map(f).tolist()

if not axis:
return self[bool_arr]
return self[self.columns[bool_arr]]

def first(self, offset):
raise NotImplementedError(
Expand Down Expand Up @@ -3990,7 +4114,9 @@ def _getitem_array(self, key):
index=index)
else:
columns = self._col_metadata[key].index
indices_for_rows = [col for col in self.col if col in set(columns)]
indices_for_rows = \
[i for i, item in enumerate(self.columns)
if item in set(columns)]

new_parts = [_deploy_func.remote(
lambda df: df.__getitem__(indices_for_rows),
Expand Down
7 changes: 7 additions & 0 deletions python/ray/dataframe/index_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,13 @@ def insert(self, key, loc=None, partition=None,
# Return inserted coordinate for callee
return coord_to_insert

def get_global_indices(self, partition, index_within_partition_list):
total = 0
for i in range(partition):
total += self._lengths[i]

return [total + i for i in index_within_partition_list]

def squeeze(self, partition, index_within_partition):
"""Prepare a single coordinate for removal by "squeezing" the
subsequent coordinates "up" one index within that partition. To be used
Expand Down
112 changes: 108 additions & 4 deletions python/ray/dataframe/test/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,11 @@ def test_int_dataframe():
'col3',
'col4']

filter_by = {'items': ['col1', 'col5'],
'regex': '4$|3$',
'like': 'col'}

test_filter(ray_df, pandas_df, filter_by)
test_roundtrip(ray_df, pandas_df)
test_index(ray_df, pandas_df)
test_size(ray_df, pandas_df)
Expand Down Expand Up @@ -348,6 +353,11 @@ def test_float_dataframe():
'col3',
'col4']

filter_by = {'items': ['col1', 'col5'],
'regex': '4$|3$',
'like': 'col'}

test_filter(ray_df, pandas_df, filter_by)
test_roundtrip(ray_df, pandas_df)
test_index(ray_df, pandas_df)
test_size(ray_df, pandas_df)
Expand Down Expand Up @@ -506,6 +516,11 @@ def test_mixed_dtype_dataframe():
'col3',
'col4']

filter_by = {'items': ['col1', 'col5'],
'regex': '4$|3$',
'like': 'col'}

test_filter(ray_df, pandas_df, filter_by)
test_roundtrip(ray_df, pandas_df)
test_index(ray_df, pandas_df)
test_size(ray_df, pandas_df)
Expand Down Expand Up @@ -664,6 +679,11 @@ def test_nan_dataframe():
'col3',
'col4']

filter_by = {'items': ['col1', 'col5'],
'regex': '4$|3$',
'like': 'col'}

test_filter(ray_df, pandas_df, filter_by)
test_roundtrip(ray_df, pandas_df)
test_index(ray_df, pandas_df)
test_size(ray_df, pandas_df)
Expand Down Expand Up @@ -798,6 +818,23 @@ def test_nan_dataframe():
test_transform(ray_df, pandas_df)


def test_dense_nan_df():
ray_df = rdf.DataFrame([[np.nan, 2, np.nan, 0],
[3, 4, np.nan, 1],
[np.nan, np.nan, np.nan, 5]],
columns=list('ABCD'))

pd_df = pd.DataFrame([[np.nan, 2, np.nan, 0],
[3, 4, np.nan, 1],
[np.nan, np.nan, np.nan, 5]],
columns=list('ABCD'))

test_dropna(ray_df, pd_df)
test_dropna_inplace(ray_df, pd_df)
test_dropna_multiple_axes(ray_df, pd_df)
test_dropna_multiple_axes_inplace(ray_df, pd_df)


@pytest.fixture
def test_inter_df_math(op, simple=False):
ray_df = rdf.DataFrame({"col1": [0, 1, 2, 3], "col2": [4, 5, 6, 7],
Expand Down Expand Up @@ -1252,6 +1289,68 @@ def test_drop_duplicates():
ray_df.drop_duplicates()


@pytest.fixture
def test_dropna(ray_df, pd_df):
assert ray_df_equals_pandas(ray_df.dropna(axis=1, how='all'),
pd_df.dropna(axis=1, how='all'))

assert ray_df_equals_pandas(ray_df.dropna(axis=1, how='any'),
pd_df.dropna(axis=1, how='any'))

assert ray_df_equals_pandas(ray_df.dropna(axis=0, how='all'),
pd_df.dropna(axis=0, how='all'))

assert ray_df_equals_pandas(ray_df.dropna(thresh=2),
pd_df.dropna(thresh=2))


@pytest.fixture
def test_dropna_inplace(ray_df, pd_df):
ray_df = ray_df.copy()
pd_df = pd_df.copy()

ray_df.dropna(thresh=2, inplace=True)
pd_df.dropna(thresh=2, inplace=True)

assert ray_df_equals_pandas(ray_df, pd_df)

ray_df.dropna(axis=1, how='any', inplace=True)
pd_df.dropna(axis=1, how='any', inplace=True)

assert ray_df_equals_pandas(ray_df, pd_df)


@pytest.fixture
def test_dropna_multiple_axes(ray_df, pd_df):
assert ray_df_equals_pandas(
ray_df.dropna(how='all', axis=[0, 1]),
pd_df.dropna(how='all', axis=[0, 1])
)
assert ray_df_equals_pandas(
ray_df.dropna(how='all', axis=(0, 1)),
pd_df.dropna(how='all', axis=(0, 1))
)


@pytest.fixture
def test_dropna_multiple_axes_inplace(ray_df, pd_df):
ray_df_copy = ray_df.copy()
pd_df_copy = pd_df.copy()

ray_df_copy.dropna(how='all', axis=[0, 1], inplace=True)
pd_df_copy.dropna(how='all', axis=[0, 1], inplace=True)

assert ray_df_equals_pandas(ray_df_copy, pd_df_copy)

ray_df_copy = ray_df.copy()
pd_df_copy = pd_df.copy()

ray_df_copy.dropna(how='all', axis=(0, 1), inplace=True)
pd_df_copy.dropna(how='all', axis=(0, 1), inplace=True)

assert ray_df_equals_pandas(ray_df_copy, pd_df_copy)


def test_duplicated():
ray_df = create_test_dataframe()

Expand Down Expand Up @@ -1747,11 +1846,16 @@ def test_fillna_datetime_columns(num_partitions=2):
"""


def test_filter():
ray_df = create_test_dataframe()
@pytest.fixture
def test_filter(ray_df, pandas_df, by):
ray_df_equals_pandas(ray_df.filter(items=by['items']),
pandas_df.filter(items=by['items']))

with pytest.raises(NotImplementedError):
ray_df.filter()
ray_df_equals_pandas(ray_df.filter(regex=by['regex']),
pandas_df.filter(regex=by['regex']))

ray_df_equals_pandas(ray_df.filter(like=by['like']),
pandas_df.filter(like=by['like']))


def test_first():
Expand Down

0 comments on commit 4030356

Please sign in to comment.