Skip to content

Commit

Permalink
[Data] Remove in_blocks parameter of ExecutionPlan (ray-project#4…
Browse files Browse the repository at this point in the history
…5860)

These changes are part of a larger effort to remove LazyBlockList.

Currently, when you create an ExecutionPlan, you need to pass in a BlockList. A BlockList can either be a regular BlockList which represents in-memory data or a LazyBlockList which represents ReadTasks. The issue is that there's no way to represent lazy input data that doesn't use ReadTasks.

To address this, this PR removes the in_blocks parameter of ExecutionPlan. It follows ray-project#45852.

Signed-off-by: Balaji Veeramani <balaji@anyscale.com>
Signed-off-by: Balaji Veeramani <bveeramani@berkeley.edu>
Co-authored-by: Scott Lee <scottjlee@users.noreply.github.com>
  • Loading branch information
bveeramani and scottjlee authored Jun 13, 2024
1 parent dddb1c2 commit 384f46c
Show file tree
Hide file tree
Showing 16 changed files with 188 additions and 339 deletions.
8 changes: 8 additions & 0 deletions python/ray/data/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -594,3 +594,11 @@ py_test(
tags = ["team:data", "exclusive"],
deps = ["//:ray_lib", ":conftest"],
)

py_test(
name = "test_logical_plan",
size = "small",
srcs = ["tests/test_logical_plan.py"],
tags = ["team:data", "exclusive"],
deps = ["//:ray_lib", ":conftest"],
)
7 changes: 3 additions & 4 deletions python/ray/data/_internal/execution/legacy_compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,16 +183,15 @@ def _get_execution_dag(


def _get_initial_stats_from_plan(plan: ExecutionPlan) -> DatasetStats:
if plan._snapshot_blocks is not None and not plan._snapshot_blocks.is_cleared():
if plan._snapshot_bundle is not None:
return plan._snapshot_stats
# For Datasets created from "read_xxx", `plan._in_blocks` is a LazyBlockList,
# and `plan._in_stats` contains useless data.
# For Datasets created from "read_xxx", `plan._in_stats` contains useless data.
# For Datasets created from "from_xxx", we need to use `plan._in_stats` as
# the initial stats. Because the `FromXxx` logical operators will be translated to
# "InputDataBuffer" physical operators, which will be ignored when generating
# stats, see `StreamingExecutor._generate_stats`.
# TODO(hchen): Unify the logic by saving the initial stats in `InputDataBuffer
if isinstance(plan._in_blocks, LazyBlockList):
if plan.has_lazy_input():
return DatasetStats(metadata={}, parent=None)
else:
return plan._in_stats
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
if TYPE_CHECKING:
import pyarrow

from ray.data._internal.execution.interfaces import RefBundle


class LogicalOperator(Operator):
"""Abstract class for logical operators.
Expand Down Expand Up @@ -74,3 +76,7 @@ def num_rows(self) -> Optional[int]:
def input_files(self) -> Optional[List[str]]:
"""The input files of this operator, or ``None`` if not known."""
return None

def output_data(self) -> Optional[List["RefBundle"]]:
"""The output data of this operator, or ``None`` if not known."""
return None
13 changes: 13 additions & 0 deletions python/ray/data/_internal/logical/interfaces/logical_plan.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import List

from .logical_operator import LogicalOperator
from .plan import Plan

Expand All @@ -12,3 +14,14 @@ def __init__(self, dag: LogicalOperator):
def dag(self) -> LogicalOperator:
"""Get the DAG of logical operators."""
return self._dag

def sources(self) -> List[LogicalOperator]:
"""List of operators that are sources for this plan's DAG."""
# If an operator has no input dependencies, it's a source.
if not any(self._dag.input_dependencies):
return [self._dag]

sources = []
for op in self._dag.input_dependencies:
sources.extend(LogicalPlan(op).sources())
return sources
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import abc
from typing import TYPE_CHECKING, List, Union
from typing import TYPE_CHECKING, List, Optional, Union

from ray.data._internal.execution.interfaces import RefBundle
from ray.data._internal.logical.interfaces import LogicalOperator
Expand Down Expand Up @@ -46,6 +46,9 @@ def num_rows(self):
else:
return None

def output_data(self) -> Optional[List[RefBundle]]:
return self._input_data


class FromItems(AbstractFrom):
"""Logical operator for `from_items`."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,8 @@ def num_rows(self):
return sum(bundle.num_rows() for bundle in self.input_data)
else:
return None

def output_data(self) -> Optional[List[RefBundle]]:
if self.input_data is None:
return None
return self.input_data
Loading

0 comments on commit 384f46c

Please sign in to comment.