Skip to content

Commit b15a481

Browse files
Various fixes (ray-project#10)
1 parent 960fb87 commit b15a481

File tree

2 files changed

+118
-22
lines changed

2 files changed

+118
-22
lines changed

python/ray/dataframe/dataframe.py

Lines changed: 110 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,12 @@
99
from pandas._libs import lib
1010
from pandas.core.dtypes.cast import maybe_upcast_putmask
1111
from pandas.compat import lzip
12+
import pandas.core.common as com
1213
from pandas.core.dtypes.common import (
1314
is_bool_dtype,
1415
is_numeric_dtype,
1516
is_timedelta64_dtype)
17+
from pandas.core.indexing import convert_to_index_sliceable
1618
import warnings
1719
import numpy as np
1820
import ray
@@ -3167,12 +3169,72 @@ def __getitem__(self, key):
31673169
Returns:
31683170
A Pandas Series representing the value for the column.
31693171
"""
3170-
partition_id = self.get_col_partition(key)
3171-
index = self.get_col_index_within_partition(key)
3172-
res = ray.get(_deploy_func.remote(lambda df: df.__getitem__(index),
3173-
self._col_partitions[partition_id]))
3174-
res.name = key
3175-
return res
3172+
key = com._apply_if_callable(key, self)
3173+
3174+
# shortcut if we are an actual column
3175+
is_mi_columns = isinstance(self.columns, pd.MultiIndex)
3176+
try:
3177+
if key in self.columns and not is_mi_columns:
3178+
return self._getitem_column(key)
3179+
except:
3180+
pass
3181+
3182+
# see if we can slice the rows
3183+
indexer = convert_to_index_sliceable(self._row_index, key)
3184+
if indexer is not None:
3185+
raise NotImplementedError("To contribute to Pandas on Ray, please"
3186+
"visit github.com/ray-project/ray.")
3187+
# return self._getitem_slice(indexer)
3188+
3189+
if isinstance(key, (pd.Series, np.ndarray, pd.Index, list)):
3190+
return self._getitem_array(key)
3191+
elif isinstance(key, DataFrame):
3192+
raise NotImplementedError("To contribute to Pandas on Ray, please"
3193+
"visit github.com/ray-project/ray.")
3194+
# return self._getitem_frame(key)
3195+
elif is_mi_columns:
3196+
raise NotImplementedError("To contribute to Pandas on Ray, please"
3197+
"visit github.com/ray-project/ray.")
3198+
# return self._getitem_multilevel(key)
3199+
else:
3200+
return self._getitem_column(key)
3201+
3202+
def _getitem_column(self, key):
3203+
partition = self._get_col_locations(key).loc['partition']
3204+
result = ray.get(self._getitem_indiv_col(key, partition))
3205+
result.name = key
3206+
result.index = self.index
3207+
return result
3208+
3209+
def _getitem_array(self, array_key):
3210+
partitions = \
3211+
self._get_col_locations(array_key)['partition'].unique()
3212+
3213+
new_col_parts = [self._getitem_indiv_col(array_key, part)
3214+
for part in partitions]
3215+
3216+
# Pandas doesn't allow Index.get_loc for lists, so we have to do this.
3217+
isin = self.columns.isin(array_key)
3218+
indices_for_rows = [i for i in range(len(isin)) if isin[i]]
3219+
3220+
new_row_parts = [_deploy_func.remote(
3221+
lambda df: df.__getitem__(indices_for_rows),
3222+
part) for part in self._row_partitions]
3223+
3224+
return DataFrame(col_partitions=new_col_parts,
3225+
row_partitions=new_row_parts,
3226+
columns=array_key,
3227+
index=self.index)
3228+
3229+
def _getitem_indiv_col(self, key, partition):
3230+
loc = self._col_index.loc[key]
3231+
if isinstance(loc, pd.Series):
3232+
index = loc[loc['partition'] == partition]
3233+
else:
3234+
index = loc[loc['partition'] == partition]['index_within_partition']
3235+
return _deploy_func.remote(
3236+
lambda df: df.__getitem__(index),
3237+
self._col_partitions[partition])
31763238

31773239
def __setitem__(self, key, value):
31783240
raise NotImplementedError(
@@ -3274,9 +3336,7 @@ def __delitem__(self, key):
32743336
key: key to delete
32753337
"""
32763338
# Create helper method for deleting column(s) in row partition.
3277-
to_delete = self.columns.get_loc(key)
3278-
3279-
def del_helper(df):
3339+
def del_helper(df, to_delete):
32803340
cols = df.columns[to_delete] # either int or an array of ints
32813341

32823342
if isinstance(cols, int):
@@ -3285,23 +3345,41 @@ def del_helper(df):
32853345
for col in cols:
32863346
df.__delitem__(col)
32873347

3348+
# Reset the column index to conserve space
32883349
df.columns = pd.RangeIndex(0, len(df.columns))
32893350
return df
32903351

3352+
to_delete = self.columns.get_loc(key)
32913353
self._row_partitions = _map_partitions(
3292-
del_helper, self._row_partitions)
3354+
del_helper, self._row_partitions, to_delete)
3355+
3356+
# This structure is used to get the correct index inside the partition.
3357+
del_df = self._col_index.loc[key]
3358+
3359+
# We need to standardize between multiple and single occurrences in the
3360+
# columns. Putting single occurrences in a pd.DataFrame and transposing
3361+
# results in the same structure as multiple with 'loc'.
3362+
if isinstance(del_df, pd.Series):
3363+
del_df = pd.DataFrame(del_df).T
32933364

