diff --git a/python/ray/data/tests/test_binary.py b/python/ray/data/tests/test_binary.py
index 1416ebc6b2b3..f1735da802f7 100644
--- a/python/ray/data/tests/test_binary.py
+++ b/python/ray/data/tests/test_binary.py
@@ -19,7 +19,7 @@
 from ray.data.tests.conftest import *  # noqa
 from ray.data.tests.mock_http_server import *  # noqa
 from ray.data.tests.test_partitioning import PathPartitionEncoder
-from ray.data.tests.util import Counter, extract_values, gen_bin_files
+from ray.data.tests.util import extract_values, gen_bin_files
 from ray.tests.conftest import *  # noqa
 
 
@@ -42,7 +42,7 @@ def test_read_binary_files_partitioning(ray_start_regular_shared, tmp_path):
 
 def test_read_binary_files(ray_start_regular_shared):
     with gen_bin_files(10) as (_, paths):
-        ds = ray.data.read_binary_files(paths, override_num_blocks=10)
+        ds = ray.data.read_binary_files(paths)
         for i, item in enumerate(ds.iter_rows()):
             expected = open(paths[i], "rb").read()
             assert expected == item["bytes"]
@@ -67,14 +67,14 @@ def test_read_binary_files_ignore_missing_paths(
     with pytest.raises(FileNotFoundError):
         ds = ray.data.read_binary_files(
             paths, ignore_missing_paths=ignore_missing_paths
-        )
+        ).materialize()
 
 
 def test_read_binary_files_with_fs(ray_start_regular_shared):
     with gen_bin_files(10) as (tempdir, paths):
         # All the paths are absolute, so we want the root file system.
         fs, _ = pa.fs.FileSystem.from_uri("/")
-        ds = ray.data.read_binary_files(paths, filesystem=fs, override_num_blocks=10)
+        ds = ray.data.read_binary_files(paths, filesystem=fs)
         for i, item in enumerate(ds.iter_rows()):
             expected = open(paths[i], "rb").read()
             assert expected == item["bytes"]
@@ -142,7 +142,9 @@ def test_read_binary_meta_provider(
     )
 
 
