diff --git a/.vale/styles/config/vocabularies/Data/accept.txt b/.vale/styles/config/vocabularies/Data/accept.txt
index 8ec78bd70bce3..1104d6f3cd410 100644
--- a/.vale/styles/config/vocabularies/Data/accept.txt
+++ b/.vale/styles/config/vocabularies/Data/accept.txt
@@ -7,6 +7,7 @@ Data('s)?
 [Dd]iscretizer(s)?
 dtype
 [Gg]roupby
+[Hh]udi
 [Ii]ndexable
 [Ii]ngest
 [Ii]nqueue(s)?
diff --git a/doc/source/data/api/input_output.rst b/doc/source/data/api/input_output.rst
index bb8d791d98b21..51bd7ecedb13a 100644
--- a/doc/source/data/api/input_output.rst
+++ b/doc/source/data/api/input_output.rst
@@ -186,6 +186,15 @@ Delta Sharing
 
    read_delta_sharing_tables
 
+Hudi
+----
+
+.. autosummary::
+   :nosignatures:
+   :toctree: doc/
+
+   read_hudi
+
 Iceberg
 -------
 
diff --git a/python/ray/data/BUILD b/python/ray/data/BUILD
index 9a193e369a26b..37af8905a7c2d 100644
--- a/python/ray/data/BUILD
+++ b/python/ray/data/BUILD
@@ -225,6 +225,14 @@ py_test(
     deps = ["//:ray_lib", ":conftest"],
 )
 
+py_test(
+    name = "test_hudi",
+    size = "small",
+    srcs = ["tests/test_hudi.py"],
+    tags = ["team:data", "exclusive"],
+    deps = ["//:ray_lib", ":conftest"],
+)
+
 py_test(
     name = "test_image",
     size = "small",
diff --git a/python/ray/data/__init__.py b/python/ray/data/__init__.py
index 58514478a9ae3..25b53378ee2b3 100644
--- a/python/ray/data/__init__.py
+++ b/python/ray/data/__init__.py
@@ -49,6 +49,7 @@
     read_databricks_tables,
     read_datasource,
     read_delta_sharing_tables,
+    read_hudi,
     read_iceberg,
     read_images,
     read_json,
@@ -140,6 +141,7 @@
     "read_csv",
     "read_datasource",
     "read_delta_sharing_tables",
+    "read_hudi",
     "read_iceberg",
     "read_images",
     "read_json",
diff --git a/python/ray/data/_internal/datasource/hudi_datasource.py b/python/ray/data/_internal/datasource/hudi_datasource.py
new file mode 100644
index 0000000000000..b3f23876126d1
--- /dev/null
+++ b/python/ray/data/_internal/datasource/hudi_datasource.py
@@ -0,0 +1,87 @@
+import logging
+from functools import partial
+from pathlib import PurePosixPath
+from typing import Dict, Iterator, List, Optional
+from urllib.parse import urljoin
+
+from ray.data._internal.util import _check_import
+from ray.data.block import BlockMetadata
+from ray.data.datasource.datasource import Datasource, ReadTask
+
+logger = logging.getLogger(__name__)
+
+
+class HudiDatasource(Datasource):
+    """Hudi datasource, for reading from an Apache Hudi table."""
+
+    def __init__(
+        self,
+        table_uri: str,
+        storage_options: Optional[Dict[str, str]] = None,
+    ):
+        _check_import(self, module="hudi", package="hudi-python")
+
+        self._table_uri = table_uri
+        self._storage_options = storage_options
+
+    def get_read_tasks(self, parallelism: int) -> List["ReadTask"]:
+        import pyarrow
+        from hudi import HudiFileGroupReader, HudiTable
+
+        def _perform_read(
+            table_uri: str,
+            base_file_paths: List[str],
+            options: Dict[str, str],
+        ) -> Iterator["pyarrow.Table"]:
+            for p in base_file_paths:
+                fg_reader = HudiFileGroupReader(table_uri, options)
+                batches = fg_reader.read_file_slice_by_base_file_path(p)
+                yield pyarrow.Table.from_batches(batches)
+
+        hudi_table = HudiTable(self._table_uri, self._storage_options)
+
+        reader_options = {}
+        reader_options.update(hudi_table.storage_options())
+        reader_options.update(hudi_table.hudi_options())
+
+        schema = hudi_table.get_schema()
+        read_tasks = []
+        for file_slices in hudi_table.split_file_slices(parallelism):
+            if len(file_slices) <= 0:
+                continue
+
+            num_rows = 0
+            relative_paths = []
+            input_files = []
+            size_bytes = 0
+            for f in file_slices:
+                num_rows += f.num_records
+                relative_path = f.base_file_relative_path()
+                relative_paths.append(relative_path)
+                full_path = urljoin(
+                    self._table_uri, PurePosixPath(relative_path).as_posix()
+                )
+                input_files.append(full_path)
+                size_bytes += f.base_file_size
+
+            metadata = BlockMetadata(
+                num_rows=num_rows,
+                schema=schema,
+                input_files=input_files,
+                size_bytes=size_bytes,
+                exec_stats=None,
+            )
+
+            read_task = ReadTask(
+                read_fn=partial(
+                    _perform_read, self._table_uri, relative_paths, reader_options
+                ),
+                metadata=metadata,
+            )
+            read_tasks.append(read_task)
+
+        return read_tasks
+
+    def estimate_inmemory_data_size(self) -> Optional[int]:
+        # TODO(xushiyan) add APIs to provide estimated in-memory size
+        return None
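Note on the read path: get_read_tasks groups Hudi file slices and turns each non-empty group into one ReadTask, with per-task metadata (row count, input files, size) computed up front from file-slice stats. Below is a minimal sketch, outside this diff, of the same hudi-python calls, useful for previewing how a table splits; the table path is a placeholder, while the API names (HudiTable, split_file_slices, num_records, base_file_relative_path) are exactly those used in HudiDatasource above.

    # Sketch: preview how HudiDatasource would partition a table into read tasks.
    # "/tmp/trips_table" is a placeholder path, not part of this change.
    from hudi import HudiTable

    table = HudiTable("/tmp/trips_table")
    for i, file_slices in enumerate(table.split_file_slices(4)):
        # Each non-empty group of file slices becomes one Ray ReadTask.
        rows = sum(f.num_records for f in file_slices)
        files = [f.base_file_relative_path() for f in file_slices]
        print(f"task {i}: {rows} rows from {files}")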
diff --git a/python/ray/data/examples/data/hudi-tables/0.x_cow_partitioned.zip b/python/ray/data/examples/data/hudi-tables/0.x_cow_partitioned.zip
new file mode 100644
index 0000000000000..9f78c06de9452
Binary files /dev/null and b/python/ray/data/examples/data/hudi-tables/0.x_cow_partitioned.zip differ
diff --git a/python/ray/data/read_api.py b/python/ray/data/read_api.py
index b86920f280690..320bfbcf7f5e4 100644
--- a/python/ray/data/read_api.py
+++ b/python/ray/data/read_api.py
@@ -27,6 +27,7 @@
 from ray.data._internal.datasource.delta_sharing_datasource import (
     DeltaSharingDatasource,
 )
+from ray.data._internal.datasource.hudi_datasource import HudiDatasource
 from ray.data._internal.datasource.iceberg_datasource import IcebergDatasource
 from ray.data._internal.datasource.image_datasource import (
     ImageDatasource,
@@ -2278,6 +2279,57 @@ def get_dbutils():
     )
 
 
+@PublicAPI(stability="alpha")
+def read_hudi(
+    table_uri: str,
+    *,
+    storage_options: Optional[Dict[str, str]] = None,
+    ray_remote_args: Optional[Dict[str, Any]] = None,
+    concurrency: Optional[int] = None,
+    override_num_blocks: Optional[int] = None,
+) -> Dataset:
+    """
+    Create a :class:`~ray.data.Dataset` from an
+    `Apache Hudi table <https://hudi.apache.org>`_.
+
+    Examples:
+        >>> import ray
+        >>> ds = ray.data.read_hudi(  # doctest: +SKIP
+        ...     table_uri="/hudi/trips",
+        ... )
+
+    Args:
+        table_uri: The URI of the Hudi table to read from. Local file paths, S3, and GCS
+            are supported.
+        storage_options: Extra options that make sense for a particular storage
+            connection. This is used to store connection parameters like credentials,
+            endpoint, etc.
+        ray_remote_args: kwargs passed to :meth:`~ray.remote` in the read tasks.
+        concurrency: The maximum number of Ray tasks to run concurrently. Set this
+            to control the number of tasks to run concurrently. This doesn't change the
+            total number of tasks run or the total number of output blocks. By default,
+            concurrency is dynamically decided based on the available resources.
+        override_num_blocks: Override the number of output blocks from all read tasks.
+            By default, the number of output blocks is dynamically decided based on
+            input data size and available resources. You shouldn't manually set this
+            value in most cases.
+
+    Returns:
+        A :class:`~ray.data.Dataset` producing records read from the Hudi table.
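For context, a hedged usage sketch of the new public API. The bucket URI and the option key below are illustrative placeholders, not values this diff prescribes; only the keyword names (table_uri, storage_options, override_num_blocks) come from the signature above.

    # Sketch: read a Hudi table from object storage with connection options.
    import ray

    ds = ray.data.read_hudi(
        table_uri="s3://bucket/hudi/trips",           # placeholder URI
        storage_options={"aws_region": "us-west-2"},  # hypothetical option key
        override_num_blocks=8,                        # optional block-count override
    )
    print(ds.schema())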
+ """ # noqa: E501 + datasource = HudiDatasource( + table_uri=table_uri, + storage_options=storage_options, + ) + + return read_datasource( + datasource=datasource, + ray_remote_args=ray_remote_args, + concurrency=concurrency, + override_num_blocks=override_num_blocks, + ) + + @PublicAPI def from_dask(df: "dask.dataframe.DataFrame") -> MaterializedDataset: """Create a :class:`~ray.data.Dataset` from a diff --git a/python/ray/data/tests/test_hudi.py b/python/ray/data/tests/test_hudi.py new file mode 100644 index 0000000000000..ba0271f12a24d --- /dev/null +++ b/python/ray/data/tests/test_hudi.py @@ -0,0 +1,108 @@ +import os +import zipfile + +import pyarrow as pa +import pytest +from pytest_lazyfixture import lazy_fixture + +import ray +from ray.data.datasource.path_util import ( + _resolve_paths_and_filesystem, + _unwrap_protocol, +) +from ray.data.tests.conftest import * # noqa +from ray.data.tests.mock_http_server import * # noqa +from ray.tests.conftest import * # noqa + +PYARROW_LE_8_0_0 = tuple(int(s) for s in pa.__version__.split(".") if s.isnumeric()) < ( + 8, + 0, + 0, +) +pytestmark = pytest.mark.skipif( + PYARROW_LE_8_0_0, reason="hudi only supported if pyarrow >= 8.0.0" +) + + +def _extract_testing_table(fixture_path: str, table_dir: str, target_dir: str) -> str: + with zipfile.ZipFile(fixture_path, "r") as zip_ref: + zip_ref.extractall(target_dir) + return os.path.join(target_dir, table_dir) + + +@pytest.mark.parametrize( + "fs,data_path", + [ + (None, lazy_fixture("local_path")), + (lazy_fixture("local_fs"), lazy_fixture("local_path")), + ], +) +def test_read_hudi_simple_cow_table(ray_start_regular_shared, fs, data_path): + setup_data_path = _unwrap_protocol(data_path) + target_testing_dir = os.path.join(setup_data_path, "test_hudi") + fixture_path, _ = _resolve_paths_and_filesystem( + "example://hudi-tables/0.x_cow_partitioned.zip", fs + ) + target_table_path = _extract_testing_table( + fixture_path[0], "trips_table", target_testing_dir + ) + + ds = ray.data.read_hudi(target_table_path) + + assert ds.schema().names == [ + "_hoodie_commit_time", + "_hoodie_commit_seqno", + "_hoodie_record_key", + "_hoodie_partition_path", + "_hoodie_file_name", + "ts", + "uuid", + "rider", + "driver", + "fare", + "city", + ] + assert ds.count() == 5 + rows = ( + ds.select_columns(["_hoodie_commit_time", "ts", "uuid", "fare"]) + .sort("ts") + .take_all() + ) + assert rows == [ + [ + "20240402123035233", + 1695046462179, + "9909a8b1-2d15-4d3d-8ec9-efc48c536a00", + 33.9, + ], + [ + "20240402123035233", + 1695091554788, + "e96c4396-3fad-413a-a942-4cb36106d721", + 27.7, + ], + [ + "20240402123035233", + 1695115999911, + "c8abbe79-8d89-47ea-b4ce-4d224bae5bfa", + 17.85, + ], + [ + "20240402123035233", + 1695159649087, + "334e26e9-8355-45cc-97c6-c31daf0df330", + 19.1, + ], + [ + "20240402123035233", + 1695516137016, + "e3cf430c-889d-4015-bc98-59bdce1e530c", + 34.15, + ], + ] + + +if __name__ == "__main__": + import sys + + sys.exit(pytest.main(["-v", __file__])) diff --git a/python/requirements/ml/data-test-requirements.txt b/python/requirements/ml/data-test-requirements.txt index d2d435b09d880..6723409600a4a 100644 --- a/python/requirements/ml/data-test-requirements.txt +++ b/python/requirements/ml/data-test-requirements.txt @@ -18,4 +18,5 @@ delta-sharing pytest-mock decord snowflake-connector-python -pyiceberg[sql-sqlite]==0.7.0 \ No newline at end of file +pyiceberg[sql-sqlite]==0.7.0 +hudi @ 
diff --git a/python/requirements_compiled.txt b/python/requirements_compiled.txt
index a1043afc5b51b..4760255e717ee 100644
--- a/python/requirements_compiled.txt
+++ b/python/requirements_compiled.txt
@@ -754,6 +754,8 @@ httpx==0.24.1
     #   -r /ray/ci/../python/requirements/test-requirements.txt
     #   gradio
     #   gradio-client
+hudi @ git+https://github.com/xushiyan/hudi-rs.git@c76553b9ba0fd7c0c7860621de97939af95587fc#egg=hudi&subdirectory=python
+    # via -r /ray/ci/../python/requirements/ml/data-test-requirements.txt
 huggingface-hub==0.19.4
     # via
     #   accelerate
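A final note on the optional dependency: HudiDatasource.__init__ calls _check_import(self, module="hudi", package="hudi-python"), so read_hudi fails fast with an actionable error when the package is absent. A quick local check mirroring that guard; the error text below is illustrative, not Ray's exact message.

    # Sketch: the import guard that read_hudi relies on, reduced to its essence.
    try:
        import hudi  # provided by the pinned hudi-rs Python bindings
    except ImportError as e:
        raise ImportError("Install the 'hudi-python' package to use read_hudi") from e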