diff --git a/.vale/styles/config/vocabularies/Data/accept.txt b/.vale/styles/config/vocabularies/Data/accept.txt index 8ec78bd70bce..1104d6f3cd41 100644 --- a/.vale/styles/config/vocabularies/Data/accept.txt +++ b/.vale/styles/config/vocabularies/Data/accept.txt @@ -7,6 +7,7 @@ Data('s)? [Dd]iscretizer(s)? dtype [Gg]roupby +[Hh]udi [Ii]ndexable [Ii]ngest [Ii]nqueue(s)? diff --git a/doc/source/data/api/input_output.rst b/doc/source/data/api/input_output.rst index bb8d791d98b2..51bd7ecedb13 100644 --- a/doc/source/data/api/input_output.rst +++ b/doc/source/data/api/input_output.rst @@ -186,6 +186,15 @@ Delta Sharing read_delta_sharing_tables +Hudi +---- + +.. autosummary:: + :nosignatures: + :toctree: doc/ + + read_hudi + Iceberg ------- diff --git a/python/ray/data/BUILD b/python/ray/data/BUILD index d232ab352ba0..21b4e0d829b6 100644 --- a/python/ray/data/BUILD +++ b/python/ray/data/BUILD @@ -225,6 +225,14 @@ py_test( deps = ["//:ray_lib", ":conftest"], ) +py_test( + name = "test_hudi", + size = "small", + srcs = ["tests/test_hudi.py"], + tags = ["team:data", "exclusive"], + deps = ["//:ray_lib", ":conftest"], +) + py_test( name = "test_image", size = "small", diff --git a/python/ray/data/__init__.py b/python/ray/data/__init__.py index 89d531aa2ee5..5883ae6c542c 100644 --- a/python/ray/data/__init__.py +++ b/python/ray/data/__init__.py @@ -48,6 +48,7 @@ read_databricks_tables, read_datasource, read_delta_sharing_tables, + read_hudi, read_iceberg, read_images, read_json, @@ -139,6 +140,7 @@ "read_csv", "read_datasource", "read_delta_sharing_tables", + "read_hudi", "read_iceberg", "read_images", "read_json", diff --git a/python/ray/data/_internal/datasource/hudi_datasource.py b/python/ray/data/_internal/datasource/hudi_datasource.py new file mode 100644 index 000000000000..828d9baada7f --- /dev/null +++ b/python/ray/data/_internal/datasource/hudi_datasource.py @@ -0,0 +1,91 @@ +import logging +import os +from typing import Dict, Iterator, List, Optional + +from ray.data._internal.util import _check_import +from ray.data.block import BlockMetadata +from ray.data.datasource.datasource import Datasource, ReadTask + +logger = logging.getLogger(__name__) + + +class HudiDatasource(Datasource): + """Hudi datasource, for reading Apache Hudi table.""" + + def __init__( + self, + table_uri: str, + storage_options: Optional[Dict[str, str]] = None, + ): + _check_import(self, module="hudi", package="hudi-python") + + self._table_uri = table_uri + self._storage_options = storage_options + + def get_read_tasks(self, parallelism: int) -> List["ReadTask"]: + import pyarrow + from hudi import HudiTable + + def _perform_read( + table_uri: str, + base_file_paths: List[str], + options: Dict[str, str], + ) -> Iterator["pyarrow.Table"]: + from hudi import HudiFileGroupReader + + for p in base_file_paths: + file_group_reader = HudiFileGroupReader(table_uri, options) + batch = file_group_reader.read_file_slice_by_base_file_path(p) + yield pyarrow.Table.from_batches([batch]) + + hudi_table = HudiTable(self._table_uri, self._storage_options) + + reader_options = { + **hudi_table.storage_options(), + **hudi_table.hudi_options(), + } + + schema = hudi_table.get_schema() + read_tasks = [] + for file_slices_split in hudi_table.split_file_slices(parallelism): + if len(file_slices_split) == 0: + # when the table is empty, this will be an empty split + continue + + num_rows = 0 + relative_paths = [] + input_files = [] + size_bytes = 0 + for file_slice in file_slices_split: + # A file slice in a Hudi table is a logical group of data files + # within a physical partition. Records stored in a file slice + # are associated with a commit on the Hudi table's timeline. + # For more info, see https://hudi.apache.org/docs/file_layouts + num_rows += file_slice.num_records + relative_path = file_slice.base_file_relative_path() + relative_paths.append(relative_path) + full_path = os.path.join(self._table_uri, relative_path) + input_files.append(full_path) + size_bytes += file_slice.base_file_size + + metadata = BlockMetadata( + num_rows=num_rows, + schema=schema, + input_files=input_files, + size_bytes=size_bytes, + exec_stats=None, + ) + + read_task = ReadTask( + read_fn=lambda paths=relative_paths: _perform_read( + self._table_uri, paths, reader_options + ), + metadata=metadata, + ) + read_tasks.append(read_task) + + return read_tasks + + def estimate_inmemory_data_size(self) -> Optional[int]: + # TODO(xushiyan) add APIs to provide estimated in-memory size + return None diff --git a/python/ray/data/examples/data/hudi-tables/0.x_cow_partitioned.zip b/python/ray/data/examples/data/hudi-tables/0.x_cow_partitioned.zip new file mode 100644 index 000000000000..9f78c06de945 Binary files /dev/null and b/python/ray/data/examples/data/hudi-tables/0.x_cow_partitioned.zip differ diff --git a/python/ray/data/read_api.py b/python/ray/data/read_api.py index 60eee8571c1d..d60a89858512 100644 --- a/python/ray/data/read_api.py +++ b/python/ray/data/read_api.py @@ -27,6 +27,7 @@ from ray.data._internal.datasource.delta_sharing_datasource import ( DeltaSharingDatasource, ) +from ray.data._internal.datasource.hudi_datasource import HudiDatasource from ray.data._internal.datasource.iceberg_datasource import IcebergDatasource from ray.data._internal.datasource.image_datasource import ( ImageDatasource, @@ -2312,6 +2313,58 @@ def get_dbutils(): ) +@PublicAPI(stability="alpha") +def read_hudi( + table_uri: str, + *, + storage_options: Optional[Dict[str, str]] = None, + ray_remote_args: Optional[Dict[str, Any]] = None, + concurrency: Optional[int] = None, + override_num_blocks: Optional[int] = None, +) -> Dataset: + """ + Create a :class:`~ray.data.Dataset` from an + `Apache Hudi table `_. + + Examples: + >>> import ray + >>> ds = ray.data.read_hudi( # doctest: +SKIP + ... table_uri="/hudi/trips", + ... ) + + Args: + table_uri: The URI of the Hudi table to read from. Local file paths, S3, and GCS + are supported. + storage_options: Extra options that make sense for a particular storage + connection. This is used to store connection parameters like credentials, + endpoint, etc. See more explanation + `here `_. + ray_remote_args: kwargs passed to :meth:`~ray.remote` in the read tasks. + concurrency: The maximum number of Ray tasks to run concurrently. Set this + to control number of tasks to run concurrently. This doesn't change the + total number of tasks run or the total number of output blocks. By default, + concurrency is dynamically decided based on the available resources. + override_num_blocks: Override the number of output blocks from all read tasks. + By default, the number of output blocks is dynamically decided based on + input data size and available resources. You shouldn't manually set this + value in most cases. + + Returns: + A :class:`~ray.data.Dataset` producing records read from the Hudi table. + """ # noqa: E501 + datasource = HudiDatasource( + table_uri=table_uri, + storage_options=storage_options, + ) + + return read_datasource( + datasource=datasource, + ray_remote_args=ray_remote_args, + concurrency=concurrency, + override_num_blocks=override_num_blocks, + ) + + @PublicAPI def from_dask(df: "dask.dataframe.DataFrame") -> MaterializedDataset: """Create a :class:`~ray.data.Dataset` from a diff --git a/python/ray/data/tests/test_hudi.py b/python/ray/data/tests/test_hudi.py new file mode 100644 index 000000000000..af8035cc315f --- /dev/null +++ b/python/ray/data/tests/test_hudi.py @@ -0,0 +1,114 @@ +import os +import zipfile + +import pytest +from packaging.version import parse as parse_version +from pytest_lazyfixture import lazy_fixture + +import ray +from ray._private.utils import _get_pyarrow_version +from ray.data.datasource.path_util import ( + _resolve_paths_and_filesystem, + _unwrap_protocol, +) +from ray.data.tests.conftest import * # noqa +from ray.data.tests.mock_http_server import * # noqa +from ray.tests.conftest import * # noqa + +MIN_PYARROW_VERSION_FOR_HUDI = parse_version("11.0.0") +_VER = _get_pyarrow_version() +PYARROW_VERSION = parse_version(_VER) if _VER else None +PYARROW_VERSION_MEETS_REQUIREMENT = ( + PYARROW_VERSION and PYARROW_VERSION >= MIN_PYARROW_VERSION_FOR_HUDI +) +PYARROW_HUDI_TEST_SKIP_REASON = ( + f"Hudi only supported if pyarrow >= {MIN_PYARROW_VERSION_FOR_HUDI}" +) + + +def _extract_testing_table(fixture_path: str, table_dir: str, target_dir: str) -> str: + with zipfile.ZipFile(fixture_path, "r") as zip_ref: + zip_ref.extractall(target_dir) + return os.path.join(target_dir, table_dir) + + +@pytest.mark.skipif( + not PYARROW_VERSION_MEETS_REQUIREMENT, + reason=PYARROW_HUDI_TEST_SKIP_REASON, +) +@pytest.mark.parametrize( + "fs,data_path", + [ + (None, lazy_fixture("local_path")), + (lazy_fixture("local_fs"), lazy_fixture("local_path")), + ], +) +def test_read_hudi_simple_cow_table(ray_start_regular_shared, fs, data_path): + setup_data_path = _unwrap_protocol(data_path) + target_testing_dir = os.path.join(setup_data_path, "test_hudi") + fixture_path, _ = _resolve_paths_and_filesystem( + "example://hudi-tables/0.x_cow_partitioned.zip", fs + ) + target_table_path = _extract_testing_table( + fixture_path[0], "trips_table", target_testing_dir + ) + + ds = ray.data.read_hudi(target_table_path) + + assert ds.schema().names == [ + "_hoodie_commit_time", + "_hoodie_commit_seqno", + "_hoodie_record_key", + "_hoodie_partition_path", + "_hoodie_file_name", + "ts", + "uuid", + "rider", + "driver", + "fare", + "city", + ] + assert ds.count() == 5 + rows = ( + ds.select_columns(["_hoodie_commit_time", "ts", "uuid", "fare"]) + .sort("fare") + .take_all() + ) + assert rows == [ + { + "_hoodie_commit_time": "20240402123035233", + "ts": 1695115999911, + "uuid": "c8abbe79-8d89-47ea-b4ce-4d224bae5bfa", + "fare": 17.85, + }, + { + "_hoodie_commit_time": "20240402123035233", + "ts": 1695159649087, + "uuid": "334e26e9-8355-45cc-97c6-c31daf0df330", + "fare": 19.1, + }, + { + "_hoodie_commit_time": "20240402123035233", + "ts": 1695091554788, + "uuid": "e96c4396-3fad-413a-a942-4cb36106d721", + "fare": 27.7, + }, + { + "_hoodie_commit_time": "20240402123035233", + "ts": 1695516137016, + "uuid": "e3cf430c-889d-4015-bc98-59bdce1e530c", + "fare": 34.15, + }, + { + "_hoodie_commit_time": "20240402144910683", + "ts": 1695046462179, + "uuid": "9909a8b1-2d15-4d3d-8ec9-efc48c536a00", + "fare": 339.0, + }, + ] + + +if __name__ == "__main__": + import sys + + sys.exit(pytest.main(["-v", __file__])) diff --git a/python/requirements/ml/data-test-requirements.txt b/python/requirements/ml/data-test-requirements.txt index d2d435b09d88..9ad22340d031 100644 --- a/python/requirements/ml/data-test-requirements.txt +++ b/python/requirements/ml/data-test-requirements.txt @@ -18,4 +18,5 @@ delta-sharing pytest-mock decord snowflake-connector-python -pyiceberg[sql-sqlite]==0.7.0 \ No newline at end of file +pyiceberg[sql-sqlite]==0.7.0 +hudi==0.2.0rc1 diff --git a/python/requirements_compiled.txt b/python/requirements_compiled.txt index 1347afee24c5..45d40d81b7a0 100644 --- a/python/requirements_compiled.txt +++ b/python/requirements_compiled.txt @@ -745,6 +745,8 @@ httpx==0.24.1 # -r /ray/ci/../python/requirements/test-requirements.txt # gradio # gradio-client +hudi==0.2.0rc1 + # via -r /ray/ci/../python/requirements/ml/data-test-requirements.txt huggingface-hub==0.19.4 # via # accelerate