Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Data] Refactor some file-related unit tests #48228

Merged
merged 2 commits into from
Oct 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 39 additions & 46 deletions python/ray/data/tests/test_binary.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from ray.data.tests.conftest import * # noqa
from ray.data.tests.mock_http_server import * # noqa
from ray.data.tests.test_partitioning import PathPartitionEncoder
from ray.data.tests.util import Counter, extract_values, gen_bin_files
from ray.data.tests.util import extract_values, gen_bin_files
from ray.tests.conftest import * # noqa


Expand All @@ -42,7 +42,7 @@ def test_read_binary_files_partitioning(ray_start_regular_shared, tmp_path):

def test_read_binary_files(ray_start_regular_shared):
with gen_bin_files(10) as (_, paths):
ds = ray.data.read_binary_files(paths, override_num_blocks=10)
ds = ray.data.read_binary_files(paths)
for i, item in enumerate(ds.iter_rows()):
expected = open(paths[i], "rb").read()
assert expected == item["bytes"]
Expand All @@ -67,14 +67,14 @@ def test_read_binary_files_ignore_missing_paths(
with pytest.raises(FileNotFoundError):
ds = ray.data.read_binary_files(
paths, ignore_missing_paths=ignore_missing_paths
)
).materialize()


def test_read_binary_files_with_fs(ray_start_regular_shared):
with gen_bin_files(10) as (tempdir, paths):
# All the paths are absolute, so we want the root file system.
fs, _ = pa.fs.FileSystem.from_uri("/")
ds = ray.data.read_binary_files(paths, filesystem=fs, override_num_blocks=10)
ds = ray.data.read_binary_files(paths, filesystem=fs)
for i, item in enumerate(ds.iter_rows()):
expected = open(paths[i], "rb").read()
assert expected == item["bytes"]
Expand Down Expand Up @@ -142,7 +142,9 @@ def test_read_binary_meta_provider(
)


@pytest.mark.parametrize("style", [PartitionStyle.HIVE, PartitionStyle.DIRECTORY])
def test_read_binary_snappy_partitioned_with_filter(
style,
ray_start_regular_shared,
tmp_path,
write_base_partitioned_df,
Expand All @@ -156,50 +158,41 @@ def df_to_binary(dataframe, path, **kwargs):
snappy.stream_compress(bytes, f)

partition_keys = ["one"]
kept_file_counter = Counter.remote()
skipped_file_counter = Counter.remote()

def skip_unpartitioned(kv_dict):
keep = bool(kv_dict)
counter = kept_file_counter if keep else skipped_file_counter
ray.get(counter.increment.remote())
return keep

for style in [PartitionStyle.HIVE, PartitionStyle.DIRECTORY]:
base_dir = os.path.join(tmp_path, style.value)
partition_path_encoder = PathPartitionEncoder.of(
style=style,
base_dir=base_dir,
field_names=partition_keys,
)
write_base_partitioned_df(
partition_keys,
partition_path_encoder,
df_to_binary,
)
df_to_binary(pd.DataFrame({"1": [1]}), os.path.join(base_dir, "test.snappy"))
partition_path_filter = PathPartitionFilter.of(
style=style,
base_dir=base_dir,
field_names=partition_keys,
filter_fn=skip_unpartitioned,
)
ds = ray.data.read_binary_files(
base_dir,
partition_filter=partition_path_filter,
arrow_open_stream_args=dict(compression="snappy"),
)
assert_base_partitioned_ds(
ds,
count=2,
schema=Schema(pa.schema([("bytes", pa.binary())])),
sorted_values=[b"1 a\n1 b\n1 c", b"3 e\n3 f\n3 g"],
ds_take_transform_fn=lambda t: extract_values("bytes", t),
)
assert ray.get(kept_file_counter.get.remote()) == 2
assert ray.get(skipped_file_counter.get.remote()) == 1
ray.get(kept_file_counter.reset.remote())
ray.get(skipped_file_counter.reset.remote())
return bool(kv_dict)

base_dir = os.path.join(tmp_path, style.value)
partition_path_encoder = PathPartitionEncoder.of(
style=style,
base_dir=base_dir,
field_names=partition_keys,
)
write_base_partitioned_df(
partition_keys,
partition_path_encoder,
df_to_binary,
)
df_to_binary(pd.DataFrame({"1": [1]}), os.path.join(base_dir, "test.snappy"))
partition_path_filter = PathPartitionFilter.of(
style=style,
base_dir=base_dir,
field_names=partition_keys,
filter_fn=skip_unpartitioned,
)
ds = ray.data.read_binary_files(
base_dir,
partition_filter=partition_path_filter,
arrow_open_stream_args=dict(compression="snappy"),
)
assert_base_partitioned_ds(
ds,
count=2,
num_rows=2,
schema=Schema(pa.schema([("bytes", pa.binary())])),
sorted_values=[b"1 a\n1 b\n1 c", b"3 e\n3 f\n3 g"],
ds_take_transform_fn=lambda t: extract_values("bytes", t),
)


if __name__ == "__main__":
Expand Down
Loading
Loading