Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Data] Deprecate Dataset.get_internal_block_refs() #46455

Merged
merged 29 commits into from
Jul 11, 2024
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
c313a52
add iter_internal_block_refs
scottjlee Jul 1, 2024
09b905d
Merge branch 'master' into 0701-count
scottjlee Jul 2, 2024
5ef8041
return iterator over refbundles
scottjlee Jul 3, 2024
172e423
Merge branch '0701-count' of https://github.com/scottjlee/ray into 07…
scottjlee Jul 3, 2024
374898b
fix docs
scottjlee Jul 3, 2024
b0ec894
update consumption api usage
scottjlee Jul 3, 2024
f0b49f1
fix
scottjlee Jul 3, 2024
aa368d4
clean up
scottjlee Jul 3, 2024
ab8d6ed
comments
scottjlee Jul 3, 2024
128614d
comments
scottjlee Jul 3, 2024
65c3f4e
lint
scottjlee Jul 3, 2024
7956f35
Merge branch 'master' into 0701-count
scottjlee Jul 3, 2024
958d306
fix tests
scottjlee Jul 4, 2024
d6a1d79
Merge branch '0701-count' of https://github.com/scottjlee/ray into 07…
scottjlee Jul 4, 2024
87151ee
update tests
scottjlee Jul 5, 2024
6d15acd
replace get_internal_block_refs usages
scottjlee Jul 6, 2024
3e4d9b0
add deprecation warning
scottjlee Jul 8, 2024
3ea9759
Merge branch 'master' into 0701-count
scottjlee Jul 8, 2024
b6df226
snapshot metadata only
scottjlee Jul 8, 2024
70f82de
clean up
scottjlee Jul 8, 2024
97d177b
Merge branch '0701-count' into 0705-get_internal_block_Refs
scottjlee Jul 8, 2024
f520c62
update parquet test
scottjlee Jul 8, 2024
629d6bb
only cache metadata once iteration terminates
scottjlee Jul 9, 2024
1abdb63
Merge branch '0701-count' into 0705-get_internal_block_refs
scottjlee Jul 9, 2024
7c2710a
log deprecation warning
scottjlee Jul 9, 2024
52944e9
Merge branch 'master' into 0705-get_internal_block_refs
scottjlee Jul 10, 2024
a6e99c2
clean up
scottjlee Jul 10, 2024
fc0a334
comments
scottjlee Jul 11, 2024
615b03a
Merge branch 'master' into 0705-get_internal_block_refs
scottjlee Jul 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions python/ray/data/_internal/execution/interfaces/ref_bundle.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from dataclasses import dataclass
from typing import List, Optional, Tuple
from typing import Iterator, List, Optional, Tuple

import ray
from .common import NodeIdStr
Expand Down Expand Up @@ -59,7 +59,7 @@ def __setattr__(self, key, value):
object.__setattr__(self, key, value)

@property
def block_refs(self) -> List[BlockMetadata]:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix incorrect type hint.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice

def block_refs(self) -> List[ObjectRef[Block]]:
"""List of block references in this bundle."""
return [block_ref for block_ref, _ in self.blocks]

Expand Down Expand Up @@ -125,3 +125,12 @@ def __hash__(self) -> int:

def __len__(self) -> int:
return len(self.blocks)


def _ref_bundles_iterator_to_block_refs_list(
ref_bundles: Iterator[RefBundle],
) -> List[ObjectRef[Block]]:
"""Convert an iterator of RefBundles to a list of object references to Blocks."""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Change to "Convert an iterator of RefBundles to a list of Block object references." to avoid the double "to"; for a second I thought this described a double transformation.

