diff --git a/python/ray/data/_internal/execution/interfaces/ref_bundle.py b/python/ray/data/_internal/execution/interfaces/ref_bundle.py
index c8996a1c422b..758b22215051 100644
--- a/python/ray/data/_internal/execution/interfaces/ref_bundle.py
+++ b/python/ray/data/_internal/execution/interfaces/ref_bundle.py
@@ -1,5 +1,5 @@
 from dataclasses import dataclass
-from typing import List, Optional, Tuple
+from typing import Iterator, List, Optional, Tuple
 
 import ray
 from .common import NodeIdStr
@@ -59,7 +59,7 @@ def __setattr__(self, key, value):
         object.__setattr__(self, key, value)
 
     @property
-    def block_refs(self) -> List[BlockMetadata]:
+    def block_refs(self) -> List[ObjectRef[Block]]:
         """List of block references in this bundle."""
         return [block_ref for block_ref, _ in self.blocks]
 
@@ -125,3 +125,12 @@ def __hash__(self) -> int:
 
     def __len__(self) -> int:
         return len(self.blocks)
+
+
+def _ref_bundles_iterator_to_block_refs_list(
+    ref_bundles: Iterator[RefBundle],
+) -> List[ObjectRef[Block]]:
+    """Convert an iterator of RefBundles to a list of Block object references."""
+    return [
+        block_ref for ref_bundle in ref_bundles for block_ref in ref_bundle.block_refs
+    ]
diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py
index 0b93bc26cb4c..578613299ce3 100644
--- a/python/ray/data/dataset.py
+++ b/python/ray/data/dataset.py
@@ -44,6 +44,9 @@
 from ray.data._internal.delegating_block_builder import DelegatingBlockBuilder
 from ray.data._internal.equalize import _equalize
 from ray.data._internal.execution.interfaces import RefBundle