+@pytest.mark.parametrize("style", [PartitionStyle.HIVE, PartitionStyle.DIRECTORY])
 def test_read_binary_snappy_partitioned_with_filter(
+    style,
     ray_start_regular_shared,
     tmp_path,
     write_base_partitioned_df,
@@ -156,50 +158,41 @@ def df_to_binary(dataframe, path, **kwargs):
             snappy.stream_compress(bytes, f)
 
     partition_keys = ["one"]
-    kept_file_counter = Counter.remote()
-    skipped_file_counter = Counter.remote()
 
     def skip_unpartitioned(kv_dict):
-        keep = bool(kv_dict)
-        counter = kept_file_counter if keep else skipped_file_counter
-        ray.get(counter.increment.remote())
-        return keep
-
-    for style in [PartitionStyle.HIVE, PartitionStyle.DIRECTORY]:
-        base_dir = os.path.join(tmp_path, style.value)
-        partition_path_encoder = PathPartitionEncoder.of(
-            style=style,
-            base_dir=base_dir,
-            field_names=partition_keys,
-        )
-        write_base_partitioned_df(
-            partition_keys,
-            partition_path_encoder,
-            df_to_binary,
-        )
-        df_to_binary(pd.DataFrame({"1": [1]}), os.path.join(base_dir, "test.snappy"))
-        partition_path_filter = PathPartitionFilter.of(
-            style=style,
-            base_dir=base_dir,
-            field_names=partition_keys,
-            filter_fn=skip_unpartitioned,
-        )
-        ds = ray.data.read_binary_files(
-            base_dir,
-            partition_filter=partition_path_filter,
-            arrow_open_stream_args=dict(compression="snappy"),
-        )
-        assert_base_partitioned_ds(
-            ds,
-            count=2,
-            schema=Schema(pa.schema([("bytes", pa.binary())])),
-            sorted_values=[b"1 a\n1 b\n1 c", b"3 e\n3 f\n3 g"],
-            ds_take_transform_fn=lambda t: extract_values("bytes", t),
-        )
-        assert ray.get(kept_file_counter.get.remote()) == 2
-        assert ray.get(skipped_file_counter.get.remote()) == 1
-        ray.get(kept_file_counter.reset.remote())
-        ray.get(skipped_file_counter.reset.remote())
+        return bool(kv_dict)
+
+    base_dir = os.path.join(tmp_path, style.value)
+    partition_path_encoder = PathPartitionEncoder.of(
+        style=style,
+        base_dir=base_dir,
+        field_names=partition_keys,
+    )
+    write_base_partitioned_df(
+        partition_keys,
+        partition_path_encoder,
+        df_to_binary,
+    )
+    df_to_binary(pd.DataFrame({"1": [1]}), os.path.join(base_dir, "test.snappy"))
+    partition_path_filter = PathPartitionFilter.of(
+        style=style,
+        base_dir=base_dir,
+        field_names=partition_keys,
+        filter_fn=skip_unpartitioned,
+    )
+    ds = ray.data.read_binary_files(
+        base_dir,
+        partition_filter=partition_path_filter,
+        arrow_open_stream_args=dict(compression="snappy"),
+    )
+    assert_base_partitioned_ds(
+        ds,
+        count=2,
+        num_rows=2,
+        schema=Schema(pa.schema([("bytes", pa.binary())])),
+        sorted_values=[b"1 a\n1 b\n1 c", b"3 e\n3 f\n3 g"],
+        ds_take_transform_fn=lambda t: extract_values("bytes", t),
+    )
 
 
 if __name__ == "__main__":
diff --git a/python/ray/data/tests/test_csv.py b/python/ray/data/tests/test_csv.py
index f685317d5130..4d28997dcc8e 100644
--- a/python/ray/data/tests/test_csv.py
+++ b/python/ray/data/tests/test_csv.py
@@ -26,7 +26,6 @@
 from ray.data.tests.conftest import *  # noqa
 from ray.data.tests.mock_http_server import *  # noqa
 from ray.data.tests.test_partitioning import PathPartitionEncoder
-from ray.data.tests.util import Counter
 from ray.tests.conftest import *  # noqa
 
 
@@ -77,7 +76,7 @@ def test_csv_read(ray_start_regular_shared, fs, data_path, endpoint_url):
     path1 = os.path.join(data_path, "test1.csv")
     df1.to_csv(path1, index=False, storage_options=storage_options)
     ds = ray.data.read_csv(path1, filesystem=fs, partitioning=None)
-    dsdf = ds.to_pandas()
+    dsdf = ds.to_pandas().sort_values(by=["one", "two"]).reset_index(drop=True)
     assert df1.equals(dsdf)
     # Test metadata ops.
     assert ds.count() == 3
@@ -91,7 +90,7 @@ def test_csv_read(ray_start_regular_shared, fs, data_path, endpoint_url):
     ds = ray.data.read_csv(
         [path1, path2], override_num_blocks=2, filesystem=fs, partitioning=None
     )
-    dsdf = ds.to_pandas()
+    dsdf = ds.to_pandas().sort_values(by=["one", "two"]).reset_index(drop=True)
     df = pd.concat([df1, df2], ignore_index=True)
     assert df.equals(dsdf)
     # Test metadata ops.
@@ -106,7 +105,7 @@ def test_csv_read(ray_start_regular_shared, fs, data_path, endpoint_url):
         [path1, path2, path3], override_num_blocks=2, filesystem=fs, partitioning=None
     )
     df = pd.concat([df1, df2, df3], ignore_index=True)
-    dsdf = ds.to_pandas()
+    dsdf = ds.to_pandas().sort_values(by=["one", "two"]).reset_index(drop=True)
     assert df.equals(dsdf)
 
     # Directory, two files.
@@ -123,7 +122,7 @@ def test_csv_read(ray_start_regular_shared, fs, data_path, endpoint_url):
     df2.to_csv(path2, index=False, storage_options=storage_options)
     ds = ray.data.read_csv(path, filesystem=fs, partitioning=None)
     df = pd.concat([df1, df2], ignore_index=True)
