From ce105d5ae642eabb4506010a1730f5aba3b306f9 Mon Sep 17 00:00:00 2001
From: YibLiu <68105073+ZJU3190105746@users.noreply.github.com>
Date: Wed, 5 Jul 2023 14:06:44 +0800
Subject: [PATCH] FEAT: Support nlargest and nsmallest op (#530)

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
---
 .../xorbits/_mars/dataframe/base/__init__.py  |   6 +
 .../xorbits/_mars/dataframe/base/nlargest.py  | 325 ++++++++++++++++++
 .../xorbits/_mars/dataframe/base/nsmallest.py | 125 +++++++
 .../base/tests/test_base_execution.py         | 184 ++++++++++
 python/xorbits/_mars/opcodes.py               |   1 +
 .../tests/test_pandas_adapters.py             |   4 -
 6 files changed, 641 insertions(+), 4 deletions(-)
 create mode 100644 python/xorbits/_mars/dataframe/base/nlargest.py
 create mode 100644 python/xorbits/_mars/dataframe/base/nsmallest.py

diff --git a/python/xorbits/_mars/dataframe/base/__init__.py b/python/xorbits/_mars/dataframe/base/__init__.py
index c0c024581..b47f6ef75 100644
--- a/python/xorbits/_mars/dataframe/base/__init__.py
+++ b/python/xorbits/_mars/dataframe/base/__init__.py
@@ -39,6 +39,8 @@
 from .map_chunk import map_chunk
 from .melt import melt
 from .memory_usage import df_memory_usage, index_memory_usage, series_memory_usage
+from .nlargest import dataframe_nlargest, series_nlargest
+from .nsmallest import dataframe_nsmallest, series_nsmallest
 from .pct_change import pct_change
 from .pivot import df_pivot
 from .pivot_table import df_pivot_table
@@ -96,6 +98,8 @@ def _install():
         setattr(t, "query", df_query)
         setattr(t, "pct_change", pct_change)
         setattr(t, "transpose", transpose)
+        setattr(t, "nlargest", dataframe_nlargest)
+        setattr(t, "nsmallest", dataframe_nsmallest)
 
     for t in SERIES_TYPE:
         setattr(t, "to_gpu", to_gpu)
@@ -124,6 +128,8 @@ def _install():
         setattr(t, "is_monotonic_increasing", property(fget=is_monotonic_increasing))
         setattr(t, "is_monotonic_decreasing", property(fget=is_monotonic_decreasing))
         setattr(t, "pct_change", pct_change)
+        setattr(t, "nlargest", series_nlargest)
+        setattr(t, "nsmallest", series_nsmallest)
 
     for t in INDEX_TYPE:
         setattr(t, "map", index_map)

diff --git a/python/xorbits/_mars/dataframe/base/nlargest.py b/python/xorbits/_mars/dataframe/base/nlargest.py
new file mode 100644
index 000000000..f29eb97d6
--- /dev/null
+++ b/python/xorbits/_mars/dataframe/base/nlargest.py
@@ -0,0 +1,325 @@
+import numpy as np
+import pandas as pd
+
+from ... import opcodes
+from ...core import OutputType
+from ...serialization.serializables import Int64Field, ListField, StringField
+from ..core import IndexValue
+from ..merge.concat import DataFrameConcat
+from ..operands import DataFrameOperand, DataFrameOperandMixin
+from ..utils import build_concatenated_rows_frame, parse_index
+
+
+class DataFrameNLargest(DataFrameOperand, DataFrameOperandMixin):
+    _op_type_ = opcodes.NLARGEST
+
+    largest_or_smallest = StringField("largest_or_smallest", default=None)
+    n = Int64Field("n", default=0)
+    columns = ListField("columns", default=None)
+    keep = StringField("keep", default="first")
+
+    def __init__(self, output_types=None, **kw):
+        super().__init__(_output_types=output_types, **kw)
+
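+    # Tiling implements a two-phase top-k reduction: every chunk first takes
+    # its local n largest (or smallest) rows in parallel, the per-chunk
+    # candidates are concatenated, and the same operator runs once more over
+    # the candidates to pick the global top n; conceptually:
+    #
+    #     candidates = pd.concat(c.nlargest(n, columns) for c in chunks)
+    #     result = candidates.nlargest(n, columns)
+    #
+    # With keep="all", ties may make a chunk return more than n rows, so
+    # output shapes are recorded as np.nan until execution.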
== "all": + chunk_params["shape"] = (np.nan,) + else: + chunk_params["shape"] = (op.n,) + c = final_op.new_chunk([chunk_concat], kws=[chunk_params]) + new_op = op.copy() + params = op.outputs[0].params + params["chunks"] = [c] + if op.keep == "all": + params["nsplits"] = ((np.nan,),) + else: + params["nsplits"] = ((op.n,),) + return new_op.new_seriess(op.inputs, kws=[params]) + + @classmethod + def tile(cls, op: "DataFrameNLargest"): + inp = op.inputs[0] + if inp.ndim == 2: + return cls._tile_dataframe(op) + else: + return cls._tile_series(op) + + @classmethod + def execute(cls, ctx, op: "DataFrameNLargest"): + in_data = ctx[op.inputs[0].key] + if in_data.ndim == 2: + if op.largestOrSmallest == "largest": + result = in_data.nlargest(n=op.n, columns=op.columns, keep=op.keep) + elif op.largestOrSmallest == "smallest": + result = in_data.nsmallest(n=op.n, columns=op.columns, keep=op.keep) + else: + if op.largestOrSmallest == "largest": + result = in_data.nlargest(n=op.n, keep=op.keep) + elif op.largestOrSmallest == "smallest": + result = in_data.nsmallest(n=op.n, keep=op.keep) + ctx[op.outputs[0].key] = result + + def __call__(self, a): + if self.n > a.shape[0]: + self.n = a.shape[0] + if a.ndim == 2: + if self.keep == "all": + return self.new_dataframe( + [a], + shape=(np.nan, a.shape[1]), + dtypes=a.dtypes, + columns_value=a.columns_value, + ) + else: + return self.new_dataframe( + [a], + shape=(self.n, a.shape[1]), + dtypes=a.dtypes, + columns_value=a.columns_value, + ) + else: + if isinstance(a.index_value.value, IndexValue.RangeIndex): + index_value = parse_index(pd.Index([], dtype=np.int64)) + else: + index_value = a.index_value + if self.keep == "all": + return self.new_series( + [a], + shape=(np.nan,), + dtype=a.dtype, + index_value=index_value, + name=a.name, + ) + else: + return self.new_series( + [a], + shape=(self.n,), + dtype=a.dtype, + index_value=index_value, + name=a.name, + ) + + +def dataframe_nlargest(df, n, columns, keep="first"): + """ + Return the first n rows ordered by columns in descending order. + + Parameters + ---------- + df : Mars DataFrame + Input dataframe. + n : int + Number of rows to return. + columns : label or list of labels + Column label(s) to order by. + keep{‘first’, ‘last’, ‘all’}, default ‘first’ + Where there are duplicate values: + first : prioritize the first occurrence(s) + last : prioritize the last occurrence(s) + all : do not drop any duplicates, even it means selecting more than n items. + + Returns + ------- + sorted_obj : Mars DataFrame + The first n rows ordered by the given columns in descending order. + + Examples + -------- + >>> import mars.dataframe as md + >>> df = md.DataFrame({ + ... 'col1': ['A', 'A', 'B', 'E', 'D', 'C'], + ... 'col2': [2, 1, 9, 8, 7, 4], + ... 'col3': [0, 1, 9, 4, 2, 3], + ... 
+def dataframe_nlargest(df, n, columns, keep="first"):
+    """
+    Return the first n rows ordered by columns in descending order.
+
+    Parameters
+    ----------
+    df : Mars DataFrame
+        Input dataframe.
+    n : int
+        Number of rows to return.
+    columns : label or list of labels
+        Column label(s) to order by.
+    keep : {'first', 'last', 'all'}, default 'first'
+        Where there are duplicate values:
+
+        first : prioritize the first occurrence(s)
+        last : prioritize the last occurrence(s)
+        all : do not drop any duplicates, even if it means selecting
+            more than n items.
+
+    Returns
+    -------
+    sorted_obj : Mars DataFrame
+        The first n rows ordered by the given columns in descending order.
+
+    Examples
+    --------
+    >>> import mars.dataframe as md
+    >>> df = md.DataFrame({
+    ...     'col1': ['A', 'A', 'B', 'E', 'D', 'C'],
+    ...     'col2': [2, 1, 9, 8, 7, 4],
+    ...     'col3': [0, 1, 9, 4, 2, 3],
+    ... })
+    >>> df.execute()
+      col1  col2  col3
+    0    A     2     0
+    1    A     1     1
+    2    B     9     9
+    3    E     8     4
+    4    D     7     2
+    5    C     4     3
+
+    Choose the first 3 rows ordered by col2
+
+    >>> df.nlargest(3, "col2").execute()
+      col1  col2  col3
+    2    B     9     9
+    3    E     8     4
+    4    D     7     2
+
+    Choose the first 3 rows ordered by multiple columns
+
+    >>> df.nlargest(3, ['col2', 'col3']).execute()
+      col1  col2  col3
+    2    B     9     9
+    3    E     8     4
+    4    D     7     2
+    """
+    if keep not in ["last", "first", "all"]:
+        raise ValueError('keep must be either "first", "last" or "all"')
+    if isinstance(columns, str):
+        columns = [columns]
+    elif not isinstance(columns, list):
+        raise KeyError(columns)
+
+    op = DataFrameNLargest(
+        largest_or_smallest="largest",
+        n=n,
+        columns=columns,
+        keep=keep,
+        output_types=[OutputType.dataframe],
+        gpu=df.op.is_gpu(),
+    )
+    return op(df)
+
+
+def series_nlargest(series, n, keep="first"):
+    """
+    Return the largest n elements.
+
+    Parameters
+    ----------
+    series : Mars Series
+        Input Series.
+    n : int
+        Number of elements to return.
+    keep : {'first', 'last', 'all'}, default 'first'
+        Where there are duplicate values:
+
+        first : prioritize the first occurrence(s)
+        last : prioritize the last occurrence(s)
+        all : do not drop any duplicates, even if it means selecting
+            more than n items.
+
+    Returns
+    -------
+    sorted_obj : Mars Series
+        The n largest values in the Series, sorted in decreasing order.
+
+    Examples
+    --------
+    >>> import mars.dataframe as md
+    >>> s = md.Series([8, 1, 3, 10, 5])
+    >>> s.execute()
+    0     8
+    1     1
+    2     3
+    3    10
+    4     5
+    dtype: int64
+
+    Choose the largest 3 elements
+
+    >>> s.nlargest(3).execute()
+    3    10
+    0     8
+    4     5
+    dtype: int64
+    """
+    if keep not in ["last", "first", "all"]:
+        raise ValueError('keep must be either "first", "last" or "all"')
+    op = DataFrameNLargest(
+        largest_or_smallest="largest",
+        n=n,
+        keep=keep,
+        output_types=[OutputType.series],
+        gpu=series.op.is_gpu(),
+    )
+    return op(series)
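
As a reference for the semantics the operator must reproduce, pandas' nlargest
agrees with a stable descending sort followed by head() whenever the sort keys
are distinct (an illustrative check, not part of the patch):

    import pandas as pd

    df = pd.DataFrame({"a": [5, 1, 4, 2, 3]})
    assert df.nlargest(2, "a").equals(
        df.sort_values("a", ascending=False).head(2)
    )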
diff --git a/python/xorbits/_mars/dataframe/base/nsmallest.py b/python/xorbits/_mars/dataframe/base/nsmallest.py
new file mode 100644
index 000000000..04ac7fa77
--- /dev/null
+++ b/python/xorbits/_mars/dataframe/base/nsmallest.py
@@ -0,0 +1,125 @@
+from ...core import OutputType
+from .nlargest import DataFrameNLargest
+
+
+def dataframe_nsmallest(df, n, columns, keep="first"):
+    """
+    Return the first n rows ordered by columns in ascending order.
+
+    Parameters
+    ----------
+    df : Mars DataFrame
+        Input dataframe.
+    n : int
+        Number of rows to return.
+    columns : label or list of labels
+        Column label(s) to order by.
+    keep : {'first', 'last', 'all'}, default 'first'
+        Where there are duplicate values:
+
+        first : prioritize the first occurrence(s)
+        last : prioritize the last occurrence(s)
+        all : do not drop any duplicates, even if it means selecting
+            more than n items.
+
+    Returns
+    -------
+    sorted_obj : Mars DataFrame
+        The first n rows ordered by the given columns in ascending order.
+
+    Examples
+    --------
+    >>> import mars.dataframe as md
+    >>> df = md.DataFrame({
+    ...     'col1': ['A', 'A', 'B', 'E', 'D', 'C'],
+    ...     'col2': [2, 1, 9, 8, 7, 4],
+    ...     'col3': [0, 1, 9, 4, 2, 3],
+    ... })
+    >>> df.execute()
+      col1  col2  col3
+    0    A     2     0
+    1    A     1     1
+    2    B     9     9
+    3    E     8     4
+    4    D     7     2
+    5    C     4     3
+
+    Choose the first 3 rows ordered by col2
+
+    >>> df.nsmallest(3, "col2").execute()
+      col1  col2  col3
+    1    A     1     1
+    0    A     2     0
+    5    C     4     3
+    """
+    if keep not in ["last", "first", "all"]:
+        raise ValueError('keep must be either "first", "last" or "all"')
+    if isinstance(columns, str):
+        columns = [columns]
+    elif not isinstance(columns, list):
+        raise KeyError(columns)
+
+    op = DataFrameNLargest(
+        largest_or_smallest="smallest",
+        n=n,
+        columns=columns,
+        keep=keep,
+        output_types=[OutputType.dataframe],
+        gpu=df.op.is_gpu(),
+    )
+    return op(df)
+
+
+def series_nsmallest(series, n, keep="first"):
+    """
+    Return the smallest n elements.
+
+    Parameters
+    ----------
+    series : Mars Series
+        Input Series.
+    n : int
+        Number of elements to return.
+    keep : {'first', 'last', 'all'}, default 'first'
+        Where there are duplicate values:
+
+        first : prioritize the first occurrence(s)
+        last : prioritize the last occurrence(s)
+        all : do not drop any duplicates, even if it means selecting
+            more than n items.
+
+    Returns
+    -------
+    sorted_obj : Mars Series
+        The n smallest values in the Series, sorted in ascending order.
+
+    Examples
+    --------
+    >>> import mars.dataframe as md
+    >>> s = md.Series([8, 1, 3, 10, 5])
+    >>> s.execute()
+    0     8
+    1     1
+    2     3
+    3    10
+    4     5
+    dtype: int64
+
+    Choose the smallest 3 elements
+
+    >>> s.nsmallest(3).execute()
+    1    1
+    2    3
+    4    5
+    dtype: int64
+    """
+    if keep not in ["last", "first", "all"]:
+        raise ValueError('keep must be either "first", "last" or "all"')
+    op = DataFrameNLargest(
+        largest_or_smallest="smallest",
+        n=n,
+        keep=keep,
+        output_types=[OutputType.series],
+        gpu=series.op.is_gpu(),
+    )
+    return op(series)
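
A minimal end-to-end sketch of the new API (the local-session call and the
printed output are illustrative):

    import mars
    import mars.dataframe as md

    mars.new_session()  # local session for the sketch
    s = md.Series([8, 1, 3, 10, 5])
    print(s.nsmallest(3).execute())
    # 1    1
    # 2    3
    # 4    5
    # dtype: int64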
diff --git a/python/xorbits/_mars/dataframe/base/tests/test_base_execution.py b/python/xorbits/_mars/dataframe/base/tests/test_base_execution.py
index bcdcd546d..4bdc7f449 100644
--- a/python/xorbits/_mars/dataframe/base/tests/test_base_execution.py
+++ b/python/xorbits/_mars/dataframe/base/tests/test_base_execution.py
@@ -2804,6 +2804,190 @@ def test_bloom_filter(setup):
     )
 
 
+def test_nsmallest_execution(setup):
+    ns = np.random.RandomState(0)
+    df = pd.DataFrame(ns.rand(100, 10), columns=["a" + str(i) for i in range(10)])
+
+    # test one chunk
+    mdf = from_pandas_df(df)
+    result = mdf.nsmallest(3, "a0").execute().fetch()
+    expected = df.nsmallest(3, "a0")
+    pd.testing.assert_frame_equal(result, expected)
+
+    # test multi chunk
+    mdf = from_pandas_df(df, chunk_size=10)
+    result = mdf.nsmallest(3, "a0").execute().fetch()
+    pd.testing.assert_frame_equal(result, expected)
+
+    mdf = from_pandas_df(df, chunk_size=3)
+    result = mdf.nsmallest(3, "a0").execute().fetch()
+    pd.testing.assert_frame_equal(result, expected)
+
+    # test list columns
+    mdf = from_pandas_df(df, chunk_size=3)
+    result = mdf.nsmallest(5, ["a0", "a1"]).execute().fetch()
+    expected = df.nsmallest(5, ["a0", "a1"])
+    pd.testing.assert_frame_equal(result, expected)
+
+    # test n > df.shape[0]
+    mdf = from_pandas_df(df, chunk_size=3)
+    result = mdf.nsmallest(102, ["a0", "a1"]).execute().fetch()
+    expected = df.nsmallest(102, ["a0", "a1"])
+    pd.testing.assert_frame_equal(result, expected)
+
+    # test keep=last
+    mdf = from_pandas_df(df, chunk_size=3)
+    result = mdf.nsmallest(5, ["a0", "a1"], "last").execute().fetch()
+    expected = df.nsmallest(5, ["a0", "a1"], "last")
+    pd.testing.assert_frame_equal(result, expected)
+
+    # test keep=all
+    df.iloc[77, 0] = df.iloc[80, 0]
+    df.iloc[77, 1] = df.iloc[80, 1]
+    mdf = from_pandas_df(df, chunk_size=3)
+    result = mdf.nsmallest(5, ["a0", "a1"], "all").execute().fetch()
+    expected = df.nsmallest(5, ["a0", "a1"], "all")
+    pd.testing.assert_frame_equal(result, expected)
+
+    # test invalid column
+    with pytest.raises(KeyError):
+        mdf.nsmallest(5, "a100", "all").execute().fetch()
+
+    # test invalid keep
+    with pytest.raises(ValueError):
+        mdf.nsmallest(5, "a1", "abcd").execute().fetch()
+
+    # test series
+    raw = pd.Series(ns.rand(100))
+
+    # test one chunk
+    series = from_pandas_series(raw)
+    result = series.nsmallest(3).execute().fetch()
+    expected = raw.nsmallest(3)
+    pd.testing.assert_series_equal(result, expected)
+
+    # test multi chunk
+    series = from_pandas_series(raw, chunk_size=10)
+    result = series.nsmallest(3).execute().fetch()
+    expected = raw.nsmallest(3)
+    pd.testing.assert_series_equal(result, expected)
+
+    # test keep=last
+    series = from_pandas_series(raw, chunk_size=10)
+    result = series.nsmallest(3, "last").execute().fetch()
+    expected = raw.nsmallest(3, "last")
+    pd.testing.assert_series_equal(result, expected)
+
+    # test keep=all
+    raw[99] = raw[1]
+    series = from_pandas_series(raw, chunk_size=10)
+    result = series.nsmallest(3, "all").execute().fetch()
+    expected = raw.nsmallest(3, "all")
+    pd.testing.assert_series_equal(result, expected)
+
+    # test n > series.shape[0]
+    series = from_pandas_series(raw, chunk_size=10)
+    result = series.nsmallest(110, "first").execute().fetch()
+    expected = raw.nsmallest(110, "first")
+    pd.testing.assert_series_equal(result, expected)
+
+    # test invalid keep
+    with pytest.raises(ValueError):
+        series.nsmallest(5, "abcd").execute().fetch()
+
+
"abcd").execute().fetch() + + # test series + raw = pd.Series(ns.rand(100)) + + # test one chunk + series = from_pandas_series(raw) + result = series.nlargest(3).execute().fetch() + expected = raw.nlargest(3) + pd.testing.assert_series_equal(result, expected) + + # test multi chunk + series = from_pandas_series(raw, chunk_size=10) + result = series.nlargest(3).execute().fetch() + expected = raw.nlargest(3) + pd.testing.assert_series_equal(result, expected) + + # test keep=last + series = from_pandas_series(raw, chunk_size=10) + result = series.nlargest(3, "last").execute().fetch() + expected = raw.nlargest(3, "last") + pd.testing.assert_series_equal(result, expected) + + # test keep=all + raw[99] = raw[49] + series = from_pandas_series(raw, chunk_size=10) + result = series.nlargest(3, "all").execute().fetch() + expected = raw.nlargest(3, "all") + pd.testing.assert_series_equal(result, expected) + + # test k>df.shape[0] + series = from_pandas_series(raw, chunk_size=10) + result = series.nlargest(110, "last").execute().fetch() + expected = raw.nlargest(110, "last") + pd.testing.assert_series_equal(result, expected) + + # test invalid keep + with pytest.raises(ValueError): + series.nlargest(5, "abcd").execute().fetch() + + def test_index_str_method(setup): def generate_random_string(length): characters = ( diff --git a/python/xorbits/_mars/opcodes.py b/python/xorbits/_mars/opcodes.py index dc44f1f7f..38964628d 100644 --- a/python/xorbits/_mars/opcodes.py +++ b/python/xorbits/_mars/opcodes.py @@ -390,6 +390,7 @@ APPLYMAP = 742 PIVOT = 743 PIVOT_TABLE = 744 +NLARGEST = 745 FUSE = 801 diff --git a/python/xorbits/pandas/pandas_adapters/tests/test_pandas_adapters.py b/python/xorbits/pandas/pandas_adapters/tests/test_pandas_adapters.py index e451e5206..561fc7cf5 100644 --- a/python/xorbits/pandas/pandas_adapters/tests/test_pandas_adapters.py +++ b/python/xorbits/pandas/pandas_adapters/tests/test_pandas_adapters.py @@ -62,8 +62,6 @@ def test_pandas_dataframe_methods(setup): mad median mode - nlargest - nsmallest pipe pivot_table rank @@ -205,8 +203,6 @@ def test_pandas_series_methods(setup): last_valid_index mad mode - nlargest - nsmallest pipe pop rank