Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BUG: map_chunk wrong results when input is empty data #727

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
25 changes: 12 additions & 13 deletions python/xorbits/_mars/dataframe/base/map_chunk.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,18 @@
self.input = self.inputs[0]

def _infer_attrs_by_call(self, df_or_series):
test_obj = (
build_df(df_or_series, size=2)
if df_or_series.ndim == 2
else build_series(df_or_series, size=2, name=df_or_series.name)
)
if len(df_or_series) == 0:
test_obj = (

Check warning on line 74 in python/xorbits/_mars/dataframe/base/map_chunk.py

View check run for this annotation

Codecov / codecov/patch

python/xorbits/_mars/dataframe/base/map_chunk.py#L74

Added line #L74 was not covered by tests
build_empty_df(df_or_series.dtypes)
if df_or_series.ndim == 2
else (build_empty_series(df_or_series.dtype, name=df_or_series.name))
)
else:
test_obj = (

Check warning on line 80 in python/xorbits/_mars/dataframe/base/map_chunk.py

View check run for this annotation

Codecov / codecov/patch

python/xorbits/_mars/dataframe/base/map_chunk.py#L80

Added line #L80 was not covered by tests
build_df(df_or_series, size=2)
if df_or_series.ndim == 2
else build_series(df_or_series, size=2, name=df_or_series.name)
)
kwargs = self.kwargs or dict()
if self.with_chunk_index:
kwargs["chunk_index"] = (0,) * df_or_series.ndim
Expand Down Expand Up @@ -256,14 +263,6 @@
func = cloudpickle.loads(op.func)
inp = ctx[op.input.key]
out = op.outputs[0]
if len(inp) == 0:
if op.output_types[0] == OutputType.dataframe:
ctx[out.key] = build_empty_df(out.dtypes)
elif op.output_types[0] == OutputType.series:
ctx[out.key] = build_empty_series(out.dtype, name=out.name)
else:
raise ValueError(f"Chunk can not be empty except for dataframe/series.")
return

kwargs = op.kwargs or dict()
if op.with_chunk_index:
Expand Down
45 changes: 45 additions & 0 deletions python/xorbits/_mars/dataframe/base/tests/test_base_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -3232,3 +3232,48 @@ def test_copy_deep(setup, chunk_size):
expected_c["a1"] = expected_c["a1"] + 0.8
pd.testing.assert_frame_equal(xdf_c.execute().fetch(), expected_c)
pd.testing.assert_frame_equal(xdf.execute().fetch(), expected)


def test_map_chunk_with_empty_input(setup):
df = pd.DataFrame(columns=["a", "b", "c"])
series = pd.Series(name="hello")
mdf = from_pandas_df(df)
ms = from_pandas_series(series)

# df to df
def p(d):
if not len(d):
return pd.DataFrame([[None] * d.shape[1]], columns=d.columns)
else:
return d

res = mdf.map_chunk(p)
expected = pd.DataFrame([[None] * df.shape[1]], columns=df.columns)
pd.testing.assert_frame_equal(res.execute().fetch(), expected)

# series to series
def x1(d):
if not len(d):
return pd.Series([1], name=d.name)
else:
return d

res = ms.map_chunk(x1)
expected = pd.Series([1], name=series.name)
pd.testing.assert_series_equal(res.execute().fetch(), expected)

# series to df
def x2(d):
return pd.DataFrame({d.name: [np.nan, 1, 2]})

res = ms.map_chunk(x2)
expected = pd.DataFrame({series.name: [np.nan, 1, 2]})
pd.testing.assert_frame_equal(res.execute().fetch(), expected)

# df to series
def x3(d):
return pd.Series(list(d.columns), name=d.columns[1])

res = mdf.map_chunk(x3)
expected = pd.Series(list(df.columns), name=df.columns[1])
pd.testing.assert_series_equal(res.execute().fetch(), expected)
Loading