-    dsdf = ds.to_pandas()
+    dsdf = ds.to_pandas().sort_values(by=["one", "two"]).reset_index(drop=True)
     pd.testing.assert_frame_equal(df, dsdf)
     if fs is None:
         shutil.rmtree(path)
@@ -150,7 +149,7 @@ def test_csv_read(ray_start_regular_shared, fs, data_path, endpoint_url):
     df3.to_csv(file_path3, index=False, storage_options=storage_options)
     ds = ray.data.read_csv([path1, path2], filesystem=fs, partitioning=None)
     df = pd.concat([df1, df2, df3], ignore_index=True)
-    dsdf = ds.to_pandas()
+    dsdf = ds.to_pandas().sort_values(by=["one", "two"]).reset_index(drop=True)
     assert df.equals(dsdf)
     if fs is None:
         shutil.rmtree(path1)
@@ -173,7 +172,7 @@ def test_csv_read(ray_start_regular_shared, fs, data_path, endpoint_url):
     df2.to_csv(path2, index=False, storage_options=storage_options)
     ds = ray.data.read_csv([dir_path, path2], filesystem=fs, partitioning=None)
     df = pd.concat([df1, df2], ignore_index=True)
-    dsdf = ds.to_pandas()
+    dsdf = ds.to_pandas().sort_values(by=["one", "two"]).reset_index(drop=True)
     assert df.equals(dsdf)
     if fs is None:
         shutil.rmtree(dir_path)
@@ -206,9 +205,8 @@ def test_csv_read(ray_start_regular_shared, fs, data_path, endpoint_url):
         file_extensions=["csv"],
         partitioning=None,
     )
-    assert ds._plan.initial_num_blocks() == 2
     df = pd.concat([df1, df2], ignore_index=True)
-    dsdf = ds.to_pandas()
+    dsdf = ds.to_pandas().sort_values(by=["one", "two"]).reset_index(drop=True)
     assert df.equals(dsdf)
     if fs is None:
         shutil.rmtree(path)
@@ -425,9 +423,9 @@ def test_csv_read_many_files_diff_dirs(
             path = os.path.join(dir_path, f"test_{j}.csv")
             paths.append(path)
             df.to_csv(path, index=False, storage_options=storage_options)
-    ds = ray.data.read_csv(paths, filesystem=fs)
+    ds = ray.data.read_csv([dir1, dir2], filesystem=fs)
 
-    dsdf = ds.to_pandas()
+    dsdf = ds.to_pandas().sort_values(by=["one"]).reset_index(drop=True)
     df = pd.concat(dfs).reset_index(drop=True)
     pd.testing.assert_frame_equal(df, dsdf)
 
@@ -540,7 +538,9 @@ def test_csv_read_partitioned_styles_explicit(
         (lazy_fixture("s3_fs"), lazy_fixture("s3_path"), lazy_fixture("s3_server")),
     ],
 )
+@pytest.mark.parametrize("style", [PartitionStyle.HIVE, PartitionStyle.DIRECTORY])
 def test_csv_read_partitioned_with_filter(
+    style,
     ray_start_regular_shared,
     fs,
     data_path,
@@ -555,46 +555,36 @@
     )
     partition_keys = ["one"]
     file_writer_fn = partial(df_to_csv, storage_options=storage_options, index=False)
-    kept_file_counter = Counter.remote()
-    skipped_file_counter = Counter.remote()
 
     def skip_unpartitioned(kv_dict):
-        keep = bool(kv_dict)
-        counter = kept_file_counter if keep else skipped_file_counter
-        ray.get(counter.increment.remote())
-        return keep
+        return bool(kv_dict)
 
