[data][api] implement HudiDataSource (#46273)
Support reading from a Hudi table into a Ray Dataset.

---------

Signed-off-by: Shiyan Xu <2701446+xushiyan@users.noreply.github.com>
xushiyan authored Nov 19, 2024
1 parent e70b37a commit 36682c1
Showing 10 changed files with 282 additions and 1 deletion.
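
For context, a minimal usage sketch of the API this commit adds (the /hudi/trips path mirrors the docstring example in read_api.py below and is illustrative, not a bundled dataset):

    import ray

    # Read a Hudi table (local path, S3, or GCS) into a Ray Dataset.
    ds = ray.data.read_hudi(table_uri="/hudi/trips")

    # The result is a regular Dataset.
    print(ds.schema())
    print(ds.count())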
1 change: 1 addition & 0 deletions .vale/styles/config/vocabularies/Data/accept.txt
@@ -7,6 +7,7 @@ Data('s)?
[Dd]iscretizer(s)?
dtype
[Gg]roupby
[Hh]udi
[Ii]ndexable
[Ii]ngest
[Ii]nqueue(s)?
9 changes: 9 additions & 0 deletions doc/source/data/api/input_output.rst
@@ -186,6 +186,15 @@ Delta Sharing

    read_delta_sharing_tables

Hudi
----

.. autosummary::
    :nosignatures:
    :toctree: doc/

    read_hudi

Iceberg
-------

8 changes: 8 additions & 0 deletions python/ray/data/BUILD
@@ -225,6 +225,14 @@ py_test(
    deps = ["//:ray_lib", ":conftest"],
)

py_test(
    name = "test_hudi",
    size = "small",
    srcs = ["tests/test_hudi.py"],
    tags = ["team:data", "exclusive"],
    deps = ["//:ray_lib", ":conftest"],
)

py_test(
    name = "test_image",
    size = "small",
2 changes: 2 additions & 0 deletions python/ray/data/__init__.py
@@ -48,6 +48,7 @@
    read_databricks_tables,
    read_datasource,
    read_delta_sharing_tables,
    read_hudi,
    read_iceberg,
    read_images,
    read_json,
@@ -139,6 +140,7 @@
    "read_csv",
    "read_datasource",
    "read_delta_sharing_tables",
    "read_hudi",
    "read_iceberg",
    "read_images",
    "read_json",
91 changes: 91 additions & 0 deletions python/ray/data/_internal/datasource/hudi_datasource.py
@@ -0,0 +1,91 @@
import logging
import os
from typing import Dict, Iterator, List, Optional

from ray.data._internal.util import _check_import
from ray.data.block import BlockMetadata
from ray.data.datasource.datasource import Datasource, ReadTask

logger = logging.getLogger(__name__)


class HudiDatasource(Datasource):
    """Hudi datasource, for reading Apache Hudi table."""

    def __init__(
        self,
        table_uri: str,
        storage_options: Optional[Dict[str, str]] = None,
    ):
        _check_import(self, module="hudi", package="hudi-python")

        self._table_uri = table_uri
        self._storage_options = storage_options

    def get_read_tasks(self, parallelism: int) -> List["ReadTask"]:
        import pyarrow
        from hudi import HudiTable

        def _perform_read(
            table_uri: str,
            base_file_paths: List[str],
            options: Dict[str, str],
        ) -> Iterator["pyarrow.Table"]:
            from hudi import HudiFileGroupReader

            for p in base_file_paths:
                file_group_reader = HudiFileGroupReader(table_uri, options)
                batch = file_group_reader.read_file_slice_by_base_file_path(p)
                yield pyarrow.Table.from_batches([batch])

        hudi_table = HudiTable(self._table_uri, self._storage_options)

        reader_options = {
            **hudi_table.storage_options(),
            **hudi_table.hudi_options(),
        }

        schema = hudi_table.get_schema()
        read_tasks = []
        for file_slices_split in hudi_table.split_file_slices(parallelism):
            if len(file_slices_split) == 0:
                # when the table is empty, this will be an empty split
                continue

            num_rows = 0
            relative_paths = []
            input_files = []
            size_bytes = 0
            for file_slice in file_slices_split:
                # A file slice in a Hudi table is a logical group of data files
                # within a physical partition. Records stored in a file slice
                # are associated with a commit on the Hudi table's timeline.
                # For more info, see https://hudi.apache.org/docs/file_layouts
                num_rows += file_slice.num_records
                relative_path = file_slice.base_file_relative_path()
                relative_paths.append(relative_path)
                full_path = os.path.join(self._table_uri, relative_path)
                input_files.append(full_path)
                size_bytes += file_slice.base_file_size

            metadata = BlockMetadata(
                num_rows=num_rows,
                schema=schema,
                input_files=input_files,
                size_bytes=size_bytes,
                exec_stats=None,
            )

            read_task = ReadTask(
                read_fn=lambda paths=relative_paths: _perform_read(
                    self._table_uri, paths, reader_options
                ),
                metadata=metadata,
            )
            read_tasks.append(read_task)

        return read_tasks

    def estimate_inmemory_data_size(self) -> Optional[int]:
        # TODO(xushiyan) add APIs to provide estimated in-memory size
        return None
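
Most users go through ray.data.read_hudi() (added in read_api.py below), which simply wraps this datasource. A rough sketch of the equivalent lower-level call, assuming a Hudi table extracted at /tmp/trips_table:

    import ray
    from ray.data._internal.datasource.hudi_datasource import HudiDatasource

    # Equivalent to ray.data.read_hudi("/tmp/trips_table"): the datasource
    # splits the table's file slices across read tasks, and each task yields
    # one pyarrow.Table per base file it reads.
    ds = ray.data.read_datasource(HudiDatasource(table_uri="/tmp/trips_table"))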
Binary file not shown (likely the zipped example Hudi table fixture used by test_hudi.py below).
53 changes: 53 additions & 0 deletions python/ray/data/read_api.py
@@ -27,6 +27,7 @@
from ray.data._internal.datasource.delta_sharing_datasource import (
    DeltaSharingDatasource,
)
from ray.data._internal.datasource.hudi_datasource import HudiDatasource
from ray.data._internal.datasource.iceberg_datasource import IcebergDatasource
from ray.data._internal.datasource.image_datasource import (
    ImageDatasource,
@@ -2312,6 +2313,58 @@ def get_dbutils():
    )


@PublicAPI(stability="alpha")
def read_hudi(
    table_uri: str,
    *,
    storage_options: Optional[Dict[str, str]] = None,
    ray_remote_args: Optional[Dict[str, Any]] = None,
    concurrency: Optional[int] = None,
    override_num_blocks: Optional[int] = None,
) -> Dataset:
    """
    Create a :class:`~ray.data.Dataset` from an
    `Apache Hudi table <https://hudi.apache.org>`_.

    Examples:
        >>> import ray
        >>> ds = ray.data.read_hudi(  # doctest: +SKIP
        ...     table_uri="/hudi/trips",
        ... )

    Args:
        table_uri: The URI of the Hudi table to read from. Local file paths, S3, and GCS
            are supported.
        storage_options: Extra options that make sense for a particular storage
            connection. This is used to store connection parameters like credentials,
            endpoint, etc. See more explanation
            `here <https://github.com/apache/hudi-rs?tab=readme-ov-file#work-with-cloud-storage>`_.
        ray_remote_args: kwargs passed to :meth:`~ray.remote` in the read tasks.
        concurrency: The maximum number of Ray tasks to run concurrently. Set this
            to control number of tasks to run concurrently. This doesn't change the
            total number of tasks run or the total number of output blocks. By default,
            concurrency is dynamically decided based on the available resources.
        override_num_blocks: Override the number of output blocks from all read tasks.
            By default, the number of output blocks is dynamically decided based on
            input data size and available resources. You shouldn't manually set this
            value in most cases.

    Returns:
        A :class:`~ray.data.Dataset` producing records read from the Hudi table.
    """  # noqa: E501
    datasource = HudiDatasource(
        table_uri=table_uri,
        storage_options=storage_options,
    )

    return read_datasource(
        datasource=datasource,
        ray_remote_args=ray_remote_args,
        concurrency=concurrency,
        override_num_blocks=override_num_blocks,
    )


@PublicAPI
def from_dask(df: "dask.dataframe.DataFrame") -> MaterializedDataset:
    """Create a :class:`~ray.data.Dataset` from a
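
Because storage_options is passed straight through to hudi-rs, reading from cloud storage is a matter of supplying the connector's configuration. A hedged sketch for S3 (the bucket path and option keys are illustrative assumptions; see the hudi-rs cloud-storage link in the docstring for the exact keys):

    import ray

    # Illustrative only: key names follow common hudi-rs/object_store
    # conventions and are not verified against a specific release.
    ds = ray.data.read_hudi(
        table_uri="s3://my-bucket/hudi/trips",
        storage_options={
            "aws_region": "us-west-2",
            "aws_access_key_id": "<access-key-id>",
            "aws_secret_access_key": "<secret-access-key>",
        },
    )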
114 changes: 114 additions & 0 deletions python/ray/data/tests/test_hudi.py
@@ -0,0 +1,114 @@
import os
import zipfile

import pytest
from packaging.version import parse as parse_version
from pytest_lazyfixture import lazy_fixture

import ray
from ray._private.utils import _get_pyarrow_version
from ray.data.datasource.path_util import (
    _resolve_paths_and_filesystem,
    _unwrap_protocol,
)
from ray.data.tests.conftest import *  # noqa
from ray.data.tests.mock_http_server import *  # noqa
from ray.tests.conftest import *  # noqa

MIN_PYARROW_VERSION_FOR_HUDI = parse_version("11.0.0")
_VER = _get_pyarrow_version()
PYARROW_VERSION = parse_version(_VER) if _VER else None
PYARROW_VERSION_MEETS_REQUIREMENT = (
    PYARROW_VERSION and PYARROW_VERSION >= MIN_PYARROW_VERSION_FOR_HUDI
)
PYARROW_HUDI_TEST_SKIP_REASON = (
    f"Hudi only supported if pyarrow >= {MIN_PYARROW_VERSION_FOR_HUDI}"
)


def _extract_testing_table(fixture_path: str, table_dir: str, target_dir: str) -> str:
    with zipfile.ZipFile(fixture_path, "r") as zip_ref:
        zip_ref.extractall(target_dir)
    return os.path.join(target_dir, table_dir)


@pytest.mark.skipif(
    not PYARROW_VERSION_MEETS_REQUIREMENT,
    reason=PYARROW_HUDI_TEST_SKIP_REASON,
)
@pytest.mark.parametrize(
    "fs,data_path",
    [
        (None, lazy_fixture("local_path")),
        (lazy_fixture("local_fs"), lazy_fixture("local_path")),
    ],
)
def test_read_hudi_simple_cow_table(ray_start_regular_shared, fs, data_path):
    setup_data_path = _unwrap_protocol(data_path)
    target_testing_dir = os.path.join(setup_data_path, "test_hudi")
    fixture_path, _ = _resolve_paths_and_filesystem(
        "example://hudi-tables/0.x_cow_partitioned.zip", fs
    )
    target_table_path = _extract_testing_table(
        fixture_path[0], "trips_table", target_testing_dir
    )

    ds = ray.data.read_hudi(target_table_path)

    assert ds.schema().names == [
        "_hoodie_commit_time",
        "_hoodie_commit_seqno",
        "_hoodie_record_key",
        "_hoodie_partition_path",
        "_hoodie_file_name",
        "ts",
        "uuid",
        "rider",
        "driver",
        "fare",
        "city",
    ]
    assert ds.count() == 5
    rows = (
        ds.select_columns(["_hoodie_commit_time", "ts", "uuid", "fare"])
        .sort("fare")
        .take_all()
    )
    assert rows == [
        {
            "_hoodie_commit_time": "20240402123035233",
            "ts": 1695115999911,
            "uuid": "c8abbe79-8d89-47ea-b4ce-4d224bae5bfa",
            "fare": 17.85,
        },
        {
            "_hoodie_commit_time": "20240402123035233",
            "ts": 1695159649087,
            "uuid": "334e26e9-8355-45cc-97c6-c31daf0df330",
            "fare": 19.1,
        },
        {
            "_hoodie_commit_time": "20240402123035233",
            "ts": 1695091554788,
            "uuid": "e96c4396-3fad-413a-a942-4cb36106d721",
            "fare": 27.7,
        },
        {
            "_hoodie_commit_time": "20240402123035233",
            "ts": 1695516137016,
            "uuid": "e3cf430c-889d-4015-bc98-59bdce1e530c",
            "fare": 34.15,
        },
        {
            "_hoodie_commit_time": "20240402144910683",
            "ts": 1695046462179,
            "uuid": "9909a8b1-2d15-4d3d-8ec9-efc48c536a00",
            "fare": 339.0,
        },
    ]


if __name__ == "__main__":
    import sys

    sys.exit(pytest.main(["-v", __file__]))
3 changes: 2 additions & 1 deletion python/requirements/ml/data-test-requirements.txt
@@ -18,4 +18,5 @@ delta-sharing
pytest-mock
decord
snowflake-connector-python
pyiceberg[sql-sqlite]==0.7.0
pyiceberg[sql-sqlite]==0.7.0
hudi==0.2.0rc1
2 changes: 2 additions & 0 deletions python/requirements_compiled.txt
@@ -745,6 +745,8 @@ httpx==0.24.1
    #   -r /ray/ci/../python/requirements/test-requirements.txt
    #   gradio
    #   gradio-client
hudi==0.2.0rc1
    # via -r /ray/ci/../python/requirements/ml/data-test-requirements.txt
huggingface-hub==0.19.4
    # via
    #   accelerate