+from ray.data._internal.execution.interfaces.ref_bundle import (
+    _ref_bundles_iterator_to_block_refs_list,
+)
 from ray.data._internal.iterator.iterator_impl import DataIteratorImpl
 from ray.data._internal.iterator.stream_split_iterator import StreamSplitDataIterator
 from ray.data._internal.logical.operators.all_to_all_operator import (
@@ -91,7 +94,7 @@
 from ray.data.iterator import DataIterator
 from ray.data.random_access_dataset import RandomAccessDataset
 from ray.types import ObjectRef
-from ray.util.annotations import DeveloperAPI, PublicAPI
+from ray.util.annotations import Deprecated, DeveloperAPI, PublicAPI
 from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy
 from ray.widgets import Template
 from ray.widgets.util import repr_with_fallback
@@ -4166,14 +4169,14 @@ def to_dask(
         dask.config.set(scheduler=ray_dask_get)
 
         @dask.delayed
-        def block_to_df(block: Block):
-            if isinstance(block, (ray.ObjectRef, ClientObjectRef)):
+        def block_to_df(block_ref: ObjectRef[Block]) -> pd.DataFrame:
+            if isinstance(block_ref, (ray.ObjectRef, ClientObjectRef)):
                 raise ValueError(
                     "Dataset.to_dask() must be used with Dask-on-Ray, please "
                     "set the Dask scheduler to ray_dask_get (located in "
                     "ray.util.dask)."
                 )
-            return _block_to_df(block)
+            return _block_to_df(block_ref)
 
         if meta is None:
             from ray.data.extensions import TensorDtype
@@ -4212,8 +4215,13 @@ def block_to_df(block: Block):
             else:
                 meta = schema.empty_table().to_pandas()
 
+        dfs = []
+        for ref_bundle in self.iter_internal_ref_bundles():
+            for block_ref in ref_bundle.block_refs:
+                dfs.append(block_to_df(block_ref))
+
         ddf = dd.from_delayed(
-            [block_to_df(block) for block in self.get_internal_block_refs()],
+            dfs,
             meta=meta,
             verify_meta=verify_meta,
         )
@@ -4266,7 +4274,7 @@ def to_modin(self) -> "modin.pandas.dataframe.DataFrame":
         This is only supported for datasets convertible to Arrow records.
         This function induces a copy of the data.
         For zero-copy access to the underlying data, consider using :meth:`.to_arrow_refs` or
-        :meth:`.get_internal_block_refs`.
+        :meth:`.iter_internal_ref_bundles`.
 
         Time complexity: O(dataset size / parallelism)
 
@@ -4299,9 +4307,10 @@ def to_spark(self, spark: "pyspark.sql.SparkSession") -> "pyspark.sql.DataFrame"
         schema = self.schema()
         if isinstance(schema, Schema):
             schema = schema.base_schema
-        return raydp.spark.ray_dataset_to_spark_dataframe(
-            spark, schema, self.get_internal_block_refs()
-        )
+
+        ref_bundles = self.iter_internal_ref_bundles()
+        block_refs = _ref_bundles_iterator_to_block_refs_list(ref_bundles)
+        return raydp.spark.ray_dataset_to_spark_dataframe(spark, schema, block_refs)
 
     @ConsumptionAPI(pattern="Time complexity:")
     def to_pandas(self, limit: int = None) -> "pandas.DataFrame":
@@ -4343,10 +4352,12 @@ def to_pandas(self, limit: int = None) -> "pandas.DataFrame":
                 f"{count} rows will fit in local memory, set "
                 "ds.to_pandas(limit=None) to disable limits."
             )
-        blocks = self.get_internal_block_refs()
+        bundles = self.iter_internal_ref_bundles()
         output = DelegatingBlockBuilder()
-        for block in blocks:
-            output.add_block(ray.get(block))
+
+        for bundle in bundles:
+            for block_ref in bundle.block_refs:
+                output.add_block(ray.get(block_ref))
         block = output.build()
         return _block_to_df(block)
 
@@ -4360,7 +4371,7 @@ def to_pandas_refs(self) -> List[ObjectRef["pandas.DataFrame"]]:
 
         This function induces a copy of the data. For zero-copy access to the
         underlying data, consider using :meth:`Dataset.to_arrow_refs` or
-        :meth:`Dataset.get_internal_block_refs`.
+        :meth:`Dataset.iter_internal_ref_bundles`.
 
         Examples:
             >>> import ray
@@ -4376,7 +4387,11 @@ def to_pandas_refs(self) -> List[ObjectRef["pandas.DataFrame"]]:
 
         """
         block_to_df = cached_remote_fn(_block_to_df)
-        return [block_to_df.remote(block) for block in self.get_internal_block_refs()]
+        pandas_refs = []
+        for bundle in self.iter_internal_ref_bundles():
+            for block_ref in bundle.block_refs:
+                pandas_refs.append(block_to_df.remote(block_ref))
+        return pandas_refs
 
     @DeveloperAPI
     def to_numpy_refs(
@@ -4388,7 +4403,7 @@ def to_numpy_refs(
         This is only supported for datasets convertible to NumPy ndarrays.
         This function induces a copy of the data. For zero-copy access to the
         underlying data, consider using :meth:`Dataset.to_arrow_refs` or
-        :meth:`Dataset.get_internal_block_refs`.
+        :meth:`Dataset.iter_internal_ref_bundles`.
 
         Examples:
             >>> import ray
@@ -4408,10 +4423,11 @@ def to_numpy_refs(
             A list of remote NumPy ndarrays created from this dataset.
         """
         block_to_ndarray = cached_remote_fn(_block_to_ndarray)
-        return [
-            block_to_ndarray.remote(block, column=column)
-            for block in self.get_internal_block_refs()
-        ]
+        numpy_refs = []
+        for bundle in self.iter_internal_ref_bundles():
+            for block_ref in bundle.block_refs:
+                numpy_refs.append(block_to_ndarray.remote(block_ref, column=column))
+        return numpy_refs
 
     @ConsumptionAPI(pattern="Time complexity:")
     @DeveloperAPI
@@ -4439,18 +4455,21 @@ def to_arrow_refs(self) -> List[ObjectRef["pyarrow.Table"]]:
         """
         import pyarrow as pa
 
-        blocks: List[ObjectRef["pyarrow.Table"]] = self.get_internal_block_refs()
+        ref_bundles: Iterator[RefBundle] = self.iter_internal_ref_bundles()
+        block_refs: List[
+            ObjectRef["pyarrow.Table"]
+        ] = _ref_bundles_iterator_to_block_refs_list(ref_bundles)
         # Schema is safe to call since we have already triggered execution with
-        # get_internal_block_refs.
+        # iter_internal_ref_bundles.
         schema = self.schema(fetch_if_missing=True)
         if isinstance(schema, Schema):
             schema = schema.base_schema
         if isinstance(schema, pa.Schema):
             # Zero-copy path.
-            return blocks
+            return block_refs
 
         block_to_arrow = cached_remote_fn(_block_to_arrow)
-        return [block_to_arrow.remote(block) for block in blocks]
+        return [block_to_arrow.remote(block) for block in block_refs]
 
     @ConsumptionAPI(pattern="Args:")
     def to_random_access_dataset(
@@ -4599,8 +4618,8 @@ def _build_ref_bundles(
         self._synchronize_progress_bar()
         return iter_ref_bundles
 
+    @Deprecated
     @ConsumptionAPI(pattern="Examples:")
-    @DeveloperAPI
     def get_internal_block_refs(self) -> List[ObjectRef[Block]]:
         """Get a list of references to the underlying blocks of this dataset.
 
@@ -4616,8 +4635,10 @@ def get_internal_block_refs(self) -> List[ObjectRef[Block]]:
         Returns:
             A list of references to this dataset's blocks.
         """
-        # TODO(scottjlee): replace get_internal_block_refs() usages with
-        # iter_internal_ref_bundles()
+        logger.warning(
+            "`Dataset.get_internal_block_refs()` is deprecated. Use "
+            "`Dataset.iter_internal_ref_bundles()` instead.",
+        )
         block_refs = self._plan.execute().block_refs
         self._synchronize_progress_bar()
         return block_refs
@@ -4937,13 +4958,19 @@ def __iter__(self):
 
     def _block_num_rows(self) -> List[int]:
         get_num_rows = cached_remote_fn(_get_num_rows)
-        return ray.get([get_num_rows.remote(b) for b in self.get_internal_block_refs()])
+        num_rows = []
+        for ref_bundle in self.iter_internal_ref_bundles():
+            for block_ref in ref_bundle.block_refs:
+                num_rows.append(get_num_rows.remote(block_ref))
+        return ray.get(num_rows)
 
     def _block_size_bytes(self) -> List[int]:
         get_size_bytes = cached_remote_fn(_get_size_bytes)
-        return ray.get(
-            [get_size_bytes.remote(b) for b in self.get_internal_block_refs()]
-        )
+        size_bytes = []
+        for ref_bundle in self.iter_internal_ref_bundles():
+            for block_ref in ref_bundle.block_refs:
+                size_bytes.append(get_size_bytes.remote(block_ref))
+        return ray.get(size_bytes)
 
     def _meta_count(self) -> Optional[int]:
         return self._plan.meta_count()
@@ -5102,7 +5129,7 @@ def _get_size_bytes(block: Block) -> int:
     return block.size_bytes()
 
 
-def _block_to_df(block: Block):
+def _block_to_df(block: Block) -> "pandas.DataFrame":
     block = BlockAccessor.for_block(block)
     return block.to_pandas()
 
diff --git a/python/ray/data/random_access_dataset.py b/python/ray/data/random_access_dataset.py
index 06d3438a2e18..a24c6796f7ca 100644
--- a/python/ray/data/random_access_dataset.py
+++ b/python/ray/data/random_access_dataset.py
@@ -8,6 +8,9 @@
 import numpy as np
 
 import ray
+from ray.data._internal.execution.interfaces.ref_bundle import (
+    _ref_bundles_iterator_to_block_refs_list,
+)
 from ray.data._internal.remote_fn import cached_remote_fn
 from ray.data.block import BlockAccessor
 from ray.data.context import DataContext
@@ -51,7 +54,8 @@ def __init__(
         logger.info("[setup] Indexing dataset by sort key.")
         sorted_ds = ds.sort(key)
         get_bounds = cached_remote_fn(_get_bounds)
-        blocks = sorted_ds.get_internal_block_refs()
+        bundles = sorted_ds.iter_internal_ref_bundles()
+        blocks = _ref_bundles_iterator_to_block_refs_list(bundles)
 
         logger.info("[setup] Computing block range bounds.")
         bounds = ray.get([get_bounds.remote(b, key) for b in blocks])
diff --git a/python/ray/data/tests/test_all_to_all.py b/python/ray/data/tests/test_all_to_all.py
index 6b0f69a563ca..2b0a72175cf9 100644
--- a/python/ray/data/tests/test_all_to_all.py
+++ b/python/ray/data/tests/test_all_to_all.py
@@ -11,6 +11,9 @@
 
 import ray
 from ray.data._internal.aggregate import Count, Max, Mean, Min, Quantile, Std, Sum
+from ray.data._internal.execution.interfaces.ref_bundle import (
+    _ref_bundles_iterator_to_block_refs_list,
+)
 from ray.data.aggregate import AggregateFn
 from ray.data.context import DataContext
 from ray.data.tests.conftest import *  # noqa
@@ -1251,7 +1254,8 @@ def get_node_id():
     node2_id = ray.get(get_node_id.options(resources={"bar:2": 1}).remote())
 
     ds = ray.data.range(100, override_num_blocks=2).random_shuffle()
-    blocks = ds.get_internal_block_refs()
+    bundles = ds.iter_internal_ref_bundles()
+    blocks = _ref_bundles_iterator_to_block_refs_list(bundles)
     ray.wait(blocks, num_returns=len(blocks), fetch_local=False)
     location_data = ray.experimental.get_object_locations(blocks)
     locations = []
diff --git a/python/ray/data/tests/test_consumption.py b/python/ray/data/tests/test_consumption.py
index 4cef09afbc24..dc1fae49c35c 100644
--- a/python/ray/data/tests/test_consumption.py
+++ b/python/ray/data/tests/test_consumption.py
@@ -16,6 +16,9 @@
 from ray.data._internal.datasource.csv_datasink import CSVDatasink
 from ray.data._internal.datasource.csv_datasource import CSVDatasource
 from ray.data._internal.datasource.range_datasource import RangeDatasource
+from ray.data._internal.execution.interfaces.ref_bundle import (
+    _ref_bundles_iterator_to_block_refs_list,
+)
 from ray.data._internal.util import _check_pyarrow_version
 from ray.data.block import BlockAccessor, BlockMetadata
 from ray.data.context import DataContext
@@ -694,7 +697,8 @@ def test_convert_types(ray_start_regular_shared):
 def test_from_blocks(input_blocks, ray_start_regular_shared):
     ds = ray.data.from_blocks(input_blocks)
 
-    output_blocks = [ray.get(block_ref) for block_ref in ds.get_internal_block_refs()]
+    bundles = ds.iter_internal_ref_bundles()
+    output_blocks = ray.get(_ref_bundles_iterator_to_block_refs_list(bundles))
     assert len(input_blocks) == len(output_blocks)
     assert all(
         input_block.equals(output_block)
@@ -1643,11 +1647,12 @@ def test_read_write_local_node(ray_start_cluster):
     ctx.read_write_local_node = True
 
     def check_dataset_is_local(ds):
-        blocks = ds.get_internal_block_refs()
-        ray.wait(blocks, num_returns=len(blocks), fetch_local=False)
-        location_data = ray.experimental.get_object_locations(blocks)
+        bundles = ds.iter_internal_ref_bundles()
+        block_refs = _ref_bundles_iterator_to_block_refs_list(bundles)
+        ray.wait(block_refs, num_returns=len(block_refs), fetch_local=False)
+        location_data = ray.experimental.get_object_locations(block_refs)
         locations = []
-        for block in blocks:
+        for block in block_refs:
             locations.extend(location_data[block]["node_ids"])
 
         assert set(locations) == {ray.get_runtime_context().get_node_id()}
diff --git a/python/ray/data/tests/test_map.py b/python/ray/data/tests/test_map.py
index 337bacc6f77d..18b36bfb8246 100644
--- a/python/ray/data/tests/test_map.py
+++ b/python/ray/data/tests/test_map.py
@@ -13,6 +13,9 @@
 import pytest
 
 import ray
+from ray.data._internal.execution.interfaces.ref_bundle import (
+    _ref_bundles_iterator_to_block_refs_list,
+)
 from ray.data.context import DataContext
 from ray.data.exceptions import UserCodeException
 from ray.data.tests.conftest import *  # noqa
@@ -944,7 +947,9 @@ def empty_pandas(batch):
         .map_batches(lambda x: x, batch_size=None)
     )
 
-    block_refs = ds.get_internal_block_refs()
+    bundles = ds.iter_internal_ref_bundles()
+    block_refs = _ref_bundles_iterator_to_block_refs_list(bundles)
+
     assert len(block_refs) == 1
     assert type(ray.get(block_refs[0])) == pd.DataFrame
diff --git a/python/ray/data/tests/test_pandas.py b/python/ray/data/tests/test_pandas.py
index 626edc69f06d..d115805e8a08 100644
--- a/python/ray/data/tests/test_pandas.py
+++ b/python/ray/data/tests/test_pandas.py
@@ -1,13 +1,22 @@
+from typing import Iterator
+
 import numpy as np
 import pandas as pd
 import pyarrow as pa
 import pytest
 
 import ray
+from ray.data._internal.execution.interfaces.ref_bundle import RefBundle
+from ray.data.block import Block
 from ray.data.extensions import ArrowTensorArray, ArrowTensorType, TensorDtype
 from ray.data.tests.conftest import *  # noqa
 from ray.data.tests.mock_http_server import *  # noqa
 from ray.tests.conftest import *  # noqa
+from ray.types import ObjectRef
+
+
+def _get_first_block(bundles: Iterator[RefBundle]) -> ObjectRef[Block]:
+    return next(bundles).block_refs[0]
 
 
 @pytest.mark.parametrize("enable_pandas_block", [False, True])
@@ -19,7 +28,7 @@ def test_from_pandas(ray_start_regular_shared, enable_pandas_block):
     df1 = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]})
     df2 = pd.DataFrame({"one": [4, 5, 6], "two": ["e", "f", "g"]})
     ds = ray.data.from_pandas([df1, df2])
-    block = ray.get(ds.get_internal_block_refs()[0])
+    block = ray.get(_get_first_block(ds.iter_internal_ref_bundles()))
     assert (
         isinstance(block, pd.DataFrame)
         if enable_pandas_block
@@ -33,7 +42,7 @@ def test_from_pandas(ray_start_regular_shared, enable_pandas_block):
 
     # test from single pandas dataframe
     ds = ray.data.from_pandas(df1)
-    block = ray.get(ds.get_internal_block_refs()[0])
+    block = ray.get(_get_first_block(ds.iter_internal_ref_bundles()))
     assert (
         isinstance(block, pd.DataFrame)
         if enable_pandas_block
@@ -66,7 +75,7 @@ def test_from_pandas_refs(ray_start_regular_shared, enable_pandas_block):
     df1 = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]})
    df2 = pd.DataFrame({"one": [4, 5, 6], "two": ["e", "f", "g"]})
     ds = ray.data.from_pandas_refs([ray.put(df1), ray.put(df2)])
-    block = ray.get(ds.get_internal_block_refs()[0])
+    block = ray.get(_get_first_block(ds.iter_internal_ref_bundles()))
     assert (
         isinstance(block, pd.DataFrame)
         if enable_pandas_block
@@ -80,7 +89,7 @@ def test_from_pandas_refs(ray_start_regular_shared, enable_pandas_block):
 
     # test from single pandas dataframe ref
     ds = ray.data.from_pandas_refs(ray.put(df1))
-    block = ray.get(ds.get_internal_block_refs()[0])
+    block = ray.get(_get_first_block(ds.iter_internal_ref_bundles()))
     assert (
         isinstance(block, pd.DataFrame)
         if enable_pandas_block
diff --git a/python/ray/data/tests/test_parquet.py b/python/ray/data/tests/test_parquet.py
index 9e92e2536b12..e3eeb8ee68e6 100644
--- a/python/ray/data/tests/test_parquet.py
+++ b/python/ray/data/tests/test_parquet.py
@@ -18,6 +18,9 @@
     _deserialize_fragments_with_retry,
     _SerializedFragment,
 )
+from ray.data._internal.execution.interfaces.ref_bundle import (
+    _ref_bundles_iterator_to_block_refs_list,
+)
 from ray.data.block import BlockAccessor
 from ray.data.context import DataContext
 from ray.data.datasource import DefaultFileMetadataProvider, ParquetMetadataProvider
@@ -1139,11 +1142,12 @@ def get_node_id():
     ds = ray.data.read_parquet(data_path)
 
     # Force reads.
-    blocks = ds.get_internal_block_refs()
-    ray.wait(blocks, num_returns=len(blocks), fetch_local=False)
-    location_data = ray.experimental.get_object_locations(blocks)
+    bundles = ds.iter_internal_ref_bundles()
+    block_refs = _ref_bundles_iterator_to_block_refs_list(bundles)
+    ray.wait(block_refs, num_returns=len(block_refs), fetch_local=False)
+    location_data = ray.experimental.get_object_locations(block_refs)
     locations = []
-    for block in blocks:
+    for block in block_refs:
         locations.extend(location_data[block]["node_ids"])
 
     assert set(locations) == {node1_id, node2_id}
diff --git a/python/ray/data/tests/test_size_estimation.py b/python/ray/data/tests/test_size_estimation.py
index 8e24d7ea8cda..7a18b37e3c78 100644
--- a/python/ray/data/tests/test_size_estimation.py
+++ b/python/ray/data/tests/test_size_estimation.py
@@ -1,10 +1,12 @@
 import os
 import uuid
+from typing import Iterable
 
 import pytest
 
 import ray
 from ray.data._internal.arrow_block import ArrowBlockBuilder
+from ray.data._internal.execution.interfaces.ref_bundle import RefBundle
 from ray.tests.conftest import *  # noqa
 
 SMALL_VALUE = "a" * 100
@@ -187,17 +189,24 @@ def __call__(self, x):
     ctx.target_max_block_size = 20_000_000
     ctx.target_max_block_size = 20_000_000
     ds2 = ray.data.range(1000, override_num_blocks=1).map(arrow_fn, **kwargs)
-    nblocks = len(ds2.map(identity_fn, **kwargs).get_internal_block_refs())
+    bundles: Iterable[RefBundle] = ds2.map(
+        identity_fn, **kwargs
+    ).iter_internal_ref_bundles()
+    nblocks = sum(len(b.block_refs) for b in bundles)
     assert nblocks == 1, nblocks
     ctx.target_max_block_size = 2_000_000
-    nblocks = len(ds2.map(identity_fn, **kwargs).get_internal_block_refs())
+    bundles: Iterable[RefBundle] = ds2.map(
+        identity_fn, **kwargs
+    ).iter_internal_ref_bundles()
+    nblocks = sum(len(b.block_refs) for b in bundles)
     assert 4 < nblocks < 7 or use_actors, nblocks
 
     # Disabled.
     # Setting a huge block size effectively disables block splitting.
     ctx.target_max_block_size = 2**64
     ds3 = ray.data.range(1000, override_num_blocks=1).map(arrow_fn, **kwargs)
-    nblocks = len(ds3.map(identity_fn, **kwargs).get_internal_block_refs())
+    bundles = ds3.map(identity_fn, **kwargs).iter_internal_ref_bundles()
+    nblocks = sum(len(b.block_refs) for b in bundles)
     assert nblocks == 1, nblocks
 
@@ -207,10 +216,12 @@ def test_split_flat_map(ray_start_regular_shared):
     # Arrow block
     ctx.target_max_block_size = 20_000_000
     ds2 = ray.data.range(1000, override_num_blocks=1).map(lambda _: ARROW_LARGE_VALUE)
-    nblocks = len(ds2.flat_map(lambda x: [x]).get_internal_block_refs())
+    bundles = ds2.flat_map(lambda x: [x]).iter_internal_ref_bundles()
+    nblocks = sum(len(b.block_refs) for b in bundles)
     assert nblocks == 1, nblocks
     ctx.target_max_block_size = 2_000_000
-    nblocks = len(ds2.flat_map(lambda x: [x]).get_internal_block_refs())
+    bundles = ds2.flat_map(lambda x: [x]).iter_internal_ref_bundles()
+    nblocks = sum(len(b.block_refs) for b in bundles)
     assert 4 < nblocks < 7, nblocks
 
@@ -220,10 +231,12 @@ def test_split_map_batches(ray_start_regular_shared):
     # Arrow block
     ctx.target_max_block_size = 20_000_000
     ds2 = ray.data.range(1000, override_num_blocks=1).map(lambda _: ARROW_LARGE_VALUE)
-    nblocks = len(ds2.map_batches(lambda x: x, batch_size=1).get_internal_block_refs())
+    bundles = ds2.map_batches(lambda x: x, batch_size=1).iter_internal_ref_bundles()
+    nblocks = sum(len(b.block_refs) for b in bundles)
     assert nblocks == 1, nblocks
     ctx.target_max_block_size = 2_000_000
-    nblocks = len(ds2.map_batches(lambda x: x, batch_size=16).get_internal_block_refs())
+    bundles = ds2.map_batches(lambda x: x, batch_size=16).iter_internal_ref_bundles()
+    nblocks = sum(len(b.block_refs) for b in bundles)
     assert 4 < nblocks < 7, nblocks
diff --git a/python/ray/data/tests/test_split.py b/python/ray/data/tests/test_split.py
index 2cd81cd2f6ae..1af6596fdb13 100644
--- a/python/ray/data/tests/test_split.py
+++ b/python/ray/data/tests/test_split.py
@@ -13,6 +13,9 @@
 from ray.data._internal.block_list import BlockList
 from ray.data._internal.equalize import _equalize
 from ray.data._internal.execution.interfaces import RefBundle
+from ray.data._internal.execution.interfaces.ref_bundle import (
+    _ref_bundles_iterator_to_block_refs_list,
+)
 from ray.data._internal.logical.interfaces import LogicalPlan
 from ray.data._internal.logical.operators.input_data_operator import InputData
 from ray.data._internal.plan import ExecutionPlan
@@ -388,8 +391,9 @@ def assert_split_assignment(block_node_ids, actor_node_ids, expected_split_result
         datasets[1] contains block 2.
         """
         num_blocks = len(block_node_ids)
-        ds = ray.data.range(num_blocks, override_num_blocks=num_blocks)
-        blocks = ds.get_internal_block_refs()
+        ds = ray.data.range(num_blocks, override_num_blocks=num_blocks).materialize()
+        bundles = ds.iter_internal_ref_bundles()
+        blocks = _ref_bundles_iterator_to_block_refs_list(bundles)
         assert len(block_node_ids) == len(blocks)
         actors = [Actor.remote() for i in range(len(actor_node_ids))]
         with patch("ray.experimental.get_object_locations") as location_mock:
@@ -412,7 +416,9 @@ def assert_split_assignment(block_node_ids, actor_node_ids, expected_split_result
         assert len(datasets) == len(actors)
         for i in range(len(actors)):
             assert {blocks[j] for j in expected_split_result[i]} == set(
-                datasets[i].get_internal_block_refs()
+                _ref_bundles_iterator_to_block_refs_list(
+                    datasets[i].iter_internal_ref_bundles()
+                )
             )
 
     assert_split_assignment(
diff --git a/python/ray/data/tests/test_text.py b/python/ray/data/tests/test_text.py
index 5a3239750eb9..fba2cc78d4e4 100644
--- a/python/ray/data/tests/test_text.py
+++ b/python/ray/data/tests/test_text.py
@@ -4,6 +4,9 @@
 import pytest
 
 import ray
+from ray.data._internal.execution.interfaces.ref_bundle import (
+    _ref_bundles_iterator_to_block_refs_list,
+)
 from ray.data.datasource import (
     BaseFileMetadataProvider,
     FastFileMetadataProvider,
@@ -194,11 +197,13 @@ def get_node_id():
         path, override_num_blocks=2, ray_remote_args={"resources": {"bar": 1}}
     )
 
-    blocks = ds.get_internal_block_refs()
-    ray.wait(blocks, num_returns=len(blocks), fetch_local=False)
-    location_data = ray.experimental.get_object_locations(blocks)
+    block_refs = _ref_bundles_iterator_to_block_refs_list(
+        ds.iter_internal_ref_bundles()
+    )
+    ray.wait(block_refs, num_returns=len(block_refs), fetch_local=False)
+    location_data = ray.experimental.get_object_locations(block_refs)
     locations = []
-    for block in blocks:
+    for block in block_refs:
         locations.extend(location_data[block]["node_ids"])
     assert set(locations) == {bar_node_id}, locations
     assert sorted(_to_lines(ds.take())) == ["goodbye", "hello", "world"]
diff --git a/python/ray/data/tests/test_transform_pyarrow.py b/python/ray/data/tests/test_transform_pyarrow.py
index 366160c0ea4a..cfe53c9b98f5 100644
--- a/python/ray/data/tests/test_transform_pyarrow.py
+++ b/python/ray/data/tests/test_transform_pyarrow.py
@@ -444,7 +444,8 @@ def test_fallback_to_pandas_on_incompatible_data(
     # Ray Data will fall back to using Pandas.
     ds = _create_datasset(op, data)
     ds = ds.materialize()
-    block = ray.get(ds.get_internal_block_refs()[0])
+    bundles = ds.iter_internal_ref_bundles()
+    block = ray.get(next(bundles).block_refs[0])
     assert isinstance(block, pd.DataFrame)
 
diff --git a/python/ray/data/tests/test_zip.py b/python/ray/data/tests/test_zip.py
index 2df33ef8bb0e..85be862d0012 100644
--- a/python/ray/data/tests/test_zip.py
+++ b/python/ray/data/tests/test_zip.py
@@ -69,7 +69,8 @@ def test_zip_different_num_blocks_split_smallest(
         override_num_blocks=num_blocks2,
     )
     ds = ds1.zip(ds2).materialize()
-    num_blocks = len(ds.get_internal_block_refs())
+    bundles = ds.iter_internal_ref_bundles()
+    num_blocks = sum(len(b.block_refs) for b in bundles)
     assert ds.take() == [{str(i): i for i in range(num_cols1 + num_cols2)}] * n
     if should_invert:
         assert num_blocks == num_blocks2