
[Datasets] Split test_dataset.py completely into test_dataset_{consumption,map,all_to_all}.py #33101

Merged: 6 commits, Mar 7, 2023
4,621 changes: 0 additions & 4,621 deletions python/ray/data/tests/test_dataset.py

This file was deleted.

1,703 changes: 1,703 additions & 0 deletions python/ray/data/tests/test_dataset_all_to_all.py

Large diffs are not rendered by default.

1,679 changes: 1,679 additions & 0 deletions python/ray/data/tests/test_dataset_consumption.py

Large diffs are not rendered by default.

145 changes: 145 additions & 0 deletions python/ray/data/tests/test_dataset_ecosystem.py
@@ -0,0 +1,145 @@
import numpy as np
import pandas as pd
import pyarrow as pa
import pytest

import ray
from ray.data.extensions.tensor_extension import (
    ArrowTensorArray,
    ArrowTensorType,
    TensorArray,
    TensorDtype,
)
from ray.data.tests.conftest import * # noqa
from ray.tests.conftest import * # noqa


def test_from_dask(ray_start_regular_shared):
    import dask.dataframe as dd

    df = pd.DataFrame({"one": list(range(100)), "two": list(range(100))})
    ddf = dd.from_pandas(df, npartitions=10)
    ds = ray.data.from_dask(ddf)
    dfds = ds.to_pandas()
    assert df.equals(dfds)


@pytest.mark.parametrize("ds_format", ["pandas", "arrow"])
def test_to_dask(ray_start_regular_shared, ds_format):
    from ray.util.dask import ray_dask_get

    df1 = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]})
    df2 = pd.DataFrame({"one": [4, 5, 6], "two": ["e", "f", "g"]})
    df = pd.concat([df1, df2])
    ds = ray.data.from_pandas([df1, df2])
    if ds_format == "arrow":
        ds = ds.map_batches(lambda df: df, batch_format="pyarrow", batch_size=None)
    ddf = ds.to_dask()
    meta = ddf._meta
    # Check metadata.
    assert isinstance(meta, pd.DataFrame)
    assert meta.empty
    assert list(meta.columns) == ["one", "two"]
    assert list(meta.dtypes) == [np.int64, object]
    # Explicit Dask-on-Ray.
    assert df.equals(ddf.compute(scheduler=ray_dask_get))
    # Implicit Dask-on-Ray.
    assert df.equals(ddf.compute())

    # Explicit metadata.
    df1["two"] = df1["two"].astype(pd.StringDtype())
    df2["two"] = df2["two"].astype(pd.StringDtype())
    df = pd.concat([df1, df2])
    ds = ray.data.from_pandas([df1, df2])
    if ds_format == "arrow":
        ds = ds.map_batches(lambda df: df, batch_format="pyarrow", batch_size=None)
    ddf = ds.to_dask(
        meta=pd.DataFrame(
            {"one": pd.Series(dtype=np.int16), "two": pd.Series(dtype=pd.StringDtype())}
        ),
    )
    meta = ddf._meta
    # Check metadata.
    assert isinstance(meta, pd.DataFrame)
    assert meta.empty
    assert list(meta.columns) == ["one", "two"]
    assert list(meta.dtypes) == [np.int16, pd.StringDtype()]
    # Explicit Dask-on-Ray.
    assert df.equals(ddf.compute(scheduler=ray_dask_get))
    # Implicit Dask-on-Ray.
    assert df.equals(ddf.compute())
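
# Note on the two halves above: by default ds.to_dask() infers the Dask
# metadata from the Dataset's schema (int64/object here), while an explicit
# meta=... argument overrides the inferred dtypes, as the second half checks.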


def test_to_dask_tensor_column_cast_pandas(ray_start_regular_shared):
    # Check that tensor column casting occurs when converting a Dataset to a Dask
    # DataFrame.
    data = np.arange(12).reshape((3, 2, 2))
    ctx = ray.data.context.DatasetContext.get_current()
    original = ctx.enable_tensor_extension_casting
    try:
        ctx.enable_tensor_extension_casting = True
        in_df = pd.DataFrame({"a": TensorArray(data)})
        ds = ray.data.from_pandas(in_df)
        dtypes = ds.schema().types
        assert len(dtypes) == 1
        assert isinstance(dtypes[0], TensorDtype)
        out_df = ds.to_dask().compute()
        assert out_df["a"].dtype.type is np.object_
        expected_df = pd.DataFrame({"a": list(data)})
        pd.testing.assert_frame_equal(out_df, expected_df)
    finally:
        ctx.enable_tensor_extension_casting = original


def test_to_dask_tensor_column_cast_arrow(ray_start_regular_shared):
    # Check that tensor column casting occurs when converting a Dataset to a Dask
    # DataFrame.
    data = np.arange(12).reshape((3, 2, 2))
    ctx = ray.data.context.DatasetContext.get_current()
    original = ctx.enable_tensor_extension_casting
    try:
        ctx.enable_tensor_extension_casting = True
        in_table = pa.table({"a": ArrowTensorArray.from_numpy(data)})
        ds = ray.data.from_arrow(in_table)
        dtype = ds.schema().field(0).type
        assert isinstance(dtype, ArrowTensorType)
        out_df = ds.to_dask().compute()
        assert out_df["a"].dtype.type is np.object_
        expected_df = pd.DataFrame({"a": list(data)})
        pd.testing.assert_frame_equal(out_df, expected_df)
    finally:
        ctx.enable_tensor_extension_casting = original


def test_from_modin(ray_start_regular_shared):
    import modin.pandas as mopd

    df = pd.DataFrame(
        {"one": list(range(100)), "two": list(range(100))},
    )
    modf = mopd.DataFrame(df)
    ds = ray.data.from_modin(modf)
    dfds = ds.to_pandas()
    assert df.equals(dfds)


def test_to_modin(ray_start_regular_shared):
    # Create two Modin dataframes: one directly from a pandas dataframe, and
    # another via to_modin() on a Ray Dataset built from the same pandas
    # dataframe. The two should be equal.
    import modin.pandas as mopd

    df = pd.DataFrame(
        {"one": list(range(100)), "two": list(range(100))},
    )
    modf1 = mopd.DataFrame(df)
    ds = ray.data.from_pandas([df])
    modf2 = ds.to_modin()
    assert modf1.equals(modf2)


if __name__ == "__main__":
import sys

sys.exit(pytest.main(["-v", __file__]))