Skip to content

Commit

Permalink
FEAT-modin-project#3451: Support __partitioned__ protocol
Browse files Browse the repository at this point in the history
Signed-off-by: Igoshev, Yaroslav <yaroslav.igoshev@intel.com>
  • Loading branch information
YarShev committed Sep 21, 2021
1 parent 7727c23 commit 1caf113
Showing 1 changed file with 61 additions and 1 deletion.
62 changes: 61 additions & 1 deletion modin/pandas/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
import warnings
import pickle as pkl

from modin.utils import try_cast_to_pandas, _inherit_docstrings
from modin.utils import try_cast_to_pandas, _inherit_docstrings, get_current_backend
from modin.error_message import ErrorMessage
from modin.pandas.utils import is_scalar
from modin.config import IsExperimental
Expand Down Expand Up @@ -3010,6 +3010,66 @@ def __xor__(self, other):
def __rxor__(self, other):
return self._binary_op("__rxor__", other, axis=0)

@property
def __partitioned__(self):
"""Implementation of github.com/IntelPython/DPPY-Spec/issues/3."""
from .dataframe import DataFrame

is_dataframe = isinstance(self, DataFrame)
backend = get_current_backend()

if backend in ("PandasOnRay",):
from modin.engines.ray.task_wrapper import RayTask

get = RayTask.materialize
part_attr = "oid"
elif backend in ("PandasOnDask",):
from modin.engines.dask.task_wrapper import DaskTask

get = DaskTask.materialize
part_attr = "future"
else:
raise NotImplementedError(
f"'__partitioned__' is not supported by '{backend}' backend."
)

parts = self._query_compiler._modin_frame._partitions
n_rparts = len(parts)
n_cparts = len(parts[0])
# Now compute partition info, including global start and shape of each
partitions = {}
curr_x = 0
for i in range(n_rparts):
curr_y = 0
for j in range(n_cparts):
part = parts[i][j]
curr_part = getattr(part, part_attr)
curr_ip = part._ip_cache
curr_shape = (
(part.length(), part.width()) if is_dataframe else (part.length(),)
)
partitions[(i, j)] = {
"start": (curr_x, curr_y),
"shape": curr_shape,
"data": curr_part,
"location": [
part.ip()
if curr_ip is None
else curr_ip
if isinstance(curr_ip, str)
else part.ip()
],
}
curr_y += curr_shape[1] if is_dataframe else 0 # in inner loop
curr_x += curr_shape[0] # in outer loop

return {
"partition_tiling": (n_rparts, n_cparts) if is_dataframe else (n_rparts,),
"partitions": partitions,
"get": lambda x: get(x)[0]
# we don't set 'locals' because this is controller-worker, not SPMD
}

@property
def size(self):
return len(self._query_compiler.index) * len(self._query_compiler.columns)
Expand Down

0 comments on commit 1caf113

Please sign in to comment.