[Data] Remove run_by_consumer parameter of ExecutionPlan (ray-project#46052)

This PR removes dead functions and parameters after ray-project#46054.

Signed-off-by: Balaji Veeramani <balaji@anyscale.com>
bveeramani authored and chungen04 committed Jun 18, 2024
1 parent 571125f commit f2656ca
Showing 7 changed files with 19 additions and 103 deletions.
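
At a glance, the change is a one-keyword simplification of the ExecutionPlan constructor at every call site. A minimal before/after sketch (these are internal Ray Data modules, with import paths taken from this diff, not public API):

from ray.data._internal.plan import ExecutionPlan
from ray.data._internal.stats import DatasetStats

stats = DatasetStats(metadata={}, parent=None)

# Before this commit, every caller threaded a consumer flag through:
#     plan = ExecutionPlan(stats, run_by_consumer=False)
# After this commit, the keyword no longer exists:
plan = ExecutionPlan(stats)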
31 changes: 2 additions & 29 deletions python/ray/data/_internal/execution/legacy_compat.py
@@ -15,7 +15,7 @@
 from ray.data._internal.logical.util import record_operators_usage
 from ray.data._internal.plan import ExecutionPlan
 from ray.data._internal.stats import DatasetStats
-from ray.data.block import Block, BlockMetadata, List
+from ray.data.block import Block, BlockMetadata
 from ray.types import ObjectRef

 # Warn about tasks larger than this.
@@ -25,13 +25,9 @@
 def execute_to_legacy_block_iterator(
     executor: Executor,
     plan: ExecutionPlan,
-    allow_clear_input_blocks: bool,
-    dataset_uuid: str,
 ) -> Iterator[Tuple[ObjectRef[Block], BlockMetadata]]:
     """Same as execute_to_legacy_bundle_iterator but returning blocks and metadata."""
-    bundle_iter = execute_to_legacy_bundle_iterator(
-        executor, plan, allow_clear_input_blocks, dataset_uuid
-    )
+    bundle_iter = execute_to_legacy_bundle_iterator(executor, plan)
     for bundle in bundle_iter:
         for block, metadata in bundle.blocks:
             yield block, metadata
@@ -40,17 +36,13 @@ def execute_to_legacy_block_iterator(
 def execute_to_legacy_bundle_iterator(
     executor: Executor,
     plan: ExecutionPlan,
-    allow_clear_input_blocks: bool,
-    dataset_uuid: str,
     dag_rewrite=None,
 ) -> Iterator[RefBundle]:
     """Execute a plan with the new executor and return a bundle iterator.

     Args:
         executor: The executor to use.
         plan: The legacy plan to execute.
-        allow_clear_input_blocks: Whether the executor may consider clearing blocks.
-        dataset_uuid: UUID of the dataset for this execution.
         dag_rewrite: Callback that can be used to mutate the DAG prior to execution.
             This is currently used as a legacy hack to inject the OutputSplit operator
             for `Dataset.streaming_split()`.
@@ -73,7 +65,6 @@ def execute_to_legacy_bundle_iterator(
 def execute_to_legacy_block_list(
     executor: Executor,
     plan: ExecutionPlan,
-    allow_clear_input_blocks: bool,
     dataset_uuid: str,
     preserve_order: bool,
 ) -> BlockList:
@@ -82,7 +73,6 @@ def execute_to_legacy_block_list(
     Args:
         executor: The executor to use.
         plan: The legacy plan to execute.
-        allow_clear_input_blocks: Whether the executor may consider clearing blocks.
         dataset_uuid: UUID of the dataset for this execution.
         preserve_order: Whether to preserve order in execution.
@@ -149,23 +139,6 @@ def _bundles_to_block_list(bundles: Iterator[RefBundle]) -> BlockList:
     return BlockList(blocks, metadata, owned_by_consumer=owns_blocks)
-
-
-def _block_list_to_bundles(blocks: BlockList, owns_blocks: bool) -> List[RefBundle]:
-    output = []
-    for block, meta in blocks.iter_blocks_with_metadata():
-        output.append(
-            RefBundle(
-                [
-                    (
-                        block,
-                        meta,
-                    )
-                ],
-                owns_blocks=owns_blocks,
-            )
-        )
-    return output


 def _set_stats_uuid_recursive(stats: DatasetStats, dataset_uuid: str) -> None:
     if not stats.dataset_uuid:
         stats.dataset_uuid = dataset_uuid
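
The iterator helpers above now take just the executor and the plan. A sketch of the new call shape (executor and plan are placeholders; constructing them for real requires Ray Data's streaming executor internals):

from ray.data._internal.execution.legacy_compat import (
    execute_to_legacy_block_iterator,
)

def consume(executor, plan):
    # The allow_clear_input_blocks and dataset_uuid arguments are gone;
    # the helper yields (block ref, metadata) pairs as before.
    for block, metadata in execute_to_legacy_block_iterator(executor, plan):
        yield block, metadata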
2 changes: 0 additions & 2 deletions python/ray/data/_internal/iterator/stream_split_iterator.py
@@ -175,8 +175,6 @@ def add_split_op(dag):
                output_iterator = execute_to_legacy_bundle_iterator(
                    executor,
                    dataset._plan,
-                    True,
-                    dataset._plan._dataset_uuid,
                    dag_rewrite=add_split_op,
                )
                yield output_iterator
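
Of the removed arguments at this call site, only the keyword-only dag_rewrite hook survives. A sketch of the updated call (add_split_op and dataset stand in for the values built in stream_split_iterator.py):

from ray.data._internal.execution.legacy_compat import (
    execute_to_legacy_bundle_iterator,
)

def make_split_iterator(executor, dataset, add_split_op):
    # Previously this call also passed True (allow_clear_input_blocks) and
    # dataset._plan._dataset_uuid; both are dropped by this commit.
    return execute_to_legacy_bundle_iterator(
        executor,
        dataset._plan,
        dag_rewrite=add_split_op,
    )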
28 changes: 3 additions & 25 deletions python/ray/data/_internal/plan.py
@@ -58,16 +58,13 @@ def __init__(
         self,
         stats: DatasetStats,
         *,
-        run_by_consumer: bool,
         data_context: Optional[DataContext] = None,
     ):
         """Create a plan with no transformation operators.

         Args:
             stats: Stats for the base blocks.
             dataset_uuid: Dataset's UUID.
-            run_by_consumer: Whether this plan is invoked to run by the consumption
-                APIs (e.g. .iter_batches()).
         """
         self._in_stats = stats
         # A computed snapshot of some prefix of operators and their corresponding
@@ -81,7 +78,6 @@ def __init__(
         # Set when a Dataset is constructed with this plan
         self._dataset_uuid = None

-        self._run_by_consumer = run_by_consumer
         self._dataset_name = None

         self._has_started_execution = False
@@ -97,7 +93,6 @@ def __repr__(self) -> str:
         return (
             f"ExecutionPlan("
             f"dataset_uuid={self._dataset_uuid}, "
-            f"run_by_consumer={self._run_by_consumer}, "
             f"snapshot_operator={self._snapshot_operator}"
         )
@@ -163,9 +158,7 @@ def generate_logical_plan_string(
        count = None
    else:
        assert len(sources) == 1
-        plan = ExecutionPlan(
-            DatasetStats(metadata={}, parent=None), run_by_consumer=False
-        )
+        plan = ExecutionPlan(DatasetStats(metadata={}, parent=None))
        plan.link_logical_plan(LogicalPlan(sources[0]))
        schema = plan.schema()
        count = plan.meta_count()
@@ -292,7 +285,6 @@ def copy(self) -> "ExecutionPlan":
         """
         plan_copy = ExecutionPlan(
             self._in_stats,
-            run_by_consumer=self._run_by_consumer,
             data_context=self._context,
         )
         if self._snapshot_bundle is not None:
@@ -311,10 +303,7 @@ def deep_copy(self) -> "ExecutionPlan":
         Returns:
             A deep copy of this execution plan.
         """
-        plan_copy = ExecutionPlan(
-            copy.copy(self._in_stats),
-            run_by_consumer=self._run_by_consumer,
-        )
+        plan_copy = ExecutionPlan(copy.copy(self._in_stats))
         if self._snapshot_bundle:
             # Copy over the existing snapshot.
             plan_copy._snapshot_bundle = copy.copy(self._snapshot_bundle)
@@ -397,7 +386,6 @@ def meta_count(self) -> Optional[int]:
     @omit_traceback_stdout
     def execute_to_iterator(
         self,
-        allow_clear_input_blocks: bool = True,
     ) -> Tuple[
         Iterator[Tuple[ObjectRef[Block], BlockMetadata]],
         DatasetStats,
@@ -407,10 +395,6 @@ def execute_to_iterator(
         This will use streaming execution to generate outputs.

-        Args:
-            allow_clear_input_blocks: Whether we should try to clear the input blocks
-            for each operator.
-
         Returns:
             Tuple of iterator over output blocks and the executor.
         """
@@ -420,7 +404,7 @@ def execute_to_iterator(
         ctx = self._context

         if self.has_computed_output():
-            bundle = self.execute(allow_clear_input_blocks)
+            bundle = self.execute()
             return iter(bundle.blocks), self._snapshot_stats, None

         from ray.data._internal.execution.legacy_compat import (
@@ -433,8 +417,6 @@ def execute_to_iterator(
         block_iter = execute_to_legacy_block_iterator(
             executor,
             self,
-            allow_clear_input_blocks=allow_clear_input_blocks,
-            dataset_uuid=self._dataset_uuid,
         )
         # Since the generator doesn't run any code until we try to fetch the first
         # value, force execution of one bundle before we call get_stats().
@@ -449,14 +431,11 @@ def execute_to_iterator(
     @omit_traceback_stdout
     def execute(
         self,
-        allow_clear_input_blocks: bool = True,
         preserve_order: bool = False,
     ) -> RefBundle:
         """Execute this plan.

         Args:
-            allow_clear_input_blocks: Whether we should try to clear the input blocks
-            for each operator.
             preserve_order: Whether to preserve order in execution.

         Returns:
@@ -515,7 +494,6 @@ def execute(
            blocks = execute_to_legacy_block_list(
                executor,
                self,
-                allow_clear_input_blocks=allow_clear_input_blocks,
                dataset_uuid=self._dataset_uuid,
                preserve_order=preserve_order,
            )
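
On ExecutionPlan itself, the two execution entry points shrink accordingly. A sketch of the resulting surface (plan stands in for an ExecutionPlan built internally by Ray Data):

def run(plan):
    # execute() keeps only preserve_order.
    bundle = plan.execute(preserve_order=False)
    # execute_to_iterator() now takes no arguments; per the diff it still
    # returns (block iterator, stats, executor).
    block_iter, stats, executor = plan.execute_to_iterator()
    return bundle, block_iter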
22 changes: 5 additions & 17 deletions python/ray/data/dataset.py
@@ -1403,10 +1403,7 @@ def train(self, data_iterator):
            logical_plan = LogicalPlan(InputData(input_data=ref_bundles))
            split_datasets.append(
                MaterializedDataset(
-                    ExecutionPlan(
-                        stats,
-                        run_by_consumer=owned_by_consumer,
-                    ),
+                    ExecutionPlan(stats),
                    logical_plan,
                )
            )
@@ -1524,10 +1521,7 @@ def build_node_id_by_actor(actors: List[Any]) -> Dict[Any, str]:
            logical_plan = LogicalPlan(InputData(input_data=[bundle]))
            split_datasets.append(
                MaterializedDataset(
-                    ExecutionPlan(
-                        stats,
-                        run_by_consumer=owned_by_consumer,
-                    ),
+                    ExecutionPlan(stats),
                    logical_plan,
                )
            )
@@ -1601,10 +1595,7 @@ def split_at_indices(self, indices: List[int]) -> List["MaterializedDataset"]:

            splits.append(
                MaterializedDataset(
-                    ExecutionPlan(
-                        stats,
-                        run_by_consumer=bundle.owns_blocks,
-                    ),
+                    ExecutionPlan(stats),
                    logical_plan,
                )
            )
@@ -1793,7 +1784,7 @@ def union(self, *other: List["Dataset"]) -> "Dataset":
        )
        stats.time_total_s = time.perf_counter() - start_time
        return Dataset(
-            ExecutionPlan(stats, run_by_consumer=False),
+            ExecutionPlan(stats),
            logical_plan,
        )
@@ -4526,10 +4517,7 @@ def materialize(self) -> "MaterializedDataset":
        ]
        logical_plan = LogicalPlan(InputData(input_data=ref_bundles))
        output = MaterializedDataset(
-            ExecutionPlan(
-                copy._plan.stats(),
-                run_by_consumer=False,
-            ),
+            ExecutionPlan(copy._plan.stats()),
            logical_plan,
        )
        # Metrics are tagged with `copy`s uuid, update the output uuid with
5 changes: 1 addition & 4 deletions python/ray/data/iterator.py
@@ -875,10 +875,7 @@ def materialize(self) -> "MaterializedDataset":
        ]
        logical_plan = LogicalPlan(InputData(input_data=ref_bundles))
        return MaterializedDataset(
-            ExecutionPlan(
-                stats,
-                run_by_consumer=owned_by_consumer,
-            ),
+            ExecutionPlan(stats),
            logical_plan,
        )

32 changes: 7 additions & 25 deletions python/ray/data/read_api.py
@@ -124,10 +124,7 @@ def from_blocks(blocks: List[Block]):
    from_blocks_op = FromBlocks(block_refs, metadata)
    logical_plan = LogicalPlan(from_blocks_op)
    return MaterializedDataset(
-        ExecutionPlan(
-            DatasetStats(metadata={"FromBlocks": metadata}, parent=None),
-            run_by_consumer=False,
-        ),
+        ExecutionPlan(DatasetStats(metadata={"FromBlocks": metadata}, parent=None)),
        logical_plan,
    )

@@ -207,10 +204,7 @@ def from_items(
    from_items_op = FromItems(blocks, metadata)
    logical_plan = LogicalPlan(from_items_op)
    return MaterializedDataset(
-        ExecutionPlan(
-            DatasetStats(metadata={"FromItems": metadata}, parent=None),
-            run_by_consumer=False,
-        ),
+        ExecutionPlan(DatasetStats(metadata={"FromItems": metadata}, parent=None)),
        logical_plan,
    )

@@ -441,7 +435,7 @@ def read_datasource(
    )
    logical_plan = LogicalPlan(read_op)
    return Dataset(
-        plan=ExecutionPlan(stats, run_by_consumer=False),
+        plan=ExecutionPlan(stats),
        logical_plan=logical_plan,
    )

@@ -2481,10 +2475,7 @@ def from_pandas_refs(
        metadata = ray.get([get_metadata.remote(df) for df in dfs])
        logical_plan = LogicalPlan(FromPandas(dfs, metadata))
        return MaterializedDataset(
-            ExecutionPlan(
-                DatasetStats(metadata={"FromPandas": metadata}, parent=None),
-                run_by_consumer=False,
-            ),
+            ExecutionPlan(DatasetStats(metadata={"FromPandas": metadata}, parent=None)),
            logical_plan,
        )

@@ -2495,10 +2486,7 @@ def from_pandas_refs(
    metadata = ray.get(metadata)
    logical_plan = LogicalPlan(FromPandas(blocks, metadata))
    return MaterializedDataset(
-        ExecutionPlan(
-            DatasetStats(metadata={"FromPandas": metadata}, parent=None),
-            run_by_consumer=False,
-        ),
+        ExecutionPlan(DatasetStats(metadata={"FromPandas": metadata}, parent=None)),
        logical_plan,
    )

@@ -2581,10 +2569,7 @@ def from_numpy_refs(
    logical_plan = LogicalPlan(FromNumpy(blocks, metadata))

    return MaterializedDataset(
-        ExecutionPlan(
-            DatasetStats(metadata={"FromNumpy": metadata}, parent=None),
-            run_by_consumer=False,
-        ),
+        ExecutionPlan(DatasetStats(metadata={"FromNumpy": metadata}, parent=None)),
        logical_plan,
    )

@@ -2660,10 +2645,7 @@ def from_arrow_refs(
    logical_plan = LogicalPlan(FromArrow(tables, metadata))

    return MaterializedDataset(
-        ExecutionPlan(
-            DatasetStats(metadata={"FromArrow": metadata}, parent=None),
-            run_by_consumer=False,
-        ),
+        ExecutionPlan(DatasetStats(metadata={"FromArrow": metadata}, parent=None)),
        logical_plan,
    )

2 changes: 1 addition & 1 deletion python/ray/data/tests/test_split.py
@@ -101,7 +101,7 @@ def _test_equal_split_balanced(block_sizes, num_splits):
    logical_plan = LogicalPlan(InputData(input_data=ref_bundles))
    stats = DatasetStats(metadata={"TODO": []}, parent=None)
    ds = Dataset(
-        ExecutionPlan(stats, run_by_consumer=True),
+        ExecutionPlan(stats),
        logical_plan,
    )

