From c313a520e20b0bfeab65d83c0154fee0d6a0ca58 Mon Sep 17 00:00:00 2001 From: sjl Date: Mon, 1 Jul 2024 23:23:41 +0000 Subject: [PATCH 01/20] add iter_internal_block_refs Signed-off-by: sjl --- python/ray/data/dataset.py | 38 +++++++++++++++++++++------ python/ray/data/tests/test_formats.py | 17 ++++++++++++ 2 files changed, 47 insertions(+), 8 deletions(-) diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index 54ee2b6d62c4..99e74e2d51a1 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -12,6 +12,7 @@ Dict, Generic, Iterable, + Iterator, List, Literal, Mapping, @@ -2486,11 +2487,12 @@ def count(self) -> int: get_num_rows = cached_remote_fn(_get_num_rows) - return sum( - ray.get( - [get_num_rows.remote(block) for block in self.get_internal_block_refs()] - ) - ) + # Directly loop over the iterator of `BlockRef`s instead of first + # retrieving a list of `BlockRef`s. + total_rows = 0 + for block_ref in self.iter_internal_block_refs(): + total_rows += ray.get(get_num_rows.remote(block_ref)) + return total_rows @ConsumptionAPI( if_more_than_read=True, @@ -4563,7 +4565,29 @@ def stats(self) -> str: def _get_stats_summary(self) -> DatasetStatsSummary: return self._plan.stats_summary() - @ConsumptionAPI(pattern="Time complexity:") + @ConsumptionAPI(pattern="") + @DeveloperAPI + def iter_internal_block_refs(self) -> Iterator[ObjectRef[Block]]: + """Get an iterator over references to the underlying blocks of this Dataset. + + This function can be used for zero-copy access to the data. It does not + keep the data materialized in-memory. + + Examples: + >>> import ray + >>> ds = ray.data.range(1) + >>> for block_ref in ds.get_internal_block_refs(): + ... block = ray.get(block_ref) + + Returns: + An iterator over references to this Dataset's blocks. + """ + iter_block_refs_md, _, _ = self._plan.execute_to_iterator() + iter_block_refs = (block_ref for block_ref, _ in iter_block_refs_md) + self._synchronize_progress_bar() + return iter_block_refs + + @ConsumptionAPI(pattern="") @DeveloperAPI def get_internal_block_refs(self) -> List[ObjectRef[Block]]: """Get a list of references to the underlying blocks of this dataset. @@ -4577,8 +4601,6 @@ def get_internal_block_refs(self) -> List[ObjectRef[Block]]: >>> ds.get_internal_block_refs() [ObjectRef(...)] - Time complexity: O(1) - Returns: A list of references to this dataset's blocks. """ diff --git a/python/ray/data/tests/test_formats.py b/python/ray/data/tests/test_formats.py index 900c249d5030..bec9e37bf1ba 100644 --- a/python/ray/data/tests/test_formats.py +++ b/python/ray/data/tests/test_formats.py @@ -85,6 +85,23 @@ def test_get_internal_block_refs(ray_start_regular_shared): assert out == list(range(10)), out +def test_iter_internal_block_refs(ray_start_regular_shared): + n = 10 + iter_block_refs = ray.data.range( + n, override_num_blocks=n + ).iter_internal_block_refs() + + out = [] + block_ref_count = 0 + for block_ref in iter_block_refs: + b = ray.get(block_ref) + out.extend(extract_values("id", BlockAccessor.for_block(b).iter_rows(True))) + block_ref_count += 1 + out = sorted(out) + assert block_ref_count == n + assert out == list(range(n)), out + + def test_fsspec_filesystem(ray_start_regular_shared, tmp_path): """Same as `test_parquet_write` but using a custom, fsspec filesystem. From 5ef80417f23b14d541cd1fa0ca50c81d2aed8c0e Mon Sep 17 00:00:00 2001 From: sjl Date: Wed, 3 Jul 2024 04:32:26 +0000 Subject: [PATCH 02/20] return iterator over refbundles Signed-off-by: sjl --- python/ray/data/dataset.py | 53 +++++++++++++++++---------- python/ray/data/tests/test_formats.py | 20 +++++----- 2 files changed, 43 insertions(+), 30 deletions(-) diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index 99e74e2d51a1..57c0ac38f3ae 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -78,6 +78,7 @@ VALID_BATCH_FORMATS, Block, BlockAccessor, + BlockMetadata, DataBatch, T, U, @@ -2460,12 +2461,13 @@ def show(self, limit: int = 20) -> None: @ConsumptionAPI( if_more_than_read=True, datasource_metadata="row count", - pattern="Time complexity:", + pattern="without reading in the entire data.", ) def count(self) -> int: - """Count the number of records in the dataset. - - Time complexity: O(dataset size / parallelism), O(1) for parquet + """Count the number of records in the dataset. For `Dataset`s + which only read Parquet files (created with :meth:`~ray.data.read_parquet`), + this method reads the file metadata to efficiently count the number of records + without reading in the entire data. Examples: >>> import ray @@ -2485,13 +2487,14 @@ def count(self) -> int: if meta_count is not None: return meta_count - get_num_rows = cached_remote_fn(_get_num_rows) - - # Directly loop over the iterator of `BlockRef`s instead of first - # retrieving a list of `BlockRef`s. + # Directly loop over the iterator of `RefBundle`s instead of + # retrieving a full list of `BlockRef`s. total_rows = 0 - for block_ref in self.iter_internal_block_refs(): - total_rows += ray.get(get_num_rows.remote(block_ref)) + for ref_bundle in self.iter_internal_ref_bundles(): + num_rows = ref_bundle.num_rows() + # Executing the dataset always returns blocks with valid `num_rows`. + assert num_rows is not None + total_rows += num_rows return total_rows @ConsumptionAPI( @@ -4567,25 +4570,35 @@ def _get_stats_summary(self) -> DatasetStatsSummary: @ConsumptionAPI(pattern="") @DeveloperAPI - def iter_internal_block_refs(self) -> Iterator[ObjectRef[Block]]: - """Get an iterator over references to the underlying blocks of this Dataset. - - This function can be used for zero-copy access to the data. It does not - keep the data materialized in-memory. + def iter_internal_ref_bundles(self) -> Iterator[RefBundle]: + """Get an iterator over + :class:`~ray.data._internal.execution.interfaces.RefBundle`s + belonging to this Dataset. Calling this function doesn't keep + the data materialized in-memory. Examples: >>> import ray >>> ds = ray.data.range(1) - >>> for block_ref in ds.get_internal_block_refs(): - ... block = ray.get(block_ref) + >>> for ref_bundle in ds.get_internal_block_refs(): + ... for block_ref, block_md in ref_bundle.blocks: + ... block = ray.get(block_ref) Returns: - An iterator over references to this Dataset's blocks. + An iterator over this Dataset's + :class:`~ray.data._internal.execution.interfaces.RefBundle`s. """ + + def _build_ref_bundle( + blocks: Tuple[ObjectRef[Block], BlockMetadata], + ) -> RefBundle: + # Set `owns_blocks=True` so we can destroy the blocks eagerly + # after getting count from metadata. + return RefBundle((blocks,), owns_blocks=True) + iter_block_refs_md, _, _ = self._plan.execute_to_iterator() - iter_block_refs = (block_ref for block_ref, _ in iter_block_refs_md) + iter_ref_bundles = map(_build_ref_bundle, iter_block_refs_md) self._synchronize_progress_bar() - return iter_block_refs + return iter_ref_bundles @ConsumptionAPI(pattern="") @DeveloperAPI diff --git a/python/ray/data/tests/test_formats.py b/python/ray/data/tests/test_formats.py index bec9e37bf1ba..a813ffeff999 100644 --- a/python/ray/data/tests/test_formats.py +++ b/python/ray/data/tests/test_formats.py @@ -85,20 +85,20 @@ def test_get_internal_block_refs(ray_start_regular_shared): assert out == list(range(10)), out -def test_iter_internal_block_refs(ray_start_regular_shared): +def test_iter_internal_ref_bundles(ray_start_regular_shared): n = 10 - iter_block_refs = ray.data.range( - n, override_num_blocks=n - ).iter_internal_block_refs() + ds = ray.data.range(n, override_num_blocks=n) + iter_ref_bundles = ds.iter_internal_ref_bundles() out = [] - block_ref_count = 0 - for block_ref in iter_block_refs: - b = ray.get(block_ref) - out.extend(extract_values("id", BlockAccessor.for_block(b).iter_rows(True))) - block_ref_count += 1 + ref_bundle_count = 0 + for ref_bundle in iter_ref_bundles: + for block_ref, block_md in ref_bundle.blocks: + b = ray.get(block_ref) + out.extend(extract_values("id", BlockAccessor.for_block(b).iter_rows(True))) + ref_bundle_count += 1 out = sorted(out) - assert block_ref_count == n + assert ref_bundle_count == n assert out == list(range(n)), out From 374898b80c32ef8ce69cf50f556ee51041f9dc8a Mon Sep 17 00:00:00 2001 From: sjl Date: Wed, 3 Jul 2024 17:12:06 +0000 Subject: [PATCH 03/20] fix docs Signed-off-by: sjl --- python/ray/data/dataset.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index 57c0ac38f3ae..86750027052a 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -4579,7 +4579,7 @@ def iter_internal_ref_bundles(self) -> Iterator[RefBundle]: Examples: >>> import ray >>> ds = ray.data.range(1) - >>> for ref_bundle in ds.get_internal_block_refs(): + >>> for ref_bundle in ds.iter_internal_ref_bundles(): ... for block_ref, block_md in ref_bundle.blocks: ... block = ray.get(block_ref) From b0ec8943b07c51d4c406f44f5b148cb3dd3d7b51 Mon Sep 17 00:00:00 2001 From: Scott Lee Date: Wed, 3 Jul 2024 11:10:34 -0700 Subject: [PATCH 04/20] update consumption api usage Signed-off-by: Scott Lee --- python/ray/data/dataset.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index 86750027052a..b5daf4062349 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -2461,7 +2461,7 @@ def show(self, limit: int = 20) -> None: @ConsumptionAPI( if_more_than_read=True, datasource_metadata="row count", - pattern="without reading in the entire data.", + pattern="Examples:", ) def count(self) -> int: """Count the number of records in the dataset. For `Dataset`s From f0b49f1507d344932dbbc0553808cdf96b03dcdf Mon Sep 17 00:00:00 2001 From: Scott Lee Date: Wed, 3 Jul 2024 11:43:43 -0700 Subject: [PATCH 05/20] fix Signed-off-by: Scott Lee --- python/ray/data/dataset.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index b5daf4062349..56afd1e8efdf 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -4568,11 +4568,10 @@ def stats(self) -> str: def _get_stats_summary(self) -> DatasetStatsSummary: return self._plan.stats_summary() - @ConsumptionAPI(pattern="") + @ConsumptionAPI(pattern="Examples:") @DeveloperAPI def iter_internal_ref_bundles(self) -> Iterator[RefBundle]: - """Get an iterator over - :class:`~ray.data._internal.execution.interfaces.RefBundle`s + """Get an iterator over ``RefBundle``s belonging to this Dataset. Calling this function doesn't keep the data materialized in-memory. @@ -4584,8 +4583,7 @@ def iter_internal_ref_bundles(self) -> Iterator[RefBundle]: ... block = ray.get(block_ref) Returns: - An iterator over this Dataset's - :class:`~ray.data._internal.execution.interfaces.RefBundle`s. + An iterator over this Dataset's ``RefBundle``s. """ def _build_ref_bundle( From aa368d4257fcc7663a6edbb4db372d2ef63da635 Mon Sep 17 00:00:00 2001 From: sjl Date: Wed, 3 Jul 2024 20:28:16 +0000 Subject: [PATCH 06/20] clean up Signed-off-by: sjl --- doc/source/data/api/dataset.rst | 1 + python/ray/data/dataset.py | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/doc/source/data/api/dataset.rst b/doc/source/data/api/dataset.rst index c884a3aaf0a1..44d52810ee58 100644 --- a/doc/source/data/api/dataset.rst +++ b/doc/source/data/api/dataset.rst @@ -131,6 +131,7 @@ Inspecting Metadata Dataset.input_files Dataset.stats Dataset.get_internal_block_refs + Dataset.iter_internal_ref_bundles Execution --------- diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index 56afd1e8efdf..950c47e31384 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -4571,7 +4571,7 @@ def _get_stats_summary(self) -> DatasetStatsSummary: @ConsumptionAPI(pattern="Examples:") @DeveloperAPI def iter_internal_ref_bundles(self) -> Iterator[RefBundle]: - """Get an iterator over ``RefBundle``s + """Get an iterator over ``RefBundles`` belonging to this Dataset. Calling this function doesn't keep the data materialized in-memory. @@ -4583,7 +4583,7 @@ def iter_internal_ref_bundles(self) -> Iterator[RefBundle]: ... block = ray.get(block_ref) Returns: - An iterator over this Dataset's ``RefBundle``s. + An iterator over this Dataset's ``RefBundles``. """ def _build_ref_bundle( From ab8d6ed4cf6a72d7ed753eceeb9973e0da5055b7 Mon Sep 17 00:00:00 2001 From: Scott Lee Date: Wed, 3 Jul 2024 14:55:40 -0700 Subject: [PATCH 07/20] comments Signed-off-by: Scott Lee --- python/ray/data/dataset.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index 950c47e31384..0874d9c140dc 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -2464,10 +2464,11 @@ def show(self, limit: int = 20) -> None: pattern="Examples:", ) def count(self) -> int: - """Count the number of records in the dataset. For `Dataset`s - which only read Parquet files (created with :meth:`~ray.data.read_parquet`), - this method reads the file metadata to efficiently count the number of records - without reading in the entire data. + """Count the number of records in the dataset. + + For Datasets which only read Parquet files (created with + :meth:`~ray.data.read_parquet`), this method reads the file metadata to + efficiently count the number of records without reading in the entire data. Examples: >>> import ray @@ -4598,7 +4599,7 @@ def _build_ref_bundle( self._synchronize_progress_bar() return iter_ref_bundles - @ConsumptionAPI(pattern="") + @ConsumptionAPI(pattern="Examples:") @DeveloperAPI def get_internal_block_refs(self) -> List[ObjectRef[Block]]: """Get a list of references to the underlying blocks of this dataset. From 128614db0b6f6bf93f6f69c255b546cd885c48c0 Mon Sep 17 00:00:00 2001 From: Scott Lee Date: Wed, 3 Jul 2024 15:04:27 -0700 Subject: [PATCH 08/20] comments Signed-off-by: Scott Lee --- python/ray/data/_internal/plan.py | 2 ++ python/ray/data/dataset.py | 3 +-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/python/ray/data/_internal/plan.py b/python/ray/data/_internal/plan.py index 392cfbd592bc..a46d938ee257 100644 --- a/python/ray/data/_internal/plan.py +++ b/python/ray/data/_internal/plan.py @@ -414,6 +414,8 @@ def execute_to_iterator( metrics_tag = create_dataset_tag(self._dataset_name, self._dataset_uuid) executor = StreamingExecutor(copy.deepcopy(ctx.execution_options), metrics_tag) + # TODO(scottjlee): replace with `execute_to_legacy_bundle_iterator` and + # update execute_to_iterator usages to handle RefBundles instead of Blocks block_iter = execute_to_legacy_block_iterator( executor, self, diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index 0874d9c140dc..195a04845cbf 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -4590,8 +4590,6 @@ def iter_internal_ref_bundles(self) -> Iterator[RefBundle]: def _build_ref_bundle( blocks: Tuple[ObjectRef[Block], BlockMetadata], ) -> RefBundle: - # Set `owns_blocks=True` so we can destroy the blocks eagerly - # after getting count from metadata. return RefBundle((blocks,), owns_blocks=True) iter_block_refs_md, _, _ = self._plan.execute_to_iterator() @@ -4616,6 +4614,7 @@ def get_internal_block_refs(self) -> List[ObjectRef[Block]]: Returns: A list of references to this dataset's blocks. """ + # TODO(scottjlee): replace get_internal_block_refs() usages with iter_internal_ref_bundles() block_refs = self._plan.execute().block_refs self._synchronize_progress_bar() return block_refs From 65c3f4ef6164ddebd4d5e39b2aed60e07a81e688 Mon Sep 17 00:00:00 2001 From: Scott Lee Date: Wed, 3 Jul 2024 15:05:10 -0700 Subject: [PATCH 09/20] lint Signed-off-by: Scott Lee --- python/ray/data/dataset.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index 195a04845cbf..f34113a47de9 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -4614,7 +4614,8 @@ def get_internal_block_refs(self) -> List[ObjectRef[Block]]: Returns: A list of references to this dataset's blocks. """ - # TODO(scottjlee): replace get_internal_block_refs() usages with iter_internal_ref_bundles() + # TODO(scottjlee): replace get_internal_block_refs() usages with + # iter_internal_ref_bundles() block_refs = self._plan.execute().block_refs self._synchronize_progress_bar() return block_refs From 958d306a5167b1147dd1a2f8ea5c01d760491e85 Mon Sep 17 00:00:00 2001 From: sjl Date: Thu, 4 Jul 2024 00:18:11 +0000 Subject: [PATCH 10/20] fix tests Signed-off-by: sjl --- python/ray/data/dataset.py | 21 +++++++++++---------- python/ray/data/tests/test_zip.py | 12 ++++++++++-- 2 files changed, 21 insertions(+), 12 deletions(-) diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index 4886509379c3..83bb31e5a563 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -2464,9 +2464,9 @@ def show(self, limit: int = 20) -> None: pattern="Examples:", ) def count(self) -> int: - """Count the number of records in the dataset. For `Dataset`s + """Count the number of rows in the dataset. For `Dataset`s which only read Parquet files (created with :meth:`~ray.data.read_parquet`), - this method reads the file metadata to efficiently count the number of records + this method reads the file metadata to efficiently count the number of rows without reading in the entire data. Examples: @@ -4333,14 +4333,15 @@ def to_pandas(self, limit: int = None) -> "pandas.DataFrame": ValueError: if the number of rows in the :class:`~ray.data.Dataset` exceeds ``limit``. """ - count = self.count() - if limit is not None and count > limit: - raise ValueError( - f"the dataset has more than the given limit of {limit} " - f"rows: {count}. If you are sure that a DataFrame with " - f"{count} rows will fit in local memory, set ds.to_pandas(limit=None) " - "to disable limits." - ) + if limit is not None: + count = self.count() + if count > limit: + raise ValueError( + f"the dataset has more than the given limit of {limit} " + f"rows: {count}. If you are sure that a DataFrame with " + f"{count} rows will fit in local memory, set " + "ds.to_pandas(limit=None) to disable limits." + ) blocks = self.get_internal_block_refs() output = DelegatingBlockBuilder() for block in blocks: diff --git a/python/ray/data/tests/test_zip.py b/python/ray/data/tests/test_zip.py index 2df33ef8bb0e..a7130722427b 100644 --- a/python/ray/data/tests/test_zip.py +++ b/python/ray/data/tests/test_zip.py @@ -99,17 +99,25 @@ def test_zip_arrow(ray_start_regular_shared): ds2 = ray.data.range(5).map(lambda r: {"a": r["id"] + 1, "b": r["id"] + 2}) ds = ds1.zip(ds2) assert ds.count() == 5 - assert "{id: int64, a: int64, b: int64}" in str(ds) + result = list(ds.take()) assert result[0] == {"id": 0, "a": 1, "b": 2} + # Execute the dataset to get full schema. + ds = ds.materialize() + assert "{id: int64, a: int64, b: int64}" in str(ds) + # Test duplicate column names. ds = ds1.zip(ds1).zip(ds1) assert ds.count() == 5 - assert "{id: int64, id_1: int64, id_2: int64}" in str(ds) + result = list(ds.take()) assert result[0] == {"id": 0, "id_1": 0, "id_2": 0} + # Execute the dataset to get full schema. + ds = ds.materialize() + assert "{id: int64, id_1: int64, id_2: int64}" in str(ds) + def test_zip_multiple_block_types(ray_start_regular_shared): df = pd.DataFrame({"spam": [0]}) From 87151ee207acb61c4bde86722bf7eddacacf1200 Mon Sep 17 00:00:00 2001 From: Scott Lee Date: Fri, 5 Jul 2024 10:54:53 -0700 Subject: [PATCH 11/20] update tests Signed-off-by: Scott Lee --- python/ray/data/tests/test_zip.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/python/ray/data/tests/test_zip.py b/python/ray/data/tests/test_zip.py index a7130722427b..6c57afbb3a3b 100644 --- a/python/ray/data/tests/test_zip.py +++ b/python/ray/data/tests/test_zip.py @@ -82,17 +82,25 @@ def test_zip_pandas(ray_start_regular_shared): ds2 = ray.data.from_pandas(pd.DataFrame({"col3": ["a", "b"], "col4": ["d", "e"]})) ds = ds1.zip(ds2) assert ds.count() == 2 - assert "{col1: int64, col2: int64, col3: object, col4: object}" in str(ds) + result = list(ds.take()) assert result[0] == {"col1": 1, "col2": 4, "col3": "a", "col4": "d"} + # Execute the dataset to get full schema. + ds = ds.materialize() + assert "{col1: int64, col2: int64, col3: object, col4: object}" in str(ds) + ds3 = ray.data.from_pandas(pd.DataFrame({"col2": ["a", "b"], "col4": ["d", "e"]})) ds = ds1.zip(ds3) assert ds.count() == 2 - assert "{col1: int64, col2: int64, col2_1: object, col4: object}" in str(ds) + result = list(ds.take()) assert result[0] == {"col1": 1, "col2": 4, "col2_1": "a", "col4": "d"} + # Execute the dataset to get full schema. + ds = ds.materialize() + assert "{col1: int64, col2: int64, col2_1: object, col4: object}" in str(ds) + def test_zip_arrow(ray_start_regular_shared): ds1 = ray.data.range(5).map(lambda r: {"id": r["id"]}) From 6d15acd158af80a27ff9f6d854553299e60f6b05 Mon Sep 17 00:00:00 2001 From: Scott Lee Date: Fri, 5 Jul 2024 17:03:00 -0700 Subject: [PATCH 12/20] replace get_internal_block_refs usages Signed-off-by: Scott Lee --- .../execution/interfaces/ref_bundle.py | 13 ++- python/ray/data/dataset.py | 86 ++++++++++++------- python/ray/data/random_access_dataset.py | 6 +- python/ray/data/tests/test_all_to_all.py | 6 +- python/ray/data/tests/test_consumption.py | 15 ++-- python/ray/data/tests/test_map.py | 7 +- python/ray/data/tests/test_pandas.py | 17 +++- python/ray/data/tests/test_parquet.py | 12 ++- python/ray/data/tests/test_size_estimation.py | 27 ++++-- python/ray/data/tests/test_split.py | 12 ++- python/ray/data/tests/test_text.py | 13 ++- .../ray/data/tests/test_transform_pyarrow.py | 3 +- python/ray/data/tests/test_zip.py | 3 +- 13 files changed, 154 insertions(+), 66 deletions(-) diff --git a/python/ray/data/_internal/execution/interfaces/ref_bundle.py b/python/ray/data/_internal/execution/interfaces/ref_bundle.py index c8996a1c422b..bfbbea3586db 100644 --- a/python/ray/data/_internal/execution/interfaces/ref_bundle.py +++ b/python/ray/data/_internal/execution/interfaces/ref_bundle.py @@ -1,5 +1,5 @@ from dataclasses import dataclass -from typing import List, Optional, Tuple +from typing import Iterator, List, Optional, Tuple import ray from .common import NodeIdStr @@ -59,7 +59,7 @@ def __setattr__(self, key, value): object.__setattr__(self, key, value) @property - def block_refs(self) -> List[BlockMetadata]: + def block_refs(self) -> List[ObjectRef[Block]]: """List of block references in this bundle.""" return [block_ref for block_ref, _ in self.blocks] @@ -125,3 +125,12 @@ def __hash__(self) -> int: def __len__(self) -> int: return len(self.blocks) + + +def _ref_bundles_iterator_to_block_refs_list( + ref_bundles: Iterator[RefBundle], +) -> List[ObjectRef[Block]]: + """Convert an iterator of RefBundles to a list of object references to Blocks.""" + return [ + block_ref for ref_bundle in ref_bundles for block_ref in ref_bundle.block_refs + ] diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index c5380bb2fb16..aa30a59b35f0 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -44,6 +44,9 @@ from ray.data._internal.delegating_block_builder import DelegatingBlockBuilder from ray.data._internal.equalize import _equalize from ray.data._internal.execution.interfaces import RefBundle +from ray.data._internal.execution.interfaces.ref_bundle import ( + _ref_bundles_iterator_to_block_refs_list, +) from ray.data._internal.iterator.iterator_impl import DataIteratorImpl from ray.data._internal.iterator.stream_split_iterator import StreamSplitDataIterator from ray.data._internal.logical.operators.all_to_all_operator import ( @@ -91,7 +94,7 @@ from ray.data.iterator import DataIterator from ray.data.random_access_dataset import RandomAccessDataset from ray.types import ObjectRef -from ray.util.annotations import DeveloperAPI, PublicAPI +from ray.util.annotations import Deprecated, DeveloperAPI, PublicAPI from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy from ray.widgets import Template from ray.widgets.util import repr_with_fallback @@ -4166,14 +4169,14 @@ def to_dask( dask.config.set(scheduler=ray_dask_get) @dask.delayed - def block_to_df(block: Block): - if isinstance(block, (ray.ObjectRef, ClientObjectRef)): + def block_to_df(block_ref: Block) -> pd.DataFrame: + if isinstance(block_ref, (ray.ObjectRef, ClientObjectRef)): raise ValueError( "Dataset.to_dask() must be used with Dask-on-Ray, please " "set the Dask scheduler to ray_dask_get (located in " "ray.util.dask)." ) - return _block_to_df(block) + return _block_to_df(block_ref) if meta is None: from ray.data.extensions import TensorDtype @@ -4212,8 +4215,13 @@ def block_to_df(block: Block): else: meta = schema.empty_table().to_pandas() + dfs = [] + for ref_bundle in self.iter_internal_ref_bundles(): + for block_ref in ref_bundle.block_refs: + dfs.append(block_to_df(block_ref)) + ddf = dd.from_delayed( - [block_to_df(block) for block in self.get_internal_block_refs()], + dfs, meta=meta, verify_meta=verify_meta, ) @@ -4266,7 +4274,7 @@ def to_modin(self) -> "modin.pandas.dataframe.DataFrame": This is only supported for datasets convertible to Arrow records. This function induces a copy of the data. For zero-copy access to the underlying data, consider using :meth:`.to_arrow_refs` or - :meth:`.get_internal_block_refs`. + :meth:`.iter_internal_ref_bundles`. Time complexity: O(dataset size / parallelism) @@ -4299,9 +4307,10 @@ def to_spark(self, spark: "pyspark.sql.SparkSession") -> "pyspark.sql.DataFrame" schema = self.schema() if isinstance(schema, Schema): schema = schema.base_schema - return raydp.spark.ray_dataset_to_spark_dataframe( - spark, schema, self.get_internal_block_refs() - ) + + ref_bundles = self.iter_internal_ref_bundles() + block_refs = _ref_bundles_iterator_to_block_refs_list(ref_bundles) + return raydp.spark.ray_dataset_to_spark_dataframe(spark, schema, block_refs) @ConsumptionAPI(pattern="Time complexity:") def to_pandas(self, limit: int = None) -> "pandas.DataFrame": @@ -4343,10 +4352,12 @@ def to_pandas(self, limit: int = None) -> "pandas.DataFrame": f"{count} rows will fit in local memory, set " "ds.to_pandas(limit=None) to disable limits." ) - blocks = self.get_internal_block_refs() + bundles = self.iter_internal_ref_bundles() output = DelegatingBlockBuilder() - for block in blocks: - output.add_block(ray.get(block)) + + for bundle in bundles: + for block_ref in bundle.block_refs: + output.add_block(ray.get(block_ref)) block = output.build() return _block_to_df(block) @@ -4360,7 +4371,7 @@ def to_pandas_refs(self) -> List[ObjectRef["pandas.DataFrame"]]: This function induces a copy of the data. For zero-copy access to the underlying data, consider using :meth:`Dataset.to_arrow_refs` or - :meth:`Dataset.get_internal_block_refs`. + :meth:`Dataset.iter_internal_ref_bundles`. Examples: >>> import ray @@ -4376,7 +4387,11 @@ def to_pandas_refs(self) -> List[ObjectRef["pandas.DataFrame"]]: """ block_to_df = cached_remote_fn(_block_to_df) - return [block_to_df.remote(block) for block in self.get_internal_block_refs()] + pandas_refs = [] + for bundle in self.iter_internal_ref_bundles(): + for block_ref in bundle.block_refs: + pandas_refs.append(block_to_df.remote(block_ref)) + return pandas_refs @DeveloperAPI def to_numpy_refs( @@ -4388,7 +4403,7 @@ def to_numpy_refs( This is only supported for datasets convertible to NumPy ndarrays. This function induces a copy of the data. For zero-copy access to the underlying data, consider using :meth:`Dataset.to_arrow_refs` or - :meth:`Dataset.get_internal_block_refs`. + :meth:`Dataset.iter_internal_ref_bundles`. Examples: >>> import ray @@ -4408,10 +4423,11 @@ def to_numpy_refs( A list of remote NumPy ndarrays created from this dataset. """ block_to_ndarray = cached_remote_fn(_block_to_ndarray) - return [ - block_to_ndarray.remote(block, column=column) - for block in self.get_internal_block_refs() - ] + numpy_refs = [] + for bundle in self.iter_internal_ref_bundles(): + for block_ref in bundle.block_refs: + numpy_refs.append(block_to_ndarray.remote(block_ref, column=column)) + return numpy_refs @ConsumptionAPI(pattern="Time complexity:") @DeveloperAPI @@ -4439,18 +4455,21 @@ def to_arrow_refs(self) -> List[ObjectRef["pyarrow.Table"]]: """ import pyarrow as pa - blocks: List[ObjectRef["pyarrow.Table"]] = self.get_internal_block_refs() + ref_bundles: Iterator[RefBundle] = self.iter_internal_ref_bundles() + block_refs: List[ + ObjectRef["pyarrow.Table"] + ] = _ref_bundles_iterator_to_block_refs_list(ref_bundles) # Schema is safe to call since we have already triggered execution with - # get_internal_block_refs. + # iter_internal_ref_bundles. schema = self.schema(fetch_if_missing=True) if isinstance(schema, Schema): schema = schema.base_schema if isinstance(schema, pa.Schema): # Zero-copy path. - return blocks + return block_refs block_to_arrow = cached_remote_fn(_block_to_arrow) - return [block_to_arrow.remote(block) for block in blocks] + return [block_to_arrow.remote(block) for block in block_refs] @ConsumptionAPI(pattern="Args:") def to_random_access_dataset( @@ -4598,8 +4617,7 @@ def _build_ref_bundle( self._synchronize_progress_bar() return iter_ref_bundles - @ConsumptionAPI(pattern="Examples:") - @DeveloperAPI + @Deprecated def get_internal_block_refs(self) -> List[ObjectRef[Block]]: """Get a list of references to the underlying blocks of this dataset. @@ -4615,8 +4633,6 @@ def get_internal_block_refs(self) -> List[ObjectRef[Block]]: Returns: A list of references to this dataset's blocks. """ - # TODO(scottjlee): replace get_internal_block_refs() usages with - # iter_internal_ref_bundles() block_refs = self._plan.execute().block_refs self._synchronize_progress_bar() return block_refs @@ -4936,13 +4952,19 @@ def __iter__(self): def _block_num_rows(self) -> List[int]: get_num_rows = cached_remote_fn(_get_num_rows) - return ray.get([get_num_rows.remote(b) for b in self.get_internal_block_refs()]) + num_rows = [] + for ref_bundle in self.iter_internal_ref_bundles(): + for block_ref in ref_bundle.block_refs: + num_rows.append(get_num_rows.remote(block_ref)) + return ray.get(num_rows) def _block_size_bytes(self) -> List[int]: get_size_bytes = cached_remote_fn(_get_size_bytes) - return ray.get( - [get_size_bytes.remote(b) for b in self.get_internal_block_refs()] - ) + size_bytes = [] + for ref_bundle in self.iter_internal_ref_bundles(): + for block_ref in ref_bundle.block_refs: + size_bytes.append(get_size_bytes.remote(block_ref)) + return ray.get(size_bytes) def _meta_count(self) -> Optional[int]: return self._plan.meta_count() @@ -5101,7 +5123,7 @@ def _get_size_bytes(block: Block) -> int: return block.size_bytes() -def _block_to_df(block: Block): +def _block_to_df(block: Block) -> "pandas.DataFrame": block = BlockAccessor.for_block(block) return block.to_pandas() diff --git a/python/ray/data/random_access_dataset.py b/python/ray/data/random_access_dataset.py index 06d3438a2e18..a24c6796f7ca 100644 --- a/python/ray/data/random_access_dataset.py +++ b/python/ray/data/random_access_dataset.py @@ -8,6 +8,9 @@ import numpy as np import ray +from ray.data._internal.execution.interfaces.ref_bundle import ( + _ref_bundles_iterator_to_block_refs_list, +) from ray.data._internal.remote_fn import cached_remote_fn from ray.data.block import BlockAccessor from ray.data.context import DataContext @@ -51,7 +54,8 @@ def __init__( logger.info("[setup] Indexing dataset by sort key.") sorted_ds = ds.sort(key) get_bounds = cached_remote_fn(_get_bounds) - blocks = sorted_ds.get_internal_block_refs() + bundles = sorted_ds.iter_internal_ref_bundles() + blocks = _ref_bundles_iterator_to_block_refs_list(bundles) logger.info("[setup] Computing block range bounds.") bounds = ray.get([get_bounds.remote(b, key) for b in blocks]) diff --git a/python/ray/data/tests/test_all_to_all.py b/python/ray/data/tests/test_all_to_all.py index 6b0f69a563ca..2b0a72175cf9 100644 --- a/python/ray/data/tests/test_all_to_all.py +++ b/python/ray/data/tests/test_all_to_all.py @@ -11,6 +11,9 @@ import ray from ray.data._internal.aggregate import Count, Max, Mean, Min, Quantile, Std, Sum +from ray.data._internal.execution.interfaces.ref_bundle import ( + _ref_bundles_iterator_to_block_refs_list, +) from ray.data.aggregate import AggregateFn from ray.data.context import DataContext from ray.data.tests.conftest import * # noqa @@ -1251,7 +1254,8 @@ def get_node_id(): node2_id = ray.get(get_node_id.options(resources={"bar:2": 1}).remote()) ds = ray.data.range(100, override_num_blocks=2).random_shuffle() - blocks = ds.get_internal_block_refs() + bundles = ds.iter_internal_ref_bundles() + blocks = _ref_bundles_iterator_to_block_refs_list(bundles) ray.wait(blocks, num_returns=len(blocks), fetch_local=False) location_data = ray.experimental.get_object_locations(blocks) locations = [] diff --git a/python/ray/data/tests/test_consumption.py b/python/ray/data/tests/test_consumption.py index cc806d905263..f2f557ab6abe 100644 --- a/python/ray/data/tests/test_consumption.py +++ b/python/ray/data/tests/test_consumption.py @@ -16,6 +16,9 @@ from ray.data._internal.datasource.csv_datasink import CSVDatasink from ray.data._internal.datasource.csv_datasource import CSVDatasource from ray.data._internal.datasource.range_datasource import RangeDatasource +from ray.data._internal.execution.interfaces.ref_bundle import ( + _ref_bundles_iterator_to_block_refs_list, +) from ray.data._internal.util import _check_pyarrow_version from ray.data.block import BlockAccessor, BlockMetadata from ray.data.context import DataContext @@ -679,7 +682,8 @@ def test_convert_types(ray_start_regular_shared): def test_from_blocks(input_blocks, ray_start_regular_shared): ds = ray.data.from_blocks(input_blocks) - output_blocks = [ray.get(block_ref) for block_ref in ds.get_internal_block_refs()] + bundles = ds.iter_internal_ref_bundles() + output_blocks = ray.get(_ref_bundles_iterator_to_block_refs_list(bundles)) assert len(input_blocks) == len(output_blocks) assert all( input_block.equals(output_block) @@ -1628,11 +1632,12 @@ def test_read_write_local_node(ray_start_cluster): ctx.read_write_local_node = True def check_dataset_is_local(ds): - blocks = ds.get_internal_block_refs() - ray.wait(blocks, num_returns=len(blocks), fetch_local=False) - location_data = ray.experimental.get_object_locations(blocks) + bundles = ds.iter_internal_ref_bundles() + block_refs = _ref_bundles_iterator_to_block_refs_list(bundles) + ray.wait(block_refs, num_returns=len(block_refs), fetch_local=False) + location_data = ray.experimental.get_object_locations(block_refs) locations = [] - for block in blocks: + for block in block_refs: locations.extend(location_data[block]["node_ids"]) assert set(locations) == {ray.get_runtime_context().get_node_id()} diff --git a/python/ray/data/tests/test_map.py b/python/ray/data/tests/test_map.py index 337bacc6f77d..18b36bfb8246 100644 --- a/python/ray/data/tests/test_map.py +++ b/python/ray/data/tests/test_map.py @@ -13,6 +13,9 @@ import pytest import ray +from ray.data._internal.execution.interfaces.ref_bundle import ( + _ref_bundles_iterator_to_block_refs_list, +) from ray.data.context import DataContext from ray.data.exceptions import UserCodeException from ray.data.tests.conftest import * # noqa @@ -944,7 +947,9 @@ def empty_pandas(batch): .map_batches(lambda x: x, batch_size=None) ) - block_refs = ds.get_internal_block_refs() + bundles = ds.iter_internal_ref_bundles() + block_refs = _ref_bundles_iterator_to_block_refs_list(bundles) + assert len(block_refs) == 1 assert type(ray.get(block_refs[0])) == pd.DataFrame diff --git a/python/ray/data/tests/test_pandas.py b/python/ray/data/tests/test_pandas.py index 9bb157789ef4..cc965c1e6c6f 100644 --- a/python/ray/data/tests/test_pandas.py +++ b/python/ray/data/tests/test_pandas.py @@ -1,13 +1,22 @@ +from typing import Iterator + import numpy as np import pandas as pd import pyarrow as pa import pytest import ray +from ray.data._internal.execution.interfaces.ref_bundle import RefBundle +from ray.data.block import Block from ray.data.extensions import ArrowTensorArray, ArrowTensorType, TensorDtype from ray.data.tests.conftest import * # noqa from ray.data.tests.mock_http_server import * # noqa from ray.tests.conftest import * # noqa +from ray.types import ObjectRef + + +def _get_first_block(bundles: Iterator[RefBundle]) -> ObjectRef[Block]: + return next(bundles).block_refs[0] @pytest.mark.parametrize("enable_pandas_block", [False, True]) @@ -19,7 +28,7 @@ def test_from_pandas(ray_start_regular_shared, enable_pandas_block): df1 = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]}) df2 = pd.DataFrame({"one": [4, 5, 6], "two": ["e", "f", "g"]}) ds = ray.data.from_pandas([df1, df2]) - block = ray.get(ds.get_internal_block_refs()[0]) + block = ray.get(_get_first_block(ds.iter_internal_ref_bundles())) assert ( isinstance(block, pd.DataFrame) if enable_pandas_block @@ -33,7 +42,7 @@ def test_from_pandas(ray_start_regular_shared, enable_pandas_block): # test from single pandas dataframe ds = ray.data.from_pandas(df1) - block = ray.get(ds.get_internal_block_refs()[0]) + block = ray.get(_get_first_block(ds.iter_internal_ref_bundles())) assert ( isinstance(block, pd.DataFrame) if enable_pandas_block @@ -82,7 +91,7 @@ def test_from_pandas_refs(ray_start_regular_shared, enable_pandas_block): df1 = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]}) df2 = pd.DataFrame({"one": [4, 5, 6], "two": ["e", "f", "g"]}) ds = ray.data.from_pandas_refs([ray.put(df1), ray.put(df2)]) - block = ray.get(ds.get_internal_block_refs()[0]) + block = ray.get(_get_first_block(ds.iter_internal_ref_bundles())) assert ( isinstance(block, pd.DataFrame) if enable_pandas_block @@ -96,7 +105,7 @@ def test_from_pandas_refs(ray_start_regular_shared, enable_pandas_block): # test from single pandas dataframe ref ds = ray.data.from_pandas_refs(ray.put(df1)) - block = ray.get(ds.get_internal_block_refs()[0]) + block = ray.get(_get_first_block(ds.iter_internal_ref_bundles())) assert ( isinstance(block, pd.DataFrame) if enable_pandas_block diff --git a/python/ray/data/tests/test_parquet.py b/python/ray/data/tests/test_parquet.py index 9e92e2536b12..e3eeb8ee68e6 100644 --- a/python/ray/data/tests/test_parquet.py +++ b/python/ray/data/tests/test_parquet.py @@ -18,6 +18,9 @@ _deserialize_fragments_with_retry, _SerializedFragment, ) +from ray.data._internal.execution.interfaces.ref_bundle import ( + _ref_bundles_iterator_to_block_refs_list, +) from ray.data.block import BlockAccessor from ray.data.context import DataContext from ray.data.datasource import DefaultFileMetadataProvider, ParquetMetadataProvider @@ -1139,11 +1142,12 @@ def get_node_id(): ds = ray.data.read_parquet(data_path) # Force reads. - blocks = ds.get_internal_block_refs() - ray.wait(blocks, num_returns=len(blocks), fetch_local=False) - location_data = ray.experimental.get_object_locations(blocks) + bundles = ds.iter_internal_ref_bundles() + block_refs = _ref_bundles_iterator_to_block_refs_list(bundles) + ray.wait(block_refs, num_returns=len(block_refs), fetch_local=False) + location_data = ray.experimental.get_object_locations(block_refs) locations = [] - for block in blocks: + for block in block_refs: locations.extend(location_data[block]["node_ids"]) assert set(locations) == {node1_id, node2_id} diff --git a/python/ray/data/tests/test_size_estimation.py b/python/ray/data/tests/test_size_estimation.py index 8e24d7ea8cda..667473751f57 100644 --- a/python/ray/data/tests/test_size_estimation.py +++ b/python/ray/data/tests/test_size_estimation.py @@ -1,10 +1,12 @@ import os import uuid +from typing import Iterable import pytest import ray from ray.data._internal.arrow_block import ArrowBlockBuilder +from ray.data._internal.execution.interfaces.ref_bundle import RefBundle from ray.tests.conftest import * # noqa SMALL_VALUE = "a" * 100 @@ -187,17 +189,24 @@ def __call__(self, x): ctx.target_max_block_size = 20_000_000 ctx.target_max_block_size = 20_000_000 ds2 = ray.data.range(1000, override_num_blocks=1).map(arrow_fn, **kwargs) - nblocks = len(ds2.map(identity_fn, **kwargs).get_internal_block_refs()) + bundles: Iterable[RefBundle] = ds2.map( + identity_fn, **kwargs + ).iter_internal_ref_bundles() + nblocks = sum([len(b.block_refs) for b in bundles]) assert nblocks == 1, nblocks ctx.target_max_block_size = 2_000_000 - nblocks = len(ds2.map(identity_fn, **kwargs).get_internal_block_refs()) + bundles: Iterable[RefBundle] = ds2.map( + identity_fn, **kwargs + ).iter_internal_ref_bundles() + nblocks = sum([len(b.block_refs) for b in bundles]) assert 4 < nblocks < 7 or use_actors, nblocks # Disabled. # Setting a huge block size effectively disables block splitting. ctx.target_max_block_size = 2**64 ds3 = ray.data.range(1000, override_num_blocks=1).map(arrow_fn, **kwargs) - nblocks = len(ds3.map(identity_fn, **kwargs).get_internal_block_refs()) + bundles = ds3.map(identity_fn, **kwargs).iter_internal_ref_bundles() + nblocks = sum([len(b.block_refs) for b in bundles]) assert nblocks == 1, nblocks @@ -207,10 +216,12 @@ def test_split_flat_map(ray_start_regular_shared): # Arrow block ctx.target_max_block_size = 20_000_000 ds2 = ray.data.range(1000, override_num_blocks=1).map(lambda _: ARROW_LARGE_VALUE) - nblocks = len(ds2.flat_map(lambda x: [x]).get_internal_block_refs()) + bundles = ds2.flat_map(lambda x: [x]).iter_internal_ref_bundles() + nblocks = sum([len(b.block_refs) for b in bundles]) assert nblocks == 1, nblocks ctx.target_max_block_size = 2_000_000 - nblocks = len(ds2.flat_map(lambda x: [x]).get_internal_block_refs()) + bundles = ds2.flat_map(lambda x: [x]).iter_internal_ref_bundles() + nblocks = sum([len(b.block_refs) for b in bundles]) assert 4 < nblocks < 7, nblocks @@ -220,10 +231,12 @@ def test_split_map_batches(ray_start_regular_shared): # Arrow block ctx.target_max_block_size = 20_000_000 ds2 = ray.data.range(1000, override_num_blocks=1).map(lambda _: ARROW_LARGE_VALUE) - nblocks = len(ds2.map_batches(lambda x: x, batch_size=1).get_internal_block_refs()) + bundles = ds2.map_batches(lambda x: x, batch_size=1).iter_internal_ref_bundles() + nblocks = sum([len(b.block_refs) for b in bundles]) assert nblocks == 1, nblocks ctx.target_max_block_size = 2_000_000 - nblocks = len(ds2.map_batches(lambda x: x, batch_size=16).get_internal_block_refs()) + bundles = ds2.map_batches(lambda x: x, batch_size=16).iter_internal_ref_bundles() + nblocks = sum([len(b.block_refs) for b in bundles]) assert 4 < nblocks < 7, nblocks diff --git a/python/ray/data/tests/test_split.py b/python/ray/data/tests/test_split.py index 2cd81cd2f6ae..1af6596fdb13 100644 --- a/python/ray/data/tests/test_split.py +++ b/python/ray/data/tests/test_split.py @@ -13,6 +13,9 @@ from ray.data._internal.block_list import BlockList from ray.data._internal.equalize import _equalize from ray.data._internal.execution.interfaces import RefBundle +from ray.data._internal.execution.interfaces.ref_bundle import ( + _ref_bundles_iterator_to_block_refs_list, +) from ray.data._internal.logical.interfaces import LogicalPlan from ray.data._internal.logical.operators.input_data_operator import InputData from ray.data._internal.plan import ExecutionPlan @@ -388,8 +391,9 @@ def assert_split_assignment(block_node_ids, actor_node_ids, expected_split_resul datasets[1] contains block 2. """ num_blocks = len(block_node_ids) - ds = ray.data.range(num_blocks, override_num_blocks=num_blocks) - blocks = ds.get_internal_block_refs() + ds = ray.data.range(num_blocks, override_num_blocks=num_blocks).materialize() + bundles = ds.iter_internal_ref_bundles() + blocks = _ref_bundles_iterator_to_block_refs_list(bundles) assert len(block_node_ids) == len(blocks) actors = [Actor.remote() for i in range(len(actor_node_ids))] with patch("ray.experimental.get_object_locations") as location_mock: @@ -412,7 +416,9 @@ def assert_split_assignment(block_node_ids, actor_node_ids, expected_split_resul assert len(datasets) == len(actors) for i in range(len(actors)): assert {blocks[j] for j in expected_split_result[i]} == set( - datasets[i].get_internal_block_refs() + _ref_bundles_iterator_to_block_refs_list( + datasets[i].iter_internal_ref_bundles() + ) ) assert_split_assignment( diff --git a/python/ray/data/tests/test_text.py b/python/ray/data/tests/test_text.py index 5a3239750eb9..fba2cc78d4e4 100644 --- a/python/ray/data/tests/test_text.py +++ b/python/ray/data/tests/test_text.py @@ -4,6 +4,9 @@ import pytest import ray +from ray.data._internal.execution.interfaces.ref_bundle import ( + _ref_bundles_iterator_to_block_refs_list, +) from ray.data.datasource import ( BaseFileMetadataProvider, FastFileMetadataProvider, @@ -194,11 +197,13 @@ def get_node_id(): path, override_num_blocks=2, ray_remote_args={"resources": {"bar": 1}} ) - blocks = ds.get_internal_block_refs() - ray.wait(blocks, num_returns=len(blocks), fetch_local=False) - location_data = ray.experimental.get_object_locations(blocks) + block_refs = _ref_bundles_iterator_to_block_refs_list( + ds.iter_internal_ref_bundles() + ) + ray.wait(block_refs, num_returns=len(block_refs), fetch_local=False) + location_data = ray.experimental.get_object_locations(block_refs) locations = [] - for block in blocks: + for block in block_refs: locations.extend(location_data[block]["node_ids"]) assert set(locations) == {bar_node_id}, locations assert sorted(_to_lines(ds.take())) == ["goodbye", "hello", "world"] diff --git a/python/ray/data/tests/test_transform_pyarrow.py b/python/ray/data/tests/test_transform_pyarrow.py index 366160c0ea4a..cfe53c9b98f5 100644 --- a/python/ray/data/tests/test_transform_pyarrow.py +++ b/python/ray/data/tests/test_transform_pyarrow.py @@ -444,7 +444,8 @@ def test_fallback_to_pandas_on_incompatible_data( # Ray Data will fall back to using Pandas. ds = _create_datasset(op, data) ds = ds.materialize() - block = ray.get(ds.get_internal_block_refs()[0]) + bundles = ds.iter_internal_ref_bundles() + block = ray.get(next(bundles).block_refs[0]) assert isinstance(block, pd.DataFrame) diff --git a/python/ray/data/tests/test_zip.py b/python/ray/data/tests/test_zip.py index 6c57afbb3a3b..db74a8fd5ba0 100644 --- a/python/ray/data/tests/test_zip.py +++ b/python/ray/data/tests/test_zip.py @@ -69,7 +69,8 @@ def test_zip_different_num_blocks_split_smallest( override_num_blocks=num_blocks2, ) ds = ds1.zip(ds2).materialize() - num_blocks = len(ds.get_internal_block_refs()) + bundles = ds.iter_internal_ref_bundles() + num_blocks = sum([len(b.block_refs) for b in bundles]) assert ds.take() == [{str(i): i for i in range(num_cols1 + num_cols2)}] * n if should_invert: assert num_blocks == num_blocks2 From 3e4d9b0e1464d17ba2666294bded157b6e6b209c Mon Sep 17 00:00:00 2001 From: Scott Lee Date: Mon, 8 Jul 2024 12:18:32 -0700 Subject: [PATCH 13/20] add deprecation warning Signed-off-by: Scott Lee --- python/ray/data/dataset.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index aa30a59b35f0..78160e4e4a75 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -4633,9 +4633,10 @@ def get_internal_block_refs(self) -> List[ObjectRef[Block]]: Returns: A list of references to this dataset's blocks. """ - block_refs = self._plan.execute().block_refs - self._synchronize_progress_bar() - return block_refs + raise DeprecationWarning( + "`Dataset.get_internal_block_refs()` is deprecated. Use " + "`Dataset.iter_internal_ref_bundles()` instead.", + ) @DeveloperAPI def has_serializable_lineage(self) -> bool: From b6df226a3c8b9fc5d8d4e7d078b2638561442ae0 Mon Sep 17 00:00:00 2001 From: Scott Lee Date: Mon, 8 Jul 2024 14:58:38 -0700 Subject: [PATCH 14/20] snapshot metadata only Signed-off-by: Scott Lee --- .../data/_internal/execution/legacy_compat.py | 46 ++++++++++++++++++- python/ray/data/_internal/plan.py | 9 ++++ python/ray/data/tests/test_zip.py | 24 ++-------- 3 files changed, 58 insertions(+), 21 deletions(-) diff --git a/python/ray/data/_internal/execution/legacy_compat.py b/python/ray/data/_internal/execution/legacy_compat.py index b7d489efe659..d4b3261f82ca 100644 --- a/python/ray/data/_internal/execution/legacy_compat.py +++ b/python/ray/data/_internal/execution/legacy_compat.py @@ -3,7 +3,7 @@ It should be deleted once we fully move to the new executor backend. """ -from typing import Iterator, Tuple +from typing import Iterator, Optional, Tuple from ray.data._internal.block_list import BlockList from ray.data._internal.execution.interfaces import ( @@ -11,10 +11,12 @@ PhysicalOperator, RefBundle, ) +from ray.data._internal.execution.interfaces.executor import OutputIterator from ray.data._internal.logical.optimizers import get_execution_plan from ray.data._internal.logical.util import record_operators_usage from ray.data._internal.plan import ExecutionPlan from ray.data._internal.stats import DatasetStats +from ray.data._internal.util import unify_block_metadata_schema from ray.data.block import Block, BlockMetadata from ray.types import ObjectRef @@ -59,6 +61,48 @@ def execute_to_legacy_bundle_iterator( dag = dag_rewrite(dag) bundle_iter = executor.execute(dag, initial_stats=stats) + + class CacheMetadataIterator(OutputIterator): + """Wrapper for `bundle_iterator` above. + + For a given iterator which yields output RefBundles, + cache the metadata from each output bundle, and yield + the original RefBundle.""" + + def __init__(self, base_iterator: OutputIterator): + # Note: the base_iterator should be of type StreamIterator, + # defined within `StreamingExecutor.execute()`. It must + # support the `get_next()` method. + self._base_iterator = base_iterator + + def get_next(self, output_split_idx: Optional[int] = None) -> RefBundle: + bundle = self._base_iterator.get_next(output_split_idx) + self._cache_metadata(bundle) + return bundle + + def _cache_metadata(self, bundle: RefBundle) -> RefBundle: + """Cache the metadata from each output bundle, so we can + access important information, such as row count, schema, etc.""" + if not plan._snapshot_metadata: + # Initialize the snapshot BlockMetadata. + plan._snapshot_metadata = BlockMetadata( + num_rows=bundle.num_rows(), + size_bytes=bundle.size_bytes(), + schema=unify_block_metadata_schema(bundle.metadata), + input_files=None, + exec_stats=None, + ) + else: + # Update the snapshot BlockMetadata. + snap_md = plan._snapshot_metadata + snap_md.num_rows += bundle.num_rows() + snap_md.size_bytes += bundle.size_bytes() + snap_md.schema = unify_block_metadata_schema( + [snap_md, *bundle.metadata] + ) + return bundle + + bundle_iter = CacheMetadataIterator(bundle_iter) return bundle_iter diff --git a/python/ray/data/_internal/plan.py b/python/ray/data/_internal/plan.py index a46d938ee257..9b158b7d9009 100644 --- a/python/ray/data/_internal/plan.py +++ b/python/ray/data/_internal/plan.py @@ -72,6 +72,12 @@ def __init__( self._snapshot_operator: Optional[LogicalOperator] = None self._snapshot_stats = None self._snapshot_bundle = None + # Snapshot of only metadata corresponding to the final operator's + # output bundles, used as the source of truth for the Dataset's schema + # and count. This is calculated and cached when the plan is executed as an + # iterator (`execute_to_iterator()`), and avoids caching + # all of the output blocks in memory like in `self.snapshot_bundle`. + self._snapshot_metadata: Optional[BlockMetadata] = None # Cached schema. self._schema = None @@ -148,6 +154,9 @@ def generate_logical_plan_string( # This plan has executed some but not all operators. schema = unify_block_metadata_schema(self._snapshot_bundle.metadata) count = self._snapshot_bundle.num_rows() + elif self._snapshot_metadata is not None: + schema = self._snapshot_metadata.schema + count = self._snapshot_metadata.num_rows else: # This plan hasn't executed any operators. sources = self._logical_plan.sources() diff --git a/python/ray/data/tests/test_zip.py b/python/ray/data/tests/test_zip.py index 6c57afbb3a3b..2df33ef8bb0e 100644 --- a/python/ray/data/tests/test_zip.py +++ b/python/ray/data/tests/test_zip.py @@ -82,50 +82,34 @@ def test_zip_pandas(ray_start_regular_shared): ds2 = ray.data.from_pandas(pd.DataFrame({"col3": ["a", "b"], "col4": ["d", "e"]})) ds = ds1.zip(ds2) assert ds.count() == 2 - + assert "{col1: int64, col2: int64, col3: object, col4: object}" in str(ds) result = list(ds.take()) assert result[0] == {"col1": 1, "col2": 4, "col3": "a", "col4": "d"} - # Execute the dataset to get full schema. - ds = ds.materialize() - assert "{col1: int64, col2: int64, col3: object, col4: object}" in str(ds) - ds3 = ray.data.from_pandas(pd.DataFrame({"col2": ["a", "b"], "col4": ["d", "e"]})) ds = ds1.zip(ds3) assert ds.count() == 2 - + assert "{col1: int64, col2: int64, col2_1: object, col4: object}" in str(ds) result = list(ds.take()) assert result[0] == {"col1": 1, "col2": 4, "col2_1": "a", "col4": "d"} - # Execute the dataset to get full schema. - ds = ds.materialize() - assert "{col1: int64, col2: int64, col2_1: object, col4: object}" in str(ds) - def test_zip_arrow(ray_start_regular_shared): ds1 = ray.data.range(5).map(lambda r: {"id": r["id"]}) ds2 = ray.data.range(5).map(lambda r: {"a": r["id"] + 1, "b": r["id"] + 2}) ds = ds1.zip(ds2) assert ds.count() == 5 - + assert "{id: int64, a: int64, b: int64}" in str(ds) result = list(ds.take()) assert result[0] == {"id": 0, "a": 1, "b": 2} - # Execute the dataset to get full schema. - ds = ds.materialize() - assert "{id: int64, a: int64, b: int64}" in str(ds) - # Test duplicate column names. ds = ds1.zip(ds1).zip(ds1) assert ds.count() == 5 - + assert "{id: int64, id_1: int64, id_2: int64}" in str(ds) result = list(ds.take()) assert result[0] == {"id": 0, "id_1": 0, "id_2": 0} - # Execute the dataset to get full schema. - ds = ds.materialize() - assert "{id: int64, id_1: int64, id_2: int64}" in str(ds) - def test_zip_multiple_block_types(ray_start_regular_shared): df = pd.DataFrame({"spam": [0]}) From 70f82de93a6821430c9cd5f4685760616e9e2632 Mon Sep 17 00:00:00 2001 From: Scott Lee Date: Mon, 8 Jul 2024 15:06:34 -0700 Subject: [PATCH 15/20] clean up Signed-off-by: Scott Lee --- python/ray/data/dataset.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index c5380bb2fb16..eb5915e5f0f7 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -4588,13 +4588,14 @@ def iter_internal_ref_bundles(self) -> Iterator[RefBundle]: An iterator over this Dataset's ``RefBundles``. """ - def _build_ref_bundle( - blocks: Tuple[ObjectRef[Block], BlockMetadata], - ) -> RefBundle: - return RefBundle((blocks,), owns_blocks=True) + def _build_ref_bundles( + iter_blocks: Iterator[Tuple[ObjectRef[Block], BlockMetadata]], + ) -> Iterator[RefBundle]: + for block in iter_blocks: + yield RefBundle((block,), owns_blocks=True) iter_block_refs_md, _, _ = self._plan.execute_to_iterator() - iter_ref_bundles = map(_build_ref_bundle, iter_block_refs_md) + iter_ref_bundles = _build_ref_bundles(iter_block_refs_md) self._synchronize_progress_bar() return iter_ref_bundles From f520c62f460e821157afb89198699e1825dfbcab Mon Sep 17 00:00:00 2001 From: Scott Lee Date: Mon, 8 Jul 2024 16:11:58 -0700 Subject: [PATCH 16/20] update parquet test Signed-off-by: Scott Lee --- python/ray/data/tests/test_parquet.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/python/ray/data/tests/test_parquet.py b/python/ray/data/tests/test_parquet.py index 9e92e2536b12..8266e56a3325 100644 --- a/python/ray/data/tests/test_parquet.py +++ b/python/ray/data/tests/test_parquet.py @@ -329,10 +329,11 @@ def test_parquet_read_bulk(ray_start_regular_shared, fs, data_path): assert "test2.parquet" in str(input_files) assert not ds._plan.has_started_execution - # Schema isn't available, so we do a partial read. + # Dataset.schema() calls execute_to_iterator(), which caches the metadata. + # This means the schema and num_rows are available once `ds.schema()` is called. assert ds.schema() is not None - assert str(ds) == "Dataset(num_rows=?, schema={one: int64, two: string})", ds - assert repr(ds) == "Dataset(num_rows=?, schema={one: int64, two: string})", ds + assert str(ds) == "Dataset(num_rows=3, schema={one: int64, two: string})", ds + assert repr(ds) == "Dataset(num_rows=3, schema={one: int64, two: string})", ds assert ds._plan.has_started_execution assert not ds._plan.has_computed_output() From 629d6bb194ce1a097c6ff2352020b8f34b505f03 Mon Sep 17 00:00:00 2001 From: Scott Lee Date: Mon, 8 Jul 2024 18:19:13 -0700 Subject: [PATCH 17/20] only cache metadata once iteration terminates Signed-off-by: Scott Lee --- .../data/_internal/execution/legacy_compat.py | 55 ++++++++++--------- python/ray/data/tests/test_consumption.py | 13 +++++ python/ray/data/tests/test_parquet.py | 7 +-- 3 files changed, 45 insertions(+), 30 deletions(-) diff --git a/python/ray/data/_internal/execution/legacy_compat.py b/python/ray/data/_internal/execution/legacy_compat.py index d4b3261f82ca..0fccf070d95d 100644 --- a/python/ray/data/_internal/execution/legacy_compat.py +++ b/python/ray/data/_internal/execution/legacy_compat.py @@ -66,40 +66,43 @@ class CacheMetadataIterator(OutputIterator): """Wrapper for `bundle_iterator` above. For a given iterator which yields output RefBundles, - cache the metadata from each output bundle, and yield - the original RefBundle.""" + collect the metadata from each output bundle, and yield the + original RefBundle. Only after the entire iterator is exhausted, + we cache the resulting metadata to the execution plan.""" def __init__(self, base_iterator: OutputIterator): # Note: the base_iterator should be of type StreamIterator, # defined within `StreamingExecutor.execute()`. It must # support the `get_next()` method. self._base_iterator = base_iterator + self._collected_metadata = BlockMetadata( + num_rows=0, + size_bytes=0, + schema=None, + input_files=None, + exec_stats=None, + ) def get_next(self, output_split_idx: Optional[int] = None) -> RefBundle: - bundle = self._base_iterator.get_next(output_split_idx) - self._cache_metadata(bundle) - return bundle - - def _cache_metadata(self, bundle: RefBundle) -> RefBundle: - """Cache the metadata from each output bundle, so we can - access important information, such as row count, schema, etc.""" - if not plan._snapshot_metadata: - # Initialize the snapshot BlockMetadata. - plan._snapshot_metadata = BlockMetadata( - num_rows=bundle.num_rows(), - size_bytes=bundle.size_bytes(), - schema=unify_block_metadata_schema(bundle.metadata), - input_files=None, - exec_stats=None, - ) - else: - # Update the snapshot BlockMetadata. - snap_md = plan._snapshot_metadata - snap_md.num_rows += bundle.num_rows() - snap_md.size_bytes += bundle.size_bytes() - snap_md.schema = unify_block_metadata_schema( - [snap_md, *bundle.metadata] - ) + try: + bundle = self._base_iterator.get_next(output_split_idx) + self._collect_metadata(bundle) + return bundle + except StopIteration: + # Once the iterator is completely exhausted, we are done + # collecting metadata. We can add this cached metadata to the plan. + plan._snapshot_metadata = self._collected_metadata + raise + + def _collect_metadata(self, bundle: RefBundle) -> RefBundle: + """Collect the metadata from each output bundle and accumulate + results, so we can access important information, such as + row count, schema, etc., after iteration completes.""" + self._collected_metadata.num_rows += bundle.num_rows() + self._collected_metadata.size_bytes += bundle.size_bytes() + self._collected_metadata.schema = unify_block_metadata_schema( + [self._collected_metadata, *bundle.metadata] + ) return bundle bundle_iter = CacheMetadataIterator(bundle_iter) diff --git a/python/ray/data/tests/test_consumption.py b/python/ray/data/tests/test_consumption.py index cc806d905263..2bc6760553bd 100644 --- a/python/ray/data/tests/test_consumption.py +++ b/python/ray/data/tests/test_consumption.py @@ -158,6 +158,19 @@ def test_count_edge_case(ray_start_regular): assert actual_count == 5 +def test_count_after_partial_execution(ray_start_regular): + paths = ["example://iris.csv"] * 5 + ds = ray.data.read_csv(paths, override_num_blocks=15) + for batch in ds.iter_batches(batch_size=1): + # Take one batch and break to simulate partial iteration/execution. + break + # Row count should be unknown after partial execution. + assert "num_rows=?" in str(ds) + # After calling `ds.count()`, row count should be known. + assert ds.count() == 150 * 5 + assert f"num_rows={150*5}" in str(ds) + + def test_limit_execution(ray_start_regular): last_snapshot = get_initial_core_execution_metrics_snapshot() override_num_blocks = 20 diff --git a/python/ray/data/tests/test_parquet.py b/python/ray/data/tests/test_parquet.py index 8266e56a3325..9e92e2536b12 100644 --- a/python/ray/data/tests/test_parquet.py +++ b/python/ray/data/tests/test_parquet.py @@ -329,11 +329,10 @@ def test_parquet_read_bulk(ray_start_regular_shared, fs, data_path): assert "test2.parquet" in str(input_files) assert not ds._plan.has_started_execution - # Dataset.schema() calls execute_to_iterator(), which caches the metadata. - # This means the schema and num_rows are available once `ds.schema()` is called. + # Schema isn't available, so we do a partial read. assert ds.schema() is not None - assert str(ds) == "Dataset(num_rows=3, schema={one: int64, two: string})", ds - assert repr(ds) == "Dataset(num_rows=3, schema={one: int64, two: string})", ds + assert str(ds) == "Dataset(num_rows=?, schema={one: int64, two: string})", ds + assert repr(ds) == "Dataset(num_rows=?, schema={one: int64, two: string})", ds assert ds._plan.has_started_execution assert not ds._plan.has_computed_output() From 7c2710a27724fa9601994b1240294991a466ad7b Mon Sep 17 00:00:00 2001 From: Scott Lee Date: Mon, 8 Jul 2024 18:22:20 -0700 Subject: [PATCH 18/20] log deprecation warning Signed-off-by: Scott Lee --- python/ray/data/dataset.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index d073966046dc..aaf4e66c3b62 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -4634,10 +4634,13 @@ def get_internal_block_refs(self) -> List[ObjectRef[Block]]: Returns: A list of references to this dataset's blocks. """ - raise DeprecationWarning( + logger.warning( "`Dataset.get_internal_block_refs()` is deprecated. Use " "`Dataset.iter_internal_ref_bundles()` instead.", ) + block_refs = self._plan.execute().block_refs + self._synchronize_progress_bar() + return block_refs @DeveloperAPI def has_serializable_lineage(self) -> bool: From a6e99c2390b42aadcb846186f4076a10e3d2b69a Mon Sep 17 00:00:00 2001 From: Scott Lee Date: Wed, 10 Jul 2024 12:34:02 -0700 Subject: [PATCH 19/20] clean up Signed-off-by: Scott Lee --- python/ray/data/dataset.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index aaf4e66c3b62..5ae215b5afb8 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -4169,7 +4169,7 @@ def to_dask( dask.config.set(scheduler=ray_dask_get) @dask.delayed - def block_to_df(block_ref: Block) -> pd.DataFrame: + def block_to_df(block_ref: ObjectRef[Block]) -> pd.DataFrame: if isinstance(block_ref, (ray.ObjectRef, ClientObjectRef)): raise ValueError( "Dataset.to_dask() must be used with Dask-on-Ray, please " @@ -4619,6 +4619,7 @@ def _build_ref_bundles( return iter_ref_bundles @Deprecated + @ConsumptionAPI(pattern="Examples:") def get_internal_block_refs(self) -> List[ObjectRef[Block]]: """Get a list of references to the underlying blocks of this dataset. From fc0a33459497980b2b9f9d45e246c50ea642721f Mon Sep 17 00:00:00 2001 From: Scott Lee Date: Thu, 11 Jul 2024 13:26:50 -0700 Subject: [PATCH 20/20] comments Signed-off-by: Scott Lee --- .../_internal/execution/interfaces/ref_bundle.py | 2 +- python/ray/data/tests/test_size_estimation.py | 14 +++++++------- python/ray/data/tests/test_zip.py | 2 +- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/python/ray/data/_internal/execution/interfaces/ref_bundle.py b/python/ray/data/_internal/execution/interfaces/ref_bundle.py index bfbbea3586db..758b22215051 100644 --- a/python/ray/data/_internal/execution/interfaces/ref_bundle.py +++ b/python/ray/data/_internal/execution/interfaces/ref_bundle.py @@ -130,7 +130,7 @@ def __len__(self) -> int: def _ref_bundles_iterator_to_block_refs_list( ref_bundles: Iterator[RefBundle], ) -> List[ObjectRef[Block]]: - """Convert an iterator of RefBundles to a list of object references to Blocks.""" + """Convert an iterator of RefBundles to a list of Block object references.""" return [ block_ref for ref_bundle in ref_bundles for block_ref in ref_bundle.block_refs ] diff --git a/python/ray/data/tests/test_size_estimation.py b/python/ray/data/tests/test_size_estimation.py index 667473751f57..7a18b37e3c78 100644 --- a/python/ray/data/tests/test_size_estimation.py +++ b/python/ray/data/tests/test_size_estimation.py @@ -192,13 +192,13 @@ def __call__(self, x): bundles: Iterable[RefBundle] = ds2.map( identity_fn, **kwargs ).iter_internal_ref_bundles() - nblocks = sum([len(b.block_refs) for b in bundles]) + nblocks = sum(len(b.block_refs) for b in bundles) assert nblocks == 1, nblocks ctx.target_max_block_size = 2_000_000 bundles: Iterable[RefBundle] = ds2.map( identity_fn, **kwargs ).iter_internal_ref_bundles() - nblocks = sum([len(b.block_refs) for b in bundles]) + nblocks = sum(len(b.block_refs) for b in bundles) assert 4 < nblocks < 7 or use_actors, nblocks # Disabled. @@ -206,7 +206,7 @@ def __call__(self, x): ctx.target_max_block_size = 2**64 ds3 = ray.data.range(1000, override_num_blocks=1).map(arrow_fn, **kwargs) bundles = ds3.map(identity_fn, **kwargs).iter_internal_ref_bundles() - nblocks = sum([len(b.block_refs) for b in bundles]) + nblocks = sum(len(b.block_refs) for b in bundles) assert nblocks == 1, nblocks @@ -217,11 +217,11 @@ def test_split_flat_map(ray_start_regular_shared): ctx.target_max_block_size = 20_000_000 ds2 = ray.data.range(1000, override_num_blocks=1).map(lambda _: ARROW_LARGE_VALUE) bundles = ds2.flat_map(lambda x: [x]).iter_internal_ref_bundles() - nblocks = sum([len(b.block_refs) for b in bundles]) + nblocks = sum(len(b.block_refs) for b in bundles) assert nblocks == 1, nblocks ctx.target_max_block_size = 2_000_000 bundles = ds2.flat_map(lambda x: [x]).iter_internal_ref_bundles() - nblocks = sum([len(b.block_refs) for b in bundles]) + nblocks = sum(len(b.block_refs) for b in bundles) assert 4 < nblocks < 7, nblocks @@ -232,11 +232,11 @@ def test_split_map_batches(ray_start_regular_shared): ctx.target_max_block_size = 20_000_000 ds2 = ray.data.range(1000, override_num_blocks=1).map(lambda _: ARROW_LARGE_VALUE) bundles = ds2.map_batches(lambda x: x, batch_size=1).iter_internal_ref_bundles() - nblocks = sum([len(b.block_refs) for b in bundles]) + nblocks = sum(len(b.block_refs) for b in bundles) assert nblocks == 1, nblocks ctx.target_max_block_size = 2_000_000 bundles = ds2.map_batches(lambda x: x, batch_size=16).iter_internal_ref_bundles() - nblocks = sum([len(b.block_refs) for b in bundles]) + nblocks = sum(len(b.block_refs) for b in bundles) assert 4 < nblocks < 7, nblocks diff --git a/python/ray/data/tests/test_zip.py b/python/ray/data/tests/test_zip.py index d7550b6f31d6..85be862d0012 100644 --- a/python/ray/data/tests/test_zip.py +++ b/python/ray/data/tests/test_zip.py @@ -70,7 +70,7 @@ def test_zip_different_num_blocks_split_smallest( ) ds = ds1.zip(ds2).materialize() bundles = ds.iter_internal_ref_bundles() - num_blocks = sum([len(b.block_refs) for b in bundles]) + num_blocks = sum(len(b.block_refs) for b in bundles) assert ds.take() == [{str(i): i for i in range(num_cols1 + num_cols2)}] * n if should_invert: assert num_blocks == num_blocks2