32943365
# Cast cols as pd.Series as duplicate columns mean result may be
32953366
# np.int64 or pd.Series
32963367
col_parts_to_del = pd.Series(
32973368
self._col_index.loc[key, 'partition']).unique()
3298-
self._col_index = self._col_index.drop(key)
3369+
self._col_index.drop(key, inplace=True)
32993370
for i in col_parts_to_del:
3371+
# Compute the correct index inside the partition to delete.
3372+
to_delete_in_partition = \
3373+
del_df[del_df['partition'] == i]['index_within_partition']
3374+
33003375
self._col_partitions[i] = _deploy_func.remote(
3301-
del_helper, self._col_partitions[i])
3376+
del_helper, self._col_partitions[i], to_delete_in_partition)
33023377

33033378
partition_mask = (self._col_index['partition'] == i)
33043379

3380+
# Since we are replacing columns with RangeIndex inside the
3381+
# partition, we have to make sure that our reference to it is
3382+
# updated as well.
33053383
try:
33063384
self._col_index.loc[partition_mask,
33073385
'index_within_partition'] = [
@@ -3551,14 +3629,26 @@ def iloc(self):
35513629
from .indexing import _iLoc_Indexer
35523630
return _iLoc_Indexer(self)
35533631

3554-
def get_col_partition(self, col):
3555-
return self._col_index['partition'][col]
3632+
def _get_col_locations(self, col):
3633+
"""Gets the location(s) from the column index DataFrame.
35563634
3557-
def get_col_index_within_partition(self, col):
3558-
return self._col_index['index_within_partition'][col]
3635+
Args:
3636+
col: The column name.
35593637
3560-
def get_row_partition(self, row):
3561-
return self._row_index['partition'][row]
3638+
Returns:
3639+
The index(es) of _col_partitions and the local index(es) where
3640+
columns with this name exist.
3641+
"""
3642+
return self._col_index.loc[col]
3643+
3644+
def _get_row_locations(self, row):
3645+
"""Gets the location(s) from the row index DataFrame.
3646+
3647+
Args:
3648+
row: The index name.
35623649
3563-
def get_row_index_within_partition(self, row):
3564-
return self._row_index['index_within_partition'][row]
3650+
Returns:
3651+
The index(es) of _row_partitions and the local index(es) where rows
3652+
with this name exist.
3653+
"""
3654+
return self._row_index.loc[row]

python/ray/dataframe/utils.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -226,8 +226,11 @@ def _rebuild_rows(col_partitions, index, columns):
226226
"""
227227
n_rows = min(max(get_npartitions(), len(col_partitions)), len(index))
228228
partition_assignments = assign_partitions.remote(index, n_rows)
229-
shufflers = [ShuffleActor.remote(x, partition_axis=1, shuffle_axis=0)
230-
for x in col_partitions]
229+
shufflers = [ShuffleActor.remote(
230+
col_partitions[i] if i < len(col_partitions) else pd.DataFrame(),
231+
partition_axis=1,
232+
shuffle_axis=0)
233+
for i in range(n_rows)]
231234

232235
shufflers_done = \
233236
[shufflers[i].shuffle.remote(
@@ -291,6 +294,9 @@ def _map_partitions(func, partitions, *argslists):
291294
assert(callable(func))
292295
if argslists is None:
293296
return [_deploy_func.remote(func, part) for part in partitions]
297+
elif len(argslists) == 1:
298+
return [_deploy_func.remote(func, part, argslists[0])
299+
for part in partitions]
294300
else:
295301
assert(all([len(args) == len(partitions) for args in argslists]))
296302
return [_deploy_func.remote(func, part, *args) for part, *args in zip(partitions, *argslists)]

0 commit comments

Comments
 (0)