-    for style in [PartitionStyle.HIVE, PartitionStyle.DIRECTORY]:
-        base_dir = os.path.join(data_path, style.value)
-        partition_path_encoder = PathPartitionEncoder.of(
-            style=style,
-            base_dir=base_dir,
-            field_names=partition_keys,
-            filesystem=fs,
-        )
-        write_base_partitioned_df(
-            partition_keys,
-            partition_path_encoder,
-            file_writer_fn,
-        )
-        file_writer_fn(pd.DataFrame({"1": [1]}), os.path.join(base_dir, "test.csv"))
-        partition_path_filter = PathPartitionFilter.of(
-            style=style,
-            base_dir=base_dir,
-            field_names=partition_keys,
-            filesystem=fs,
-            filter_fn=skip_unpartitioned,
-        )
-        ds = ray.data.read_csv(
-            base_dir,
-            partition_filter=partition_path_filter,
-            filesystem=fs,
-        )
-        assert_base_partitioned_ds(ds)
-        assert ray.get(kept_file_counter.get.remote()) == 2
-        assert ray.get(skipped_file_counter.get.remote()) == 1
-        ray.get(kept_file_counter.reset.remote())
-        ray.get(skipped_file_counter.reset.remote())
+    base_dir = os.path.join(data_path, style.value)
+    partition_path_encoder = PathPartitionEncoder.of(
+        style=style,
+        base_dir=base_dir,
+        field_names=partition_keys,
+        filesystem=fs,
+    )
+    write_base_partitioned_df(
+        partition_keys,
+        partition_path_encoder,
+        file_writer_fn,
+    )
+    file_writer_fn(pd.DataFrame({"1": [1]}), os.path.join(base_dir, "test.csv"))
+    partition_path_filter = PathPartitionFilter.of(
+        style=style,
+        base_dir=base_dir,
+        field_names=partition_keys,
+        filesystem=fs,
+        filter_fn=skip_unpartitioned,
+    )
+    ds = ray.data.read_csv(
+        base_dir,
+        partition_filter=partition_path_filter,
+        filesystem=fs,
+    )
+    assert_base_partitioned_ds(ds)
 
 
 @pytest.mark.parametrize(
@@ -605,7 +595,9 @@ def skip_unpartitioned(kv_dict):
         (lazy_fixture("s3_fs"), lazy_fixture("s3_path"), lazy_fixture("s3_server")),
     ],
 )
+@pytest.mark.parametrize("style", [PartitionStyle.HIVE, PartitionStyle.DIRECTORY])
 def test_csv_read_partitioned_with_filter_multikey(
+    style,
     ray_start_regular_shared,
     fs,
     data_path,
@@ -620,57 +612,42 @@
     )
     partition_keys = ["one", "two"]
     file_writer_fn = partial(df_to_csv, storage_options=storage_options, index=False)
-    kept_file_counter = Counter.remote()
-    skipped_file_counter = Counter.remote()
 
     def keep_expected_partitions(kv_dict):
         keep = bool(kv_dict) and (
             (kv_dict["one"] == "1" and kv_dict["two"] in {"a", "b", "c"})
             or (kv_dict["one"] == "3" and kv_dict["two"] in {"e", "f", "g"})
         )
-        counter = kept_file_counter if keep else skipped_file_counter
-        ray.get(counter.increment.remote())
        return keep
 