return [
block_ref for ref_bundle in ref_bundles for block_ref in ref_bundle.block_refs
]
90 changes: 58 additions & 32 deletions python/ray/data/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@
from ray.data._internal.delegating_block_builder import DelegatingBlockBuilder
from ray.data._internal.equalize import _equalize
from ray.data._internal.execution.interfaces import RefBundle
from ray.data._internal.execution.interfaces.ref_bundle import (
_ref_bundles_iterator_to_block_refs_list,
)
from ray.data._internal.iterator.iterator_impl import DataIteratorImpl
from ray.data._internal.iterator.stream_split_iterator import StreamSplitDataIterator
from ray.data._internal.logical.operators.all_to_all_operator import (
Expand Down Expand Up @@ -91,7 +94,7 @@
from ray.data.iterator import DataIterator
from ray.data.random_access_dataset import RandomAccessDataset
from ray.types import ObjectRef
from ray.util.annotations import DeveloperAPI, PublicAPI
from ray.util.annotations import Deprecated, DeveloperAPI, PublicAPI
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy
from ray.widgets import Template
from ray.widgets.util import repr_with_fallback
Expand Down Expand Up @@ -4166,14 +4169,14 @@ def to_dask(
dask.config.set(scheduler=ray_dask_get)

@dask.delayed
def block_to_df(block: Block):
if isinstance(block, (ray.ObjectRef, ClientObjectRef)):
def block_to_df(block_ref: Block) -> pd.DataFrame:
if isinstance(block_ref, (ray.ObjectRef, ClientObjectRef)):
raise ValueError(
"Dataset.to_dask() must be used with Dask-on-Ray, please "
"set the Dask scheduler to ray_dask_get (located in "
"ray.util.dask)."
)
return _block_to_df(block)
return _block_to_df(block_ref)

if meta is None:
from ray.data.extensions import TensorDtype
Expand Down Expand Up @@ -4212,8 +4215,13 @@ def block_to_df(block: Block):
else:
meta = schema.empty_table().to_pandas()

dfs = []
for ref_bundle in self.iter_internal_ref_bundles():
for block_ref in ref_bundle.block_refs:
dfs.append(block_to_df(block_ref))

ddf = dd.from_delayed(
[block_to_df(block) for block in self.get_internal_block_refs()],
dfs,
meta=meta,
verify_meta=verify_meta,
)
Expand Down Expand Up @@ -4266,7 +4274,7 @@ def to_modin(self) -> "modin.pandas.dataframe.DataFrame":
This is only supported for datasets convertible to Arrow records.
This function induces a copy of the data. For zero-copy access to the
underlying data, consider using :meth:`.to_arrow_refs` or
:meth:`.get_internal_block_refs`.
:meth:`.iter_internal_ref_bundles`.

Time complexity: O(dataset size / parallelism)

Expand Down Expand Up @@ -4299,9 +4307,10 @@ def to_spark(self, spark: "pyspark.sql.SparkSession") -> "pyspark.sql.DataFrame"
schema = self.schema()
if isinstance(schema, Schema):
schema = schema.base_schema
return raydp.spark.ray_dataset_to_spark_dataframe(
spark, schema, self.get_internal_block_refs()
)

ref_bundles = self.iter_internal_ref_bundles()
block_refs = _ref_bundles_iterator_to_block_refs_list(ref_bundles)
return raydp.spark.ray_dataset_to_spark_dataframe(spark, schema, block_refs)

@ConsumptionAPI(pattern="Time complexity:")
def to_pandas(self, limit: int = None) -> "pandas.DataFrame":
Expand Down Expand Up @@ -4343,10 +4352,12 @@ def to_pandas(self, limit: int = None) -> "pandas.DataFrame":
f"{count} rows will fit in local memory, set "
"ds.to_pandas(limit=None) to disable limits."
)
blocks = self.get_internal_block_refs()
bundles = self.iter_internal_ref_bundles()
output = DelegatingBlockBuilder()
for block in blocks:
output.add_block(ray.get(block))

for bundle in bundles:
for block_ref in bundle.block_refs:
output.add_block(ray.get(block_ref))
block = output.build()
return _block_to_df(block)

Expand All @@ -4360,7 +4371,7 @@ def to_pandas_refs(self) -> List[ObjectRef["pandas.DataFrame"]]:

This function induces a copy of the data. For zero-copy access to the
underlying data, consider using :meth:`Dataset.to_arrow_refs` or
:meth:`Dataset.get_internal_block_refs`.
:meth:`Dataset.iter_internal_ref_bundles`.

Examples:
>>> import ray
Expand All @@ -4376,7 +4387,11 @@ def to_pandas_refs(self) -> List[ObjectRef["pandas.DataFrame"]]:
"""

block_to_df = cached_remote_fn(_block_to_df)
return [block_to_df.remote(block) for block in self.get_internal_block_refs()]
pandas_refs = []
for bundle in self.iter_internal_ref_bundles():
for block_ref in bundle.block_refs:
pandas_refs.append(block_to_df.remote(block_ref))
return pandas_refs

@DeveloperAPI
def to_numpy_refs(
Expand All @@ -4388,7 +4403,7 @@ def to_numpy_refs(
This is only supported for datasets convertible to NumPy ndarrays.
This function induces a copy of the data. For zero-copy access to the
underlying data, consider using :meth:`Dataset.to_arrow_refs` or
:meth:`Dataset.get_internal_block_refs`.
:meth:`Dataset.iter_internal_ref_bundles`.

Examples:
>>> import ray
Expand All @@ -4408,10 +4423,11 @@ def to_numpy_refs(
A list of remote NumPy ndarrays created from this dataset.
"""
block_to_ndarray = cached_remote_fn(_block_to_ndarray)
return [
block_to_ndarray.remote(block, column=column)
for block in self.get_internal_block_refs()
]
numpy_refs = []
for bundle in self.iter_internal_ref_bundles():
for block_ref in bundle.block_refs:
numpy_refs.append(block_to_ndarray.remote(block_ref, column=column))
return numpy_refs

@ConsumptionAPI(pattern="Time complexity:")
@DeveloperAPI
Expand Down Expand Up @@ -4439,18 +4455,21 @@ def to_arrow_refs(self) -> List[ObjectRef["pyarrow.Table"]]:
"""
import pyarrow as pa

blocks: List[ObjectRef["pyarrow.Table"]] = self.get_internal_block_refs()
ref_bundles: Iterator[RefBundle] = self.iter_internal_ref_bundles()
block_refs: List[
ObjectRef["pyarrow.Table"]
] = _ref_bundles_iterator_to_block_refs_list(ref_bundles)
# Schema is safe to call since we have already triggered execution with
# get_internal_block_refs.
# iter_internal_ref_bundles.
schema = self.schema(fetch_if_missing=True)
if isinstance(schema, Schema):
schema = schema.base_schema
if isinstance(schema, pa.Schema):
# Zero-copy path.
return blocks
return block_refs

block_to_arrow = cached_remote_fn(_block_to_arrow)
return [block_to_arrow.remote(block) for block in blocks]
return [block_to_arrow.remote(block) for block in block_refs]

@ConsumptionAPI(pattern="Args:")
def to_random_access_dataset(
Expand Down Expand Up @@ -4599,8 +4618,7 @@ def _build_ref_bundles(
self._synchronize_progress_bar()
return iter_ref_bundles

@ConsumptionAPI(pattern="Examples:")
@DeveloperAPI
@Deprecated
def get_internal_block_refs(self) -> List[ObjectRef[Block]]:
"""Get a list of references to the underlying blocks of this dataset.

Expand All @@ -4616,8 +4634,10 @@ def get_internal_block_refs(self) -> List[ObjectRef[Block]]:
Returns:
A list of references to this dataset's blocks.
"""
# TODO(scottjlee): replace get_internal_block_refs() usages with
# iter_internal_ref_bundles()
logger.warning(
"`Dataset.get_internal_block_refs()` is deprecated. Use "
"`Dataset.iter_internal_ref_bundles()` instead.",
)
block_refs = self._plan.execute().block_refs
self._synchronize_progress_bar()
return block_refs
Expand Down Expand Up @@ -4937,13 +4957,19 @@ def __iter__(self):

def _block_num_rows(self) -> List[int]:
get_num_rows = cached_remote_fn(_get_num_rows)
return ray.get([get_num_rows.remote(b) for b in self.get_internal_block_refs()])
num_rows = []
for ref_bundle in self.iter_internal_ref_bundles():
for block_ref in ref_bundle.block_refs:
num_rows.append(get_num_rows.remote(block_ref))
return ray.get(num_rows)

def _block_size_bytes(self) -> List[int]:
get_size_bytes = cached_remote_fn(_get_size_bytes)
return ray.get(
[get_size_bytes.remote(b) for b in self.get_internal_block_refs()]
)
size_bytes = []
for ref_bundle in self.iter_internal_ref_bundles():
for block_ref in ref_bundle.block_refs:
size_bytes.append(get_size_bytes.remote(block_ref))
return ray.get(size_bytes)

def _meta_count(self) -> Optional[int]:
return self._plan.meta_count()
Expand Down Expand Up @@ -5102,7 +5128,7 @@ def _get_size_bytes(block: Block) -> int:
return block.size_bytes()


def _block_to_df(block: Block):
def _block_to_df(block: Block) -> "pandas.DataFrame":
block = BlockAccessor.for_block(block)
return block.to_pandas()

Expand Down
6 changes: 5 additions & 1 deletion python/ray/data/random_access_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
import numpy as np

import ray
from ray.data._internal.execution.interfaces.ref_bundle import (
_ref_bundles_iterator_to_block_refs_list,
)
from ray.data._internal.remote_fn import cached_remote_fn
from ray.data.block import BlockAccessor
from ray.data.context import DataContext
Expand Down Expand Up @@ -51,7 +54,8 @@ def __init__(
logger.info("[setup] Indexing dataset by sort key.")
sorted_ds = ds.sort(key)
get_bounds = cached_remote_fn(_get_bounds)
blocks = sorted_ds.get_internal_block_refs()
bundles = sorted_ds.iter_internal_ref_bundles()
blocks = _ref_bundles_iterator_to_block_refs_list(bundles)

logger.info("[setup] Computing block range bounds.")
bounds = ray.get([get_bounds.remote(b, key) for b in blocks])
Expand Down
6 changes: 5 additions & 1 deletion python/ray/data/tests/test_all_to_all.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@

import ray
from ray.data._internal.aggregate import Count, Max, Mean, Min, Quantile, Std, Sum
from ray.data._internal.execution.interfaces.ref_bundle import (
_ref_bundles_iterator_to_block_refs_list,
)
from ray.data.aggregate import AggregateFn
from ray.data.context import DataContext
from ray.data.tests.conftest import * # noqa
Expand Down Expand Up @@ -1251,7 +1254,8 @@ def get_node_id():
node2_id = ray.get(get_node_id.options(resources={"bar:2": 1}).remote())

ds = ray.data.range(100, override_num_blocks=2).random_shuffle()
blocks = ds.get_internal_block_refs()
bundles = ds.iter_internal_ref_bundles()
blocks = _ref_bundles_iterator_to_block_refs_list(bundles)
ray.wait(blocks, num_returns=len(blocks), fetch_local=False)
location_data = ray.experimental.get_object_locations(blocks)
locations = []
Expand Down
15 changes: 10 additions & 5 deletions python/ray/data/tests/test_consumption.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
from ray.data._internal.datasource.csv_datasink import CSVDatasink
from ray.data._internal.datasource.csv_datasource import CSVDatasource
from ray.data._internal.datasource.range_datasource import RangeDatasource
from ray.data._internal.execution.interfaces.ref_bundle import (
_ref_bundles_iterator_to_block_refs_list,
)
from ray.data._internal.util import _check_pyarrow_version
from ray.data.block import BlockAccessor, BlockMetadata
from ray.data.context import DataContext
Expand Down Expand Up @@ -694,7 +697,8 @@ def test_convert_types(ray_start_regular_shared):
def test_from_blocks(input_blocks, ray_start_regular_shared):
ds = ray.data.from_blocks(input_blocks)

output_blocks = [ray.get(block_ref) for block_ref in ds.get_internal_block_refs()]
bundles = ds.iter_internal_ref_bundles()
output_blocks = ray.get(_ref_bundles_iterator_to_block_refs_list(bundles))
assert len(input_blocks) == len(output_blocks)
assert all(
input_block.equals(output_block)
Expand Down Expand Up @@ -1643,11 +1647,12 @@ def test_read_write_local_node(ray_start_cluster):
ctx.read_write_local_node = True

def check_dataset_is_local(ds):
blocks = ds.get_internal_block_refs()
ray.wait(blocks, num_returns=len(blocks), fetch_local=False)
location_data = ray.experimental.get_object_locations(blocks)
bundles = ds.iter_internal_ref_bundles()
block_refs = _ref_bundles_iterator_to_block_refs_list(bundles)
ray.wait(block_refs, num_returns=len(block_refs), fetch_local=False)
location_data = ray.experimental.get_object_locations(block_refs)
locations = []
for block in blocks:
for block in block_refs:
locations.extend(location_data[block]["node_ids"])
assert set(locations) == {ray.get_runtime_context().get_node_id()}

Expand Down
7 changes: 6 additions & 1 deletion python/ray/data/tests/test_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
import pytest

import ray
from ray.data._internal.execution.interfaces.ref_bundle import (
_ref_bundles_iterator_to_block_refs_list,
)
from ray.data.context import DataContext
from ray.data.exceptions import UserCodeException
from ray.data.tests.conftest import * # noqa
Expand Down Expand Up @@ -944,7 +947,9 @@ def empty_pandas(batch):
.map_batches(lambda x: x, batch_size=None)
)

block_refs = ds.get_internal_block_refs()
bundles = ds.iter_internal_ref_bundles()
block_refs = _ref_bundles_iterator_to_block_refs_list(bundles)

assert len(block_refs) == 1
assert type(ray.get(block_refs[0])) == pd.DataFrame

Expand Down
Loading