Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ENH: len for groupby #533

Open
wants to merge 34 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
35a6a8a
fix some minor issues on formatting
RayJi01 Jun 9, 2023
d6b836a
Merge branch 'xprobe-inc:main' into main
RayJi01 Jun 15, 2023
5376759
Merge branch 'xprobe-inc:main' into main
RayJi01 Jun 16, 2023
e7c6224
first try on implement Len Operands
RayJi01 Jun 16, 2023
a7819c5
first try on implement Len Operands2
RayJi01 Jun 16, 2023
9d647f1
Merge branch 'main' into feature/len_for_groupby
mergify[bot] Jun 16, 2023
98f0a08
issues of reducer_index
RayJi01 Jun 21, 2023
f21a683
issues of reducer_index
RayJi01 Jun 21, 2023
6235355
Merge branch 'xprobe-inc:main' into main
RayJi01 Jun 21, 2023
ba93ce7
Fix tile
UranusSeven Jun 26, 2023
5dbbbb0
len method implemented with random test passed
RayJi01 Jun 28, 2023
4f9106f
try to solve chunk_size issues
RayJi01 Jun 29, 2023
59fc701
multiple chunks(chunk_size) implemented with UT and IT passed
RayJi01 Jun 29, 2023
6be18c2
Merge branch 'xprobe-inc:main' into main
RayJi01 Jun 29, 2023
c66a5b9
Merge branch 'main' into feature/len_for_groupby
mergify[bot] Jun 29, 2023
75edbdf
Merge branch 'main' into feature/len_for_groupby
mergify[bot] Jun 30, 2023
5be25cf
Merge branch 'xprobe-inc:main' into main
RayJi01 Jun 30, 2023
a73eb21
Merge branch 'main' into feature/len_for_groupby
mergify[bot] Jun 30, 2023
150b30c
Merge branch 'main' into feature/len_for_groupby
mergify[bot] Jul 1, 2023
2afbd92
Merge branch 'xprobe-inc:main' into main
RayJi01 Jul 3, 2023
d1b77e3
Merge branch 'xprobe-inc:main' into main
RayJi01 Jul 3, 2023
b899fe0
first try on implement Len Operands
RayJi01 Jun 16, 2023
a39c029
first try on implement Len Operands2
RayJi01 Jun 16, 2023
9710c4c
issues of reducer_index
RayJi01 Jun 21, 2023
472d03c
issues of reducer_index
RayJi01 Jun 21, 2023
b0c3f94
Fix tile
UranusSeven Jun 26, 2023
c356e0f
len method implemented with random test passed
RayJi01 Jun 28, 2023
3285a68
try to solve chunk_size issues
RayJi01 Jun 29, 2023
b2a647c
multiple chunks(chunk_size) implemented with UT and IT passed
RayJi01 Jun 29, 2023
04939bf
Merge branch 'main' into feature/len_for_groupby
mergify[bot] Jul 3, 2023
7178676
Merge remote-tracking branch 'origin/feature/len_for_groupby' into fe…
RayJi01 Jul 3, 2023
33da36e
Merge branch 'main' into feature/len_for_groupby
mergify[bot] Jul 4, 2023
1fd2dfd
Merge branch 'main' into feature/len_for_groupby
mergify[bot] Jul 4, 2023
0cf151b
Merge branch 'main' into feature/len_for_groupby
mergify[bot] Jul 5, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion python/xorbits/_mars/dataframe/groupby/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# noinspection PyUnresolvedReferences
from ..core import DataFrameGroupBy, GroupBy, SeriesGroupBy
from .len import groupby_len


def _install():
Expand Down Expand Up @@ -63,6 +63,7 @@ def _install():
setattr(cls, "sem", lambda groupby, **kw: agg(groupby, "sem", **kw))
setattr(cls, "nunique", lambda groupby, **kw: agg(groupby, "nunique", **kw))

setattr(cls, "__len__", groupby_len)
setattr(cls, "apply", groupby_apply)
setattr(cls, "transform", groupby_transform)

Expand Down
86 changes: 86 additions & 0 deletions python/xorbits/_mars/dataframe/groupby/len.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import pandas as pd

from ... import opcodes
from ...core import OutputType
from ...core.operand import Operand, OperandStage
from ..operands import DataFrameOperandMixin


class GroupByLen(DataFrameOperandMixin, Operand):
_op_type_ = opcodes.GROUPBY_LEN

def __call__(self, groupby):
return self.new_scalar([groupby])

@classmethod
def tile(cls, op: "GroupByLen"):
in_groupby = op.inputs[0]

# generate map chunks
map_chunks = []
for chunk in in_groupby.chunks:
map_op = op.copy().reset_key()
map_op.stage = OperandStage.map
map_op.output_types = [OutputType.series]
chunk_inputs = [chunk]

map_chunks.append(map_op.new_chunk(chunk_inputs))

# generate reduce chunks, we only need one reducer here.
out_chunks = []
reduce_op = op.copy().reset_key()
reduce_op.output_types = [OutputType.scalar]
reduce_op.stage = OperandStage.reduce

