Skip to content

Commit

Permalink
[Data] Remove LazyBlockList (#46054)
Browse files Browse the repository at this point in the history
After #45860, LazyBlockList is dead code.

Signed-off-by: Balaji Veeramani <balaji@anyscale.com>
  • Loading branch information
bveeramani authored Jun 17, 2024
1 parent 367dcb6 commit ab85dd2
Show file tree
Hide file tree
Showing 4 changed files with 3 additions and 528 deletions.
7 changes: 2 additions & 5 deletions python/ray/data/_internal/block_list.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@
class BlockList:
"""A list of blocks that may be computed or pending computation.
In the basic version of BlockList, all blocks are known ahead of time. In
LazyBlockList, blocks are not yet computed, so the number of blocks may
change after execution due to block splitting.
All blocks are known ahead of time
"""

def __init__(
Expand Down Expand Up @@ -69,7 +67,6 @@ def get_blocks(self) -> List[ObjectRef[Block]]:
The length of this iterator is not known until execution.
"""
self._check_if_cleared()
# Overriden in LazyBlockList for bulk evaluation.
return list(self._blocks)

def get_blocks_with_metadata(self) -> List[Tuple[ObjectRef[Block], BlockMetadata]]:
Expand All @@ -78,7 +75,7 @@ def get_blocks_with_metadata(self) -> List[Tuple[ObjectRef[Block], BlockMetadata
Prefer calling this instead of the iter form for performance if you
don't need lazy evaluation.
"""
self.get_blocks() # Force bulk evaluation in LazyBlockList.
self.get_blocks()
return list(self.iter_blocks_with_metadata())

def iter_blocks_with_metadata(
Expand Down
59 changes: 0 additions & 59 deletions python/ray/data/_internal/execution/legacy_compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,11 @@
PhysicalOperator,
RefBundle,
)
from ray.data._internal.lazy_block_list import LazyBlockList
from ray.data._internal.logical.interfaces.logical_plan import LogicalPlan
from ray.data._internal.logical.operators.read_operator import Read
from ray.data._internal.logical.optimizers import get_execution_plan
from ray.data._internal.logical.rules.set_read_parallelism import (
compute_additional_split_factor,
)
from ray.data._internal.logical.util import record_operators_usage
from ray.data._internal.plan import ExecutionPlan
from ray.data._internal.planner.plan_read_op import (
apply_output_blocks_handling_to_read_task,
)
from ray.data._internal.stats import DatasetStats
from ray.data.block import Block, BlockMetadata, List
from ray.data.context import DataContext
from ray.types import ObjectRef

# Warn about tasks larger than this.
Expand Down Expand Up @@ -111,55 +101,6 @@ def execute_to_legacy_block_list(
return block_list


def get_legacy_lazy_block_list_read_only(
plan: ExecutionPlan,
) -> LazyBlockList:
"""For a read-only plan, construct a LazyBlockList with ReadTasks from the
input Datasource or Reader. Note that the plan and the underlying ReadTasks
are not executed, only their known metadata is fetched.
Args:
plan: The legacy plan to execute.
Returns:
The output as a legacy LazyBlockList.
"""
assert plan.is_read_only(), "This function only supports read-only plans."
assert isinstance(plan._logical_plan, LogicalPlan)
read_logical_op = plan._logical_plan.dag
assert isinstance(read_logical_op, Read)

# In the full dataset execution, the logic in ApplyAdditionalSplitToOutputBlocks
# is normally executed as part of the MapOperator created in the
# LogicalPlan -> PhysicalPlan plan translation. In this case, since we
# get the ReadTasks directly from the Datasource or Reader,
# we need to manually apply this logic in order to update the ReadTasks.
ctx = DataContext.get_current()
(parallelism, _, estimated_num_blocks, k,) = compute_additional_split_factor(
read_logical_op._datasource_or_legacy_reader,
read_logical_op._parallelism,
read_logical_op._mem_size,
ctx.target_max_block_size,
cur_additional_split_factor=None,
)
read_tasks = read_logical_op._datasource_or_legacy_reader.get_read_tasks(
parallelism
)
for read_task in read_tasks:
apply_output_blocks_handling_to_read_task(read_task, k)

block_list = LazyBlockList(
read_tasks,
read_logical_op.name,
ray_remote_args=read_logical_op._ray_remote_args,
owned_by_consumer=False,
)
# Update the estimated number of blocks after applying optimizations
# and fetching metadata (e.g. SetReadParallelismRule).
block_list._estimated_num_blocks = estimated_num_blocks
return block_list


def _get_execution_dag(
executor: Executor,
plan: ExecutionPlan,
Expand Down
Loading

0 comments on commit ab85dd2

Please sign in to comment.