-    for i, style in enumerate([PartitionStyle.HIVE, PartitionStyle.DIRECTORY]):
-        base_dir = os.path.join(data_path, style.value)
-        partition_path_encoder = PathPartitionEncoder.of(
-            style=style,
-            base_dir=base_dir,
-            field_names=partition_keys,
-            filesystem=fs,
-        )
-        write_base_partitioned_df(
-            partition_keys,
-            partition_path_encoder,
-            file_writer_fn,
-        )
-        df = pd.DataFrame({"1": [1]})
-        file_writer_fn(df, os.path.join(data_path, f"test{i}.csv"))
-        partition_path_filter = PathPartitionFilter.of(
-            style=style,
-            base_dir=base_dir,
-            field_names=partition_keys,
-            filesystem=fs,
-            filter_fn=keep_expected_partitions,
-        )
-        ds = ray.data.read_csv(
-            data_path,
-            partition_filter=partition_path_filter,
-            filesystem=fs,
-            override_num_blocks=6,
-        )
-        assert_base_partitioned_ds(ds, num_input_files=6)
-        assert ray.get(kept_file_counter.get.remote()) == 6
-        if i == 0:
-            # expect to skip 1 unpartitioned files in the parent of the base directory
-            assert ray.get(skipped_file_counter.get.remote()) == 1
-        else:
-            # expect to skip 2 unpartitioned files in the parent of the base directory
-            # plus 6 unpartitioned files in the base directory's sibling directories
-            assert ray.get(skipped_file_counter.get.remote()) == 8
-        ray.get(kept_file_counter.reset.remote())
-        ray.get(skipped_file_counter.reset.remote())
+    base_dir = os.path.join(data_path, style.value)
+    partition_path_encoder = PathPartitionEncoder.of(
+        style=style,
+        base_dir=base_dir,
+        field_names=partition_keys,
+        filesystem=fs,
+    )
+    write_base_partitioned_df(
+        partition_keys,
+        partition_path_encoder,
+        file_writer_fn,
+    )
+    df = pd.DataFrame({"1": [1]})
+    file_writer_fn(df, os.path.join(data_path, "test0.csv"))
+    partition_path_filter = PathPartitionFilter.of(
+        style=style,
+        base_dir=base_dir,
+        field_names=partition_keys,
+        filesystem=fs,
+        filter_fn=keep_expected_partitions,
+    )
+    ds = ray.data.read_csv(
+        data_path,
+        partition_filter=partition_path_filter,
+        filesystem=fs,
+        override_num_blocks=6,
+    )
+    assert_base_partitioned_ds(ds, num_input_files=6)
 
 
 @pytest.mark.parametrize(
@@ -770,26 +747,16 @@ def test_csv_read_filter_non_csv_file(ray_start_regular_shared, tmp_path):
     # Single non-CSV file.
     error_message = "Failed to read CSV file"
     with pytest.raises(ValueError, match=error_message):
-        ray.data.read_csv(path3).schema()
-
-    # Single non-CSV file with filter.
-    error_message = "No input files found to read"
-    with pytest.raises(ValueError, match=error_message):
-        ray.data.read_csv(path3, file_extensions=["csv"]).schema()
+        ray.data.read_csv(path3).materialize()
 
     # Single CSV file without extension.
     ds = ray.data.read_csv(path2)
     assert ds.to_pandas().equals(df)
 
-    # Single CSV file without extension with filter.
-    error_message = "No input files found to read"
-    with pytest.raises(ValueError, match=error_message):
-        ray.data.read_csv(path2, file_extensions=["csv"]).schema()
-
     # Directory of CSV and non-CSV files.
     error_message = "Failed to read CSV file"
     with pytest.raises(ValueError, match=error_message):
-        ray.data.read_csv(tmp_path).schema()
+        ray.data.read_csv(tmp_path).materialize()
 
     # Directory of CSV and non-CSV files with filter.
     ds = ray.data.read_csv(tmp_path, file_extensions=["csv"])
@@ -846,7 +813,7 @@ def test_csv_read_with_column_type_specified(shutdown_only, tmp_path):
     Version(pa.__version__) < Version("7.0.0"),
     reason="invalid_row_handler was added in pyarrow 7.0.0",
 )
-def test_csv_invalid_file_handler(shutdown_only, tmp_path):
+def test_csv_invalid_file_handler(ray_start_regular_shared, tmp_path):
     from pyarrow import csv
 
     invalid_txt = "f1,f2\n2,3\nx\n4,5"
diff --git a/python/ray/data/tests/test_numpy.py b/python/ray/data/tests/test_numpy.py
index 3b804192a719..2e6142b393d1 100644
--- a/python/ray/data/tests/test_numpy.py
+++ b/python/ray/data/tests/test_numpy.py
@@ -19,7 +19,7 @@
 from ray.data.tests.conftest import *  # noqa
 from ray.data.tests.mock_http_server import *  # noqa
 from ray.data.tests.test_partitioning import PathPartitionEncoder
-from ray.data.tests.util import Counter, extract_values
+from ray.data.tests.util import extract_values
 from ray.tests.conftest import *  # noqa
 
 
@@ -196,7 +196,9 @@ def test_numpy_read_meta_provider(ray_start_regular_shared, tmp_path):
     )
 
 