out_chunks.append(
reduce_op.new_chunk(map_chunks, shape=(), index=(0,), dtype=int)
)

# final wrap up:
new_op = op.copy()
params = op.outputs[0].params.copy()

params.pop("shape")

params["chunks"] = out_chunks
return new_op.new_scalars(op.inputs, **params)

@classmethod
def execute_map(cls, ctx, op: "GroupByLen"):
chunk = op.outputs[0]
in_df_grouped = ctx[op.inputs[0].key]

# grouped object .size() method ensure every unique keys
summary = in_df_grouped.size()
sum_indexes = summary.index

res = []
for index in sum_indexes:
res.append(index)

# use series to convey every index store in this level
ctx[chunk.key] = pd.Series(res)

@classmethod
def execute_reduce(cls, ctx, op: "GroupByLen"):
chunk = op.outputs[0]
res = set()
for input in op.inputs:
key = input.key
input_series = ctx[key]
res.update(input_series)

res_len = len(res)
ctx[chunk.key] = res_len

@classmethod
def execute(cls, ctx, op: "GroupByLen"):
if op.stage == OperandStage.map:
cls.execute_map(ctx, op)
elif op.stage == OperandStage.reduce:
cls.execute_reduce(ctx, op)


def groupby_len(groupby):
op = GroupByLen()
return op(groupby)
38 changes: 38 additions & 0 deletions python/xorbits/_mars/dataframe/groupby/tests/test_groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -525,3 +525,41 @@ def test_groupby_fill():
assert len(r.chunks) == 4
assert r.shape == (len(s1),)
assert r.chunks[0].shape == (np.nan,)


def test_groupby_len_behavior(setup):
df = pd.DataFrame(
[
[2, 11, 10],
[3, 1, 89],
[6, 1, 51],
[6, 2, 10],
[6, 2, 20],
[3, 2, 35],
[7, 3, 102],
[2, 3, 88],
],
columns=["one", "two", "three"],
)
mdf = md.DataFrame(df, chunk_size=3)

r = tile(mdf.groupby(["two"]).__len__())
assert r.op.output_types[0] == OutputType.scalar
assert r.shape == ()
assert len(r.chunks) == 1
assert r.chunks[0].shape == ()

r = tile(mdf.groupby(["one", "two"]).__len__())
assert r.op.output_types[0] == OutputType.scalar
assert r.shape == ()
assert len(r.chunks) == 1
assert r.chunks[0].shape == ()

s1 = pd.Series([4, 3, 9, np.nan, np.nan, 7, 10, 8, 1, 6])
ms1 = md.Series(s1, chunk_size=3)

r = tile(ms1.groupby(lambda x: x % 2).__len__())
assert r.op.output_types[0] == OutputType.scalar
assert r.shape == ()
assert len(r.chunks) == 1
assert r.chunks[0].shape == ()
Original file line number Diff line number Diff line change
Expand Up @@ -1886,3 +1886,39 @@ def test_series_groupby_rolling_agg(setup, window, min_periods, center, closed,
mresult = mresult.execute().fetch()

pd.testing.assert_series_equal(presult, mresult.sort_index())


def test_groupby_len(setup):
np.random.seed(42)
num_dataframes = 10
for i in range(num_dataframes):
# dataframe
data = {
"Category": np.random.choice(["A", "B", "C"], size=100),
"Value": np.random.randint(1, 100, size=100),
}

# DataFrame test
df_test = pd.DataFrame(data)
df_splitted = md.DataFrame(df_test, chunk_size=35)

grouped_test = df_test.groupby(
"Category"
) # this is the original pandas version.
grouped_splitted = df_splitted.groupby("Category")
grouped_test2 = df_test.groupby("Value")
grouped_splitted2 = df_splitted.groupby("Value")

assert grouped_splitted.__len__().execute().fetch() == len(grouped_test)
assert grouped_splitted2.__len__().execute().fetch() == len(grouped_test2)

# Series Groupby test:
data2 = np.random.choice(["A", "B", "C"], size=100)

series = md.Series(data2, chunk_size=35)
series_test = pd.Series(data2)

grouped_s = series.groupby(series)
grouped_s_test = series_test.groupby(series_test)

assert grouped_s.__len__().execute().fetch() == len(grouped_s_test)
2 changes: 2 additions & 0 deletions python/xorbits/_mars/opcodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,7 @@
APPLYMAP = 742
PIVOT = 743
PIVOT_TABLE = 744
LEN = 745

FUSE = 801

Expand Down Expand Up @@ -434,6 +435,7 @@
GROUPBY_SORT_REGULAR_SAMPLE = 2037
GROUPBY_SORT_PIVOT = 2038
GROUPBY_SORT_SHUFFLE = 2039
GROUPBY_LEN = 2064

# parallel sorting by regular sampling
PSRS_SORT_REGULAR_SMAPLE = 2040
Expand Down