Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

REFACTOR-#5424: Replace dtypes="copy" with copy_dtypes flag #5426

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 18 additions & 3 deletions modin/core/dataframe/algebra/binary.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,13 @@ def register(cls, func, join_type="outer", labels="replace"):
"""

def caller(
query_compiler, other, broadcast=False, *args, dtypes=None, **kwargs
query_compiler,
other,
broadcast=False,
*args,
dtypes=None,
copy_dtypes=False,
**kwargs
):
"""
Apply binary `func` to passed operands.
Expand All @@ -61,8 +67,14 @@ def caller(
at the query compiler level, so this parameter is a hint that is passed from a high-level API.
*args : args,
Arguments that will be passed to `func`.
dtypes : "copy" or None, default: None
Whether to keep old dtypes or infer new dtypes from data.
dtypes : pandas.Series or scalar type, optional
The data types for the result. This is an optimization
because there are functions that always result in a particular data
type, and this allows us to avoid (re)computing it.
If the argument is a scalar type, then that type is assigned to each result column.
copy_dtypes : bool, default: False
If True, the dtypes of the resulting dataframe are copied from the original,
and the ``dtypes`` argument is ignored.
**kwargs : kwargs,
Arguments that will be passed to `func`.

Expand Down Expand Up @@ -93,6 +105,7 @@ def caller(
join_type=join_type,
labels=labels,
dtypes=dtypes,
copy_dtypes=copy_dtypes,
)
)
else:
Expand All @@ -113,11 +126,13 @@ def caller(
new_index=query_compiler.index,
new_columns=query_compiler.columns,
dtypes=dtypes,
copy_dtypes=copy_dtypes,
)
else:
new_modin_frame = query_compiler._modin_frame.map(
lambda df: func(df, other, *args, **kwargs),
dtypes=dtypes,
copy_dtypes=copy_dtypes,
)
return query_compiler.__constructor__(new_modin_frame)

Expand Down
20 changes: 14 additions & 6 deletions modin/core/dataframe/base/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@

from abc import ABC, abstractmethod
from typing import List, Hashable, Optional, Callable, Union, Dict

import pandas

from modin.core.dataframe.base.dataframe.utils import Axis, JoinType