+@pytest.mark.parametrize("style", [PartitionStyle.HIVE, PartitionStyle.DIRECTORY])
 def test_numpy_read_partitioned_with_filter(
+    style,
     ray_start_regular_shared,
     tmp_path,
     write_partitioned_df,
@@ -207,50 +209,47 @@ def df_to_np(dataframe, path, **kwargs):
 
     df = pd.DataFrame({"one": [1, 1, 1, 3, 3, 3], "two": [0, 1, 2, 3, 4, 5]})
     partition_keys = ["one"]
-    kept_file_counter = Counter.remote()
-    skipped_file_counter = Counter.remote()
 
     def skip_unpartitioned(kv_dict):
-        keep = bool(kv_dict)
-        counter = kept_file_counter if keep else skipped_file_counter
-        ray.get(counter.increment.remote())
-        return keep
-
-    for style in [PartitionStyle.HIVE, PartitionStyle.DIRECTORY]:
-        base_dir = os.path.join(tmp_path, style.value)
-        partition_path_encoder = PathPartitionEncoder.of(
-            style=style,
-            base_dir=base_dir,
-            field_names=partition_keys,
-        )
-        write_partitioned_df(
-            df,
-            partition_keys,
-            partition_path_encoder,
-            df_to_np,
-        )
-        df_to_np(df, os.path.join(base_dir, "test.npy"))
-        partition_path_filter = PathPartitionFilter.of(
-            style=style,
-            base_dir=base_dir,
-            field_names=partition_keys,
-            filter_fn=skip_unpartitioned,
-        )
-        ds = ray.data.read_numpy(base_dir, partition_filter=partition_path_filter)
-
-        vals = [[1, 0], [1, 1], [1, 2], [3, 3], [3, 4], [3, 5]]
-        val_str = "".join(f"array({v}, dtype=int8), " for v in vals)[:-2]
-        assert_base_partitioned_ds(
-            ds,
-            schema=Schema(pa.schema([("data", ArrowTensorType((2,), pa.int8()))])),
-            sorted_values=f"[[{val_str}]]",
-            ds_take_transform_fn=lambda taken: [extract_values("data", taken)],
-            sorted_values_transform_fn=lambda sorted_values: str(sorted_values),
-        )
-        assert ray.get(kept_file_counter.get.remote()) == 2
-        assert ray.get(skipped_file_counter.get.remote()) == 1
-        ray.get(kept_file_counter.reset.remote())
-        ray.get(skipped_file_counter.reset.remote())
+        return bool(kv_dict)
+
+    base_dir = os.path.join(tmp_path, style.value)
+    partition_path_encoder = PathPartitionEncoder.of(
+        style=style,
+        base_dir=base_dir,
+        field_names=partition_keys,
+    )
+    write_partitioned_df(
+        df,
+        partition_keys,
+        partition_path_encoder,
+        df_to_np,
+    )
+    df_to_np(df, os.path.join(base_dir, "test.npy"))
+    partition_path_filter = PathPartitionFilter.of(
+        style=style,
+        base_dir=base_dir,
+        field_names=partition_keys,
+        filter_fn=skip_unpartitioned,
+    )
+    ds = ray.data.read_numpy(base_dir, partition_filter=partition_path_filter)
+
+    def sorted_values_transform_fn(sorted_values):
+        # HACK: `assert_base_partitioned_ds` doesn't properly sort the values. This is a
+        # hack to make the test pass.
+        # TODO(@bveeramani): Clean this up.
+        actually_sorted_values = sorted(sorted_values[0], key=lambda item: tuple(item))
+        return str([actually_sorted_values])
+
+    vals = [[1, 0], [1, 1], [1, 2], [3, 3], [3, 4], [3, 5]]
+    val_str = "".join(f"array({v}, dtype=int8), " for v in vals)[:-2]
+    assert_base_partitioned_ds(
+        ds,
+        schema=Schema(pa.schema([("data", ArrowTensorType((2,), pa.int8()))])),
+        sorted_values=f"[[{val_str}]]",
+        ds_take_transform_fn=lambda taken: [extract_values("data", taken)],
+        sorted_values_transform_fn=sorted_values_transform_fn,
+    )
 
 
 @pytest.mark.parametrize(