[data][api] implement HudiDataSource
@MicroCheck //python:ray/data/tests/test_hudi

Signed-off-by: Shiyan Xu <2701446+xushiyan@users.noreply.github.com>
xushiyan committed Oct 20, 2024
1 parent 8c28fe2 commit f63fd83
Showing 10 changed files with 271 additions and 1 deletion.
1 change: 1 addition & 0 deletions .vale/styles/config/vocabularies/Data/accept.txt
@@ -7,6 +7,7 @@ Data('s)?
[Dd]iscretizer(s)?
dtype
[Gg]roupby
[Hh]udi
[Ii]ndexable
[Ii]ngest
[Ii]nqueue(s)?
9 changes: 9 additions & 0 deletions doc/source/data/api/input_output.rst
@@ -186,6 +186,15 @@ Delta Sharing

   read_delta_sharing_tables

Hudi
----

.. autosummary::
   :nosignatures:
   :toctree: doc/

   read_hudi

Iceberg
-------

8 changes: 8 additions & 0 deletions python/ray/data/BUILD
@@ -225,6 +225,14 @@ py_test(
    deps = ["//:ray_lib", ":conftest"],
)

py_test(
    name = "test_hudi",
    size = "small",
    srcs = ["tests/test_hudi.py"],
    tags = ["team:data", "exclusive"],
    deps = ["//:ray_lib", ":conftest"],
)

py_test(
    name = "test_image",
    size = "small",
2 changes: 2 additions & 0 deletions python/ray/data/__init__.py
@@ -49,6 +49,7 @@
    read_databricks_tables,
    read_datasource,
    read_delta_sharing_tables,
    read_hudi,
    read_iceberg,
    read_images,
    read_json,
@@ -140,6 +141,7 @@
    "read_csv",
    "read_datasource",
    "read_delta_sharing_tables",
    "read_hudi",
    "read_iceberg",
    "read_images",
    "read_json",
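
The re-export above makes the new reader available from the top-level ray.data namespace. A minimal check, as a sketch (nothing is assumed beyond the symbol added in this diff):

import ray.data

# `read_hudi` is now exposed alongside the other read_* entry points.
assert callable(ray.data.read_hudi)
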
87 changes: 87 additions & 0 deletions python/ray/data/_internal/datasource/hudi_datasource.py
@@ -0,0 +1,87 @@
import logging
from functools import partial
from pathlib import PurePosixPath
from typing import Dict, Iterator, List, Optional
from urllib.parse import urljoin

from ray.data._internal.util import _check_import
from ray.data.block import BlockMetadata
from ray.data.datasource.datasource import Datasource, ReadTask

logger = logging.getLogger(__name__)


class HudiDatasource(Datasource):
    """Hudi datasource, for reading from an Apache Hudi table."""

    def __init__(
        self,
        table_uri: str,
        storage_options: Optional[Dict[str, str]] = None,
    ):
        _check_import(self, module="hudi", package="hudi-python")

        self._table_uri = table_uri
        self._storage_options = storage_options

    def get_read_tasks(self, parallelism: int) -> List["ReadTask"]:
        import pyarrow
        from hudi import HudiFileGroupReader, HudiTable

        def _perform_read(
            table_uri: str,
            base_file_paths: List[str],
            options: Dict[str, str],
        ) -> Iterator["pyarrow.Table"]:
            for p in base_file_paths:
                fg_reader = HudiFileGroupReader(table_uri, options)
                batches = fg_reader.read_file_slice_by_base_file_path(p)
                yield pyarrow.Table.from_batches(batches)

        hudi_table = HudiTable(self._table_uri, self._storage_options)

        reader_options = {}
        reader_options.update(hudi_table.storage_options())
        reader_options.update(hudi_table.hudi_options())

        schema = hudi_table.get_schema()
        read_tasks = []
        for file_slices in hudi_table.split_file_slices(parallelism):
            if len(file_slices) <= 0:
                continue

            num_rows = 0
            relative_paths = []
            input_files = []
            size_bytes = 0
            for f in file_slices:
                num_rows += f.num_records
                relative_path = f.base_file_relative_path()
                relative_paths.append(relative_path)
                full_path = urljoin(
                    self._table_uri, PurePosixPath(relative_path).as_posix()
                )
                input_files.append(full_path)
                size_bytes += f.base_file_size

            metadata = BlockMetadata(
                num_rows=num_rows,
                schema=schema,
                input_files=input_files,
                size_bytes=size_bytes,
                exec_stats=None,
            )

            read_task = ReadTask(
                read_fn=partial(
                    _perform_read, self._table_uri, relative_paths, reader_options
                ),
                metadata=metadata,
            )
            read_tasks.append(read_task)

        return read_tasks

    def estimate_inmemory_data_size(self) -> Optional[int]:
        # TODO(xushiyan) add APIs to provide estimated in-memory size
        return None
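
For orientation, a hedged sketch of how this datasource is consumed through Ray Data's generic read path; it mirrors the read_hudi wrapper added in read_api.py below, and the local table path is a hypothetical placeholder:

import ray
from ray.data._internal.datasource.hudi_datasource import HudiDatasource

# Construct the datasource directly (read_hudi below does exactly this) and
# hand it to the generic read entry point; each ReadTask produced by
# get_read_tasks() reads a group of Hudi file slices into pyarrow Tables.
datasource = HudiDatasource(
    table_uri="/tmp/hudi/trips",  # hypothetical local table path
    storage_options=None,  # e.g., credentials/endpoint when reading from cloud storage
)
ds = ray.data.read_datasource(datasource, override_num_blocks=4)
ds.show(limit=3)
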
Binary file not shown.
52 changes: 52 additions & 0 deletions python/ray/data/read_api.py
@@ -27,6 +27,7 @@
from ray.data._internal.datasource.delta_sharing_datasource import (
    DeltaSharingDatasource,
)
from ray.data._internal.datasource.hudi_datasource import HudiDatasource
from ray.data._internal.datasource.iceberg_datasource import IcebergDatasource
from ray.data._internal.datasource.image_datasource import (
    ImageDatasource,
@@ -2278,6 +2279,57 @@ def get_dbutils():
)


@PublicAPI(stability="alpha")
def read_hudi(
    table_uri: str,
    *,
    storage_options: Optional[Dict[str, str]] = None,
    ray_remote_args: Optional[Dict[str, Any]] = None,
    concurrency: Optional[int] = None,
    override_num_blocks: Optional[int] = None,
) -> Dataset:
    """
    Create a :class:`~ray.data.Dataset` from an
    `Apache Hudi table <https://hudi.apache.org>`_.

    Examples:
        >>> import ray
        >>> ds = ray.data.read_hudi(  # doctest: +SKIP
        ...     table_uri="/hudi/trips",
        ... )

    Args:
        table_uri: The URI of the Hudi table to read from. Local file paths, S3,
            and GCS are supported.
        storage_options: Extra options that make sense for a particular storage
            connection. This is used to store connection parameters like
            credentials, endpoint, etc.
        ray_remote_args: kwargs passed to :meth:`~ray.remote` in the read tasks.
        concurrency: The maximum number of Ray tasks to run concurrently. Set this
            to control the number of tasks that run concurrently. This doesn't
            change the total number of tasks run or the total number of output
            blocks. By default, concurrency is dynamically decided based on the
            available resources.
        override_num_blocks: Override the number of output blocks from all read
            tasks. By default, the number of output blocks is dynamically decided
            based on input data size and available resources. You shouldn't
            manually set this value in most cases.

    Returns:
        A :class:`~ray.data.Dataset` producing records read from the Hudi table.
    """  # noqa: E501
    datasource = HudiDatasource(
        table_uri=table_uri,
        storage_options=storage_options,
    )

    return read_datasource(
        datasource=datasource,
        ray_remote_args=ray_remote_args,
        concurrency=concurrency,
        override_num_blocks=override_num_blocks,
    )


@PublicAPI
def from_dask(df: "dask.dataframe.DataFrame") -> MaterializedDataset:
    """Create a :class:`~ray.data.Dataset` from a
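
A hedged usage sketch for the new public API; the S3 URI, the storage_options keys, and the block count below are illustrative assumptions, not values taken from this commit (the supported option keys come from the underlying hudi-rs storage layer):

import ray

ds = ray.data.read_hudi(
    table_uri="s3://my-bucket/hudi/trips",  # hypothetical table location
    storage_options={
        # Assumed key name; consult the hudi-rs docs for supported options.
        "aws_region": "us-west-2",
    },
    override_num_blocks=8,
)
print(ds.schema())
print(ds.count())
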
108 changes: 108 additions & 0 deletions python/ray/data/tests/test_hudi.py
@@ -0,0 +1,108 @@
import os
import zipfile

import pyarrow as pa
import pytest
from pytest_lazyfixture import lazy_fixture

import ray
from ray.data.datasource.path_util import (
    _resolve_paths_and_filesystem,
    _unwrap_protocol,
)
from ray.data.tests.conftest import * # noqa
from ray.data.tests.mock_http_server import * # noqa
from ray.tests.conftest import * # noqa

PYARROW_LE_8_0_0 = tuple(int(s) for s in pa.__version__.split(".") if s.isnumeric()) < (
    8,
    0,
    0,
)
pytestmark = pytest.mark.skipif(
    PYARROW_LE_8_0_0, reason="hudi only supported if pyarrow >= 8.0.0"
)


def _extract_testing_table(fixture_path: str, table_dir: str, target_dir: str) -> str:
    with zipfile.ZipFile(fixture_path, "r") as zip_ref:
        zip_ref.extractall(target_dir)
    return os.path.join(target_dir, table_dir)


@pytest.mark.parametrize(
    "fs,data_path",
    [
        (None, lazy_fixture("local_path")),
        (lazy_fixture("local_fs"), lazy_fixture("local_path")),
    ],
)
def test_read_hudi_simple_cow_table(ray_start_regular_shared, fs, data_path):
    setup_data_path = _unwrap_protocol(data_path)
    target_testing_dir = os.path.join(setup_data_path, "test_hudi")
    fixture_path, _ = _resolve_paths_and_filesystem(
        "example://hudi-tables/0.x_cow_partitioned.zip", fs
    )
    target_table_path = _extract_testing_table(
        fixture_path[0], "trips_table", target_testing_dir
    )

    ds = ray.data.read_hudi(target_table_path)

    assert ds.schema().names == [
        "_hoodie_commit_time",
        "_hoodie_commit_seqno",
        "_hoodie_record_key",
        "_hoodie_partition_path",
        "_hoodie_file_name",
        "ts",
        "uuid",
        "rider",
        "driver",
        "fare",
        "city",
    ]
    assert ds.count() == 5
    rows = (
        ds.select_columns(["_hoodie_commit_time", "ts", "uuid", "fare"])
        .sort("ts")
        .take_all()
    )
    assert rows == [
        [
            "20240402123035233",
            1695046462179,
            "9909a8b1-2d15-4d3d-8ec9-efc48c536a00",
            33.9,
        ],
        [
            "20240402123035233",
            1695091554788,
            "e96c4396-3fad-413a-a942-4cb36106d721",
            27.7,
        ],
        [
            "20240402123035233",
            1695115999911,
            "c8abbe79-8d89-47ea-b4ce-4d224bae5bfa",
            17.85,
        ],
        [
            "20240402123035233",
            1695159649087,
            "334e26e9-8355-45cc-97c6-c31daf0df330",
            19.1,
        ],
        [
            "20240402123035233",
            1695516137016,
            "e3cf430c-889d-4015-bc98-59bdce1e530c",
            34.15,
        ],
    ]


if __name__ == "__main__":
    import sys

    sys.exit(pytest.main(["-v", __file__]))
3 changes: 2 additions & 1 deletion python/requirements/ml/data-test-requirements.txt
@@ -18,4 +18,5 @@ delta-sharing
pytest-mock
decord
snowflake-connector-python
pyiceberg[sql-sqlite]==0.7.0
pyiceberg[sql-sqlite]==0.7.0
hudi @ git+https://github.com/xushiyan/hudi-rs.git@c76553b9ba0fd7c0c7860621de97939af95587fc#egg=hudi&subdirectory=python
2 changes: 2 additions & 0 deletions python/requirements_compiled.txt
@@ -754,6 +754,8 @@ httpx==0.24.1
# -r /ray/ci/../python/requirements/test-requirements.txt
# gradio
# gradio-client
hudi @ git+https://github.com/xushiyan/hudi-rs.git@c76553b9ba0fd7c0c7860621de97939af95587fc#egg=hudi&subdirectory=python
# via -r /ray/ci/../python/requirements/ml/data-test-requirements.txt
huggingface-hub==0.19.4
# via
# accelerate