Expand Down Expand Up @@ -92,7 +95,8 @@ def map(
self,
function: Callable,
axis: Optional[Union[int, Axis]] = None,
dtypes: Optional[str] = None,
dtypes: Optional[Union[pandas.Series, type]] = None,
copy_dtypes: bool = False,
) -> "ModinDataframe":
"""
Apply a user-defined function row-wise if `axis`=0, column-wise if `axis`=1, and cell-wise if `axis` is None.
Expand All @@ -103,10 +107,14 @@ def map(
The function to map across the dataframe.
axis : int or modin.core.dataframe.base.utils.Axis, optional
The axis to map over.
dtypes : str, optional
dtypes : pandas.Series or scalar type, optional
The data types for the result. This is an optimization
because there are functions that always result in a particular data
type, and this allows us to avoid (re)computing it.
noloerino marked this conversation as resolved.
Show resolved Hide resolved
If the argument is a scalar type, then that type is assigned to each result column.
copy_dtypes : bool, default: False
If True, the dtypes of the resulting dataframe are copied from the original,
and the ``dtypes`` argument is ignored.

Returns
-------
Expand Down Expand Up @@ -258,7 +266,7 @@ def reduce(
self,
axis: Union[int, Axis],
function: Callable,
dtypes: Optional[str] = None,
dtypes: Optional[pandas.Series] = None,
) -> "ModinDataframe":
"""
Perform a user-defined aggregation on the specified axis, where the axis reduces down to a singleton.
Expand All @@ -269,7 +277,7 @@ def reduce(
The axis to perform the reduce over.
function : callable(row|col) -> single value
The reduce function to apply to each column.
dtypes : str, optional
dtypes : pandas.Series, optional
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is scalar type not accepted here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Scalar types are not accepted here. The control flow here is reduce -> _compute_tree_reduce_metadata -> dataframe constructor, which directly sets self.dtypes on the new dataframe. In contrast, map is written to duplicate scalar datatypes into a series here.

The data types for the result. This is an optimization
because there are functions that always result in a particular data
type, and this allows us to avoid (re)computing it.
Expand All @@ -291,7 +299,7 @@ def tree_reduce(
axis: Union[int, Axis],
map_func: Callable,
reduce_func: Optional[Callable] = None,
dtypes: Optional[str] = None,
dtypes: Optional[pandas.Series] = None,
) -> "ModinDataframe":
"""
Perform a user-defined aggregation on the specified axis, where the axis reduces down to a singleton using a tree-reduce computation pattern.
Expand All @@ -308,7 +316,7 @@ def tree_reduce(
The map function to apply to each column.
reduce_func : callable(row|col) -> single value, optional
The reduce function to apply to the results of the map function.
dtypes : str, optional
dtypes : pandas.Series, optional
noloerino marked this conversation as resolved.
Show resolved Hide resolved
The data types for the result. This is an optimization
because there are functions that always result in a particular data
type, and this allows us to avoid (re)computing it.
Expand Down
66 changes: 46 additions & 20 deletions modin/core/dataframe/pandas/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -1599,7 +1599,7 @@ def _tree_reduce_func(df, *args, **kwargs):

return _tree_reduce_func

def _compute_tree_reduce_metadata(self, axis, new_parts):
def _compute_tree_reduce_metadata(self, axis, new_parts, dtypes=None):
"""
Compute the metadata for the result of reduce function.

Expand All @@ -1609,6 +1609,8 @@ def _compute_tree_reduce_metadata(self, axis, new_parts):
The axis on which reduce function was applied.
new_parts : NumPy 2D array
Partitions with the result of applied function.
dtypes : pandas.Series, optional
noloerino marked this conversation as resolved.
Show resolved Hide resolved
The data types of the result.

Returns
-------
Expand All @@ -1623,12 +1625,11 @@ def _compute_tree_reduce_metadata(self, axis, new_parts):
new_axes_lengths[axis] = [1]
new_axes_lengths[axis ^ 1] = self._axes_lengths[axis ^ 1]

new_dtypes = None
result = self.__constructor__(
new_parts,
*new_axes,
*new_axes_lengths,
new_dtypes,
dtypes,
)
return result

Expand All @@ -1637,7 +1638,7 @@ def reduce(
self,
axis: Union[int, Axis],
function: Callable,
dtypes: Optional[str] = None,
dtypes: Optional = None,
) -> "PandasDataframe":
"""
Perform a user-defined aggregation on the specified axis, where the axis reduces down to a singleton. Requires knowledge of the full axis for the reduction.
Expand All @@ -1648,7 +1649,7 @@ def reduce(
The axis to perform the reduce over.
function : callable(row|col) -> single value
The reduce function to apply to each column.
dtypes : str, optional
dtypes : pandas.Series, optional
noloerino marked this conversation as resolved.
Show resolved Hide resolved
The data types for the result. This is an optimization
because there are functions that always result in a particular data
type, and this allows us to avoid (re)computing it.
Expand All @@ -1667,15 +1668,15 @@ def reduce(
new_parts = self._partition_mgr_cls.map_axis_partitions(
axis.value, self._partitions, function
)
return self._compute_tree_reduce_metadata(axis.value, new_parts)
return self._compute_tree_reduce_metadata(axis.value, new_parts, dtypes)

@lazy_metadata_decorator(apply_axis="opposite", axis_arg=0)
def tree_reduce(
self,
axis: Union[int, Axis],
map_func: Callable,
reduce_func: Optional[Callable] = None,
dtypes: Optional[str] = None,
dtypes: Optional = None,
) -> "PandasDataframe":
"""
Apply function that will reduce the data to a pandas Series.
Expand All @@ -1689,7 +1690,7 @@ def tree_reduce(
reduce_func : callable(row|col) -> single value, optional
Callable function to reduce the dataframe.
If none, then apply map_func twice.
dtypes : str, optional
dtypes : pandas.Series, optional
noloerino marked this conversation as resolved.
Show resolved Hide resolved
The data types for the result. This is an optimization
because there are functions that always result in a particular data
type, and this allows us to avoid (re)computing it.
Expand All @@ -1710,29 +1711,35 @@ def tree_reduce(
reduce_parts = self._partition_mgr_cls.map_axis_partitions(
axis.value, map_parts, reduce_func
)
return self._compute_tree_reduce_metadata(axis.value, reduce_parts)
return self._compute_tree_reduce_metadata(axis.value, reduce_parts, dtypes)

@lazy_metadata_decorator(apply_axis=None)
def map(self, func: Callable, dtypes: Optional[str] = None) -> "PandasDataframe":
def map(
self, func: Callable, dtypes: Optional = None, copy_dtypes: bool = False
) -> "PandasDataframe":
"""
Perform a function that maps across the entire dataset.

Parameters
----------
func : callable(row|col|cell) -> row|col|cell
The function to apply.
dtypes : dtypes of the result, optional
dtypes : pandas.Series or scalar type, optional
The data types for the result. This is an optimization
because there are functions that always result in a particular data
type, and this allows us to avoid (re)computing it.
If the argument is a scalar type, then that type is assigned to each result column.
copy_dtypes : bool, default: False
If True, the dtypes of the resulting dataframe are copied from the original,
and the ``dtypes`` argument is ignored.

Returns
-------
PandasDataframe
A new dataframe.
"""
new_partitions = self._partition_mgr_cls.map_partitions(self._partitions, func)
if dtypes == "copy":
if copy_dtypes:
dtypes = self._dtypes
elif dtypes is not None:
dtypes = pandas.Series(
Expand Down Expand Up @@ -2186,6 +2193,7 @@ def apply_full_axis(
new_index=None,
new_columns=None,
dtypes=None,
copy_dtypes=False,
keep_partitioning=True,
):
"""
Expand All @@ -2203,10 +2211,13 @@ def apply_full_axis(
new_columns : list-like, optional
The columns of the result. We may know this in
advance, and if not provided it must be computed.
dtypes : list-like, optional
dtypes : pandas.Series, optional
The data types of the result. This is an optimization
because there are functions that always result in a particular data
type, and allows us to avoid (re)computing it.
copy_dtypes : bool, default: False
If True, the dtypes of the resulting dataframe are copied from the original,
and the ``dtypes`` argument is ignored.
keep_partitioning : boolean, default: True
The flag to keep partition boundaries for Modin Frame.
Setting it to True disables shuffling data from one partition to another.
Expand All @@ -2226,6 +2237,7 @@ def apply_full_axis(
new_index=new_index,
new_columns=new_columns,
dtypes=dtypes,
copy_dtypes=copy_dtypes,
other=None,
keep_partitioning=keep_partitioning,
)
Expand Down Expand Up @@ -2401,7 +2413,14 @@ def apply_select_indices(

@lazy_metadata_decorator(apply_axis="both")
def broadcast_apply(
self, axis, func, other, join_type="left", labels="keep", dtypes=None
self,
axis,
func,
other,
join_type="left",
labels="keep",
dtypes=None,
copy_dtypes=False,
):
"""
Broadcast axis partitions of `other` to partitions of `self` and apply a function.
Expand All @@ -2419,8 +2438,12 @@ def broadcast_apply(
labels : {"keep", "replace", "drop"}, default: "keep"
Whether to keep labels from `self` Modin DataFrame, replace them with labels
from the joined DataFrame, or drop them altogether so they are computed lazily later.
dtypes : "copy" or None, default: None
Whether keep old dtypes or infer new dtypes from data.
dtypes : pandas.Series, optional
The data types of the result. This is an optimization
because there are functions that always result in a particular data
type, and allows us to avoid (re)computing it.
copy_dtypes : bool, default: False
If True, the dtypes of the resulting dataframe are copied from the original.

Returns
-------
Expand All @@ -2441,8 +2464,7 @@ def broadcast_apply(
new_frame = self._partition_mgr_cls.broadcast_apply(
axis, func, left_parts, right_parts
)
if dtypes == "copy":
dtypes = self._dtypes
dtypes = self._dtypes if copy_dtypes else None

def _pick_axis(get_axis, sizes_cache):
if labels == "keep":
Expand Down Expand Up @@ -2618,6 +2640,7 @@ def broadcast_apply_full_axis(
apply_indices=None,
enumerate_partitions=False,
dtypes=None,
copy_dtypes=False,
keep_partitioning=True,
):
"""
Expand All @@ -2642,10 +2665,13 @@ def broadcast_apply_full_axis(
enumerate_partitions : bool, default: False
Whether pass partition index into applied `func` or not.
Note that `func` must be able to obtain `partition_idx` kwarg.
dtypes : list-like, default: None
dtypes : pandas.Series, optional
Data types of the result. This is an optimization
because there are functions that always result in a particular data
type, and allows us to avoid (re)computing it.
copy_dtypes : bool, default: False
If True, the dtypes of the resulting dataframe are copied from the original,
and the ``dtypes`` argument is ignored.
keep_partitioning : boolean, default: True
The flag to keep partition boundaries for Modin Frame.
Setting it to True disables shuffling data from one partition to another.
Expand Down Expand Up @@ -2677,7 +2703,7 @@ def broadcast_apply_full_axis(
)
# Index objects for new object creation. This is shorter than if..else
kw = self.__make_init_labels_args(new_partitions, new_index, new_columns)
if dtypes == "copy":
if copy_dtypes:
kw["dtypes"] = self._dtypes
elif dtypes is not None:
kw["dtypes"] = pandas.Series(
Expand Down
Loading