Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Data] Remove LazyBlockList #46054

Merged
merged 2 commits into from
Jun 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions python/ray/data/_internal/block_list.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@
class BlockList:
"""A list of blocks that may be computed or pending computation.

In the basic version of BlockList, all blocks are known ahead of time. In
LazyBlockList, blocks are not yet computed, so the number of blocks may
change after execution due to block splitting.
All blocks are known ahead of time
"""

def __init__(
Expand Down Expand Up @@ -69,7 +67,6 @@ def get_blocks(self) -> List[ObjectRef[Block]]:
The length of this iterator is not known until execution.
"""
self._check_if_cleared()
# Overriden in LazyBlockList for bulk evaluation.
return list(self._blocks)

def get_blocks_with_metadata(self) -> List[Tuple[ObjectRef[Block], BlockMetadata]]:
Expand All @@ -78,7 +75,7 @@ def get_blocks_with_metadata(self) -> List[Tuple[ObjectRef[Block], BlockMetadata
Prefer calling this instead of the iter form for performance if you
don't need lazy evaluation.
"""
self.get_blocks() # Force bulk evaluation in LazyBlockList.
self.get_blocks()
return list(self.iter_blocks_with_metadata())

def iter_blocks_with_metadata(
Expand Down
59 changes: 0 additions & 59 deletions python/ray/data/_internal/execution/legacy_compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,11 @@
PhysicalOperator,
RefBundle,
)
from ray.data._internal.lazy_block_list import LazyBlockList
from ray.data._internal.logical.interfaces.logical_plan import LogicalPlan
from ray.data._internal.logical.operators.read_operator import Read
from ray.data._internal.logical.optimizers import get_execution_plan
from ray.data._internal.logical.rules.set_read_parallelism import (
compute_additional_split_factor,
)
from ray.data._internal.logical.util import record_operators_usage
from ray.data._internal.plan import ExecutionPlan
from ray.data._internal.planner.plan_read_op import (
apply_output_blocks_handling_to_read_task,
)
from ray.data._internal.stats import DatasetStats
from ray.data.block import Block, BlockMetadata, List
from ray.data.context import DataContext
from ray.types import ObjectRef

# Warn about tasks larger than this.
Expand Down Expand Up @@ -111,55 +101,6 @@ def execute_to_legacy_block_list(
return block_list


def get_legacy_lazy_block_list_read_only(
plan: ExecutionPlan,
) -> LazyBlockList:
"""For a read-only plan, construct a LazyBlockList with ReadTasks from the
input Datasource or Reader. Note that the plan and the underlying ReadTasks
are not executed, only their known metadata is fetched.

Args:
plan: The legacy plan to execute.

Returns:
The output as a legacy LazyBlockList.
"""
assert plan.is_read_only(), "This function only supports read-only plans."
assert isinstance(plan._logical_plan, LogicalPlan)
read_logical_op = plan._logical_plan.dag
assert isinstance(read_logical_op, Read)

# In the full dataset execution, the logic in ApplyAdditionalSplitToOutputBlocks
# is normally executed as part of the MapOperator created in the
# LogicalPlan -> PhysicalPlan plan translation. In this case, since we
# get the ReadTasks directly from the Datasource or Reader,
# we need to manually apply this logic in order to update the ReadTasks.
ctx = DataContext.get_current()
(parallelism, _, estimated_num_blocks, k,) = compute_additional_split_factor(
read_logical_op._datasource_or_legacy_reader,
read_logical_op._parallelism,
read_logical_op._mem_size,
ctx.target_max_block_size,
cur_additional_split_factor=None,
)
read_tasks = read_logical_op._datasource_or_legacy_reader.get_read_tasks(
parallelism
)
for read_task in read_tasks:
apply_output_blocks_handling_to_read_task(read_task, k)

block_list = LazyBlockList(
read_tasks,
read_logical_op.name,
ray_remote_args=read_logical_op._ray_remote_args,
owned_by_consumer=False,
)
# Update the estimated number of blocks after applying optimizations
# and fetching metadata (e.g. SetReadParallelismRule).
block_list._estimated_num_blocks = estimated_num_blocks
return block_list


def _get_execution_dag(
executor: Executor,
plan: ExecutionPlan,
Expand Down
Loading
Loading