From f2656cacc90c60163b43c2bef51996dd5a830834 Mon Sep 17 00:00:00 2001
From: Balaji Veeramani
Date: Mon, 17 Jun 2024 16:04:19 -0700
Subject: [PATCH] [Data] Remove `run_by_consumer` parameter of `ExecutionPlan`
 (#46052)

This PR removes dead functions and parameters after #46054.

Signed-off-by: Balaji Veeramani
---
 .../data/_internal/execution/legacy_compat.py | 31 ++----------------
 .../iterator/stream_split_iterator.py         |  2 --
 python/ray/data/_internal/plan.py             | 28 ++--------------
 python/ray/data/dataset.py                    | 22 +++----------
 python/ray/data/iterator.py                   |  5 +--
 python/ray/data/read_api.py                   | 32 ++++---------------
 python/ray/data/tests/test_split.py           |  2 +-
 7 files changed, 19 insertions(+), 103 deletions(-)

diff --git a/python/ray/data/_internal/execution/legacy_compat.py b/python/ray/data/_internal/execution/legacy_compat.py
index 44fde949b3a6fd..b7d489efe65913 100644
--- a/python/ray/data/_internal/execution/legacy_compat.py
+++ b/python/ray/data/_internal/execution/legacy_compat.py
@@ -15,7 +15,7 @@
 from ray.data._internal.logical.util import record_operators_usage
 from ray.data._internal.plan import ExecutionPlan
 from ray.data._internal.stats import DatasetStats
-from ray.data.block import Block, BlockMetadata, List
+from ray.data.block import Block, BlockMetadata
 from ray.types import ObjectRef
 
 # Warn about tasks larger than this.
@@ -25,13 +25,9 @@
 def execute_to_legacy_block_iterator(
     executor: Executor,
     plan: ExecutionPlan,
-    allow_clear_input_blocks: bool,
-    dataset_uuid: str,
 ) -> Iterator[Tuple[ObjectRef[Block], BlockMetadata]]:
     """Same as execute_to_legacy_bundle_iterator but returning blocks and metadata."""
-    bundle_iter = execute_to_legacy_bundle_iterator(
-        executor, plan, allow_clear_input_blocks, dataset_uuid
-    )
+    bundle_iter = execute_to_legacy_bundle_iterator(executor, plan)
     for bundle in bundle_iter:
         for block, metadata in bundle.blocks:
             yield block, metadata
@@ -40,8 +36,6 @@ def execute_to_legacy_block_iterator(
 def execute_to_legacy_bundle_iterator(
     executor: Executor,
     plan: ExecutionPlan,
-    allow_clear_input_blocks: bool,
-    dataset_uuid: str,
     dag_rewrite=None,
 ) -> Iterator[RefBundle]:
     """Execute a plan with the new executor and return a bundle iterator.
 
     Args:
         executor: The executor to use.
         plan: The legacy plan to execute.
-        allow_clear_input_blocks: Whether the executor may consider clearing blocks.
-        dataset_uuid: UUID of the dataset for this execution.
         dag_rewrite: Callback that can be used to mutate the DAG prior to
             execution. This is currently used as a legacy hack to inject the
             OutputSplit operator for `Dataset.streaming_split()`.
@@ -73,7 +65,6 @@
 def execute_to_legacy_block_list(
     executor: Executor,
     plan: ExecutionPlan,
-    allow_clear_input_blocks: bool,
     dataset_uuid: str,
     preserve_order: bool,
 ) -> BlockList:
     """Execute a plan with the new executor and return a block list.
 
     Args:
         executor: The executor to use.
         plan: The legacy plan to execute.
-        allow_clear_input_blocks: Whether the executor may consider clearing blocks.
         dataset_uuid: UUID of the dataset for this execution.
         preserve_order: Whether to preserve order in execution.
@@ -149,23 +139,6 @@ def _bundles_to_block_list(bundles: Iterator[RefBundle]) -> BlockList:
     return BlockList(blocks, metadata, owned_by_consumer=owns_blocks)
 
 
-def _block_list_to_bundles(blocks: BlockList, owns_blocks: bool) -> List[RefBundle]:
-    output = []
-    for block, meta in blocks.iter_blocks_with_metadata():
-        output.append(
-            RefBundle(
-                [
-                    (
-                        block,
-                        meta,
-                    )
-                ],
-                owns_blocks=owns_blocks,
-            )
-        )
-    return output
-
-
 def _set_stats_uuid_recursive(stats: DatasetStats, dataset_uuid: str) -> None:
     if not stats.dataset_uuid:
         stats.dataset_uuid = dataset_uuid
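Reviewer note: the legacy-compat helpers above lose two dead arguments. A
minimal sketch of the call-site change, assuming `executor` and `plan` are
already built as in the callers below (not runnable on its own):

    # Before this PR (both arguments were accepted but unused downstream):
    #     execute_to_legacy_block_iterator(
    #         executor, plan, allow_clear_input_blocks, dataset_uuid
    #     )
    # After this PR:
    block_iter = execute_to_legacy_block_iterator(executor, plan)
    for block_ref, metadata in block_iter:
        ...  # still yields (ObjectRef[Block], BlockMetadata) pairs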
diff --git a/python/ray/data/_internal/iterator/stream_split_iterator.py b/python/ray/data/_internal/iterator/stream_split_iterator.py
index 205b5b3e12f457..18d0a4477f242e 100644
--- a/python/ray/data/_internal/iterator/stream_split_iterator.py
+++ b/python/ray/data/_internal/iterator/stream_split_iterator.py
@@ -175,8 +175,6 @@ def add_split_op(dag):
         output_iterator = execute_to_legacy_bundle_iterator(
             executor,
             dataset._plan,
-            True,
-            dataset._plan._dataset_uuid,
             dag_rewrite=add_split_op,
         )
         yield output_iterator
diff --git a/python/ray/data/_internal/plan.py b/python/ray/data/_internal/plan.py
index 460971940b1ea3..392cfbd592bc08 100644
--- a/python/ray/data/_internal/plan.py
+++ b/python/ray/data/_internal/plan.py
@@ -58,7 +58,6 @@ def __init__(
         self,
         stats: DatasetStats,
         *,
-        run_by_consumer: bool,
         data_context: Optional[DataContext] = None,
     ):
         """Create a plan with no transformation operators.
 
         Args:
             stats: Stats for the base blocks.
             dataset_uuid: Dataset's UUID.
-            run_by_consumer: Whether this plan is invoked to run by the consumption
-                APIs (e.g. .iter_batches()).
         """
         self._in_stats = stats
         # A computed snapshot of some prefix of operators and their corresponding
         # output blocks and stats.
@@ -81,7 +78,6 @@
         # Set when a Dataset is constructed with this plan
         self._dataset_uuid = None
-        self._run_by_consumer = run_by_consumer
         self._dataset_name = None
 
         self._has_started_execution = False
@@ -97,7 +93,6 @@ def __repr__(self) -> str:
         return (
             f"ExecutionPlan("
             f"dataset_uuid={self._dataset_uuid}, "
-            f"run_by_consumer={self._run_by_consumer}, "
             f"snapshot_operator={self._snapshot_operator}"
         )
@@ -163,9 +158,7 @@ def generate_logical_plan_string(
             count = None
         else:
             assert len(sources) == 1
-            plan = ExecutionPlan(
-                DatasetStats(metadata={}, parent=None), run_by_consumer=False
-            )
+            plan = ExecutionPlan(DatasetStats(metadata={}, parent=None))
             plan.link_logical_plan(LogicalPlan(sources[0]))
             schema = plan.schema()
             count = plan.meta_count()
@@ -292,7 +285,6 @@ def copy(self) -> "ExecutionPlan":
         """
         plan_copy = ExecutionPlan(
             self._in_stats,
-            run_by_consumer=self._run_by_consumer,
             data_context=self._context,
         )
         if self._snapshot_bundle is not None:
@@ -311,10 +303,7 @@ def deep_copy(self) -> "ExecutionPlan":
         Returns:
             A deep copy of this execution plan.
         """
-        plan_copy = ExecutionPlan(
-            copy.copy(self._in_stats),
-            run_by_consumer=self._run_by_consumer,
-        )
+        plan_copy = ExecutionPlan(copy.copy(self._in_stats))
         if self._snapshot_bundle:
             # Copy over the existing snapshot.
             plan_copy._snapshot_bundle = copy.copy(self._snapshot_bundle)
@@ -397,7 +386,6 @@ def meta_count(self) -> Optional[int]:
     @omit_traceback_stdout
     def execute_to_iterator(
         self,
-        allow_clear_input_blocks: bool = True,
     ) -> Tuple[
         Iterator[Tuple[ObjectRef[Block], BlockMetadata]],
         DatasetStats,
     ]:
         """Execute this plan, returning an iterator.
 
         This will use streaming execution to generate outputs.
 
-        Args:
-            allow_clear_input_blocks: Whether we should try to clear the input blocks
-                for each operator.
-
         Returns:
             Tuple of iterator over output blocks and the executor.
         """
@@ -420,7 +404,7 @@
         ctx = self._context
 
         if self.has_computed_output():
-            bundle = self.execute(allow_clear_input_blocks)
+            bundle = self.execute()
             return iter(bundle.blocks), self._snapshot_stats, None
 
         from ray.data._internal.execution.legacy_compat import (
@@ -433,8 +417,6 @@
         block_iter = execute_to_legacy_block_iterator(
             executor,
             self,
-            allow_clear_input_blocks=allow_clear_input_blocks,
-            dataset_uuid=self._dataset_uuid,
         )
         # Since the generator doesn't run any code until we try to fetch the first
         # value, force execution of one bundle before we call get_stats().
@@ -449,14 +431,11 @@
     @omit_traceback_stdout
     def execute(
         self,
-        allow_clear_input_blocks: bool = True,
         preserve_order: bool = False,
     ) -> RefBundle:
         """Execute this plan.
 
         Args:
-            allow_clear_input_blocks: Whether we should try to clear the input blocks
-                for each operator.
             preserve_order: Whether to preserve order in execution.
 
         Returns:
@@ -515,7 +494,6 @@
                 blocks = execute_to_legacy_block_list(
                     executor,
                     self,
-                    allow_clear_input_blocks=allow_clear_input_blocks,
                     dataset_uuid=self._dataset_uuid,
                     preserve_order=preserve_order,
                 )
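Reviewer note: `ExecutionPlan.__init__` now takes only `stats` plus the
keyword-only `data_context`. A minimal sketch of the new construction,
mirroring the updated `generate_logical_plan_string` above (internal imports
assumed to resolve as they do in `plan.py`):

    from ray.data._internal.plan import ExecutionPlan
    from ray.data._internal.stats import DatasetStats

    stats = DatasetStats(metadata={}, parent=None)
    plan = ExecutionPlan(stats)
    # Previously: ExecutionPlan(stats, run_by_consumer=False)

    # The execution entry points drop allow_clear_input_blocks but are
    # otherwise unchanged:
    #     plan.execute(preserve_order=False)
    #     plan.execute_to_iterator()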
diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py
index 58495b76f5d9e7..8ae387d65a599b 100644
--- a/python/ray/data/dataset.py
+++ b/python/ray/data/dataset.py
@@ -1403,10 +1403,7 @@ def train(self, data_iterator):
         logical_plan = LogicalPlan(InputData(input_data=ref_bundles))
         split_datasets.append(
             MaterializedDataset(
-                ExecutionPlan(
-                    stats,
-                    run_by_consumer=owned_by_consumer,
-                ),
+                ExecutionPlan(stats),
                 logical_plan,
             )
         )
@@ -1524,10 +1521,7 @@ def build_node_id_by_actor(actors: List[Any]) -> Dict[Any, str]:
         logical_plan = LogicalPlan(InputData(input_data=[bundle]))
         split_datasets.append(
             MaterializedDataset(
-                ExecutionPlan(
-                    stats,
-                    run_by_consumer=owned_by_consumer,
-                ),
+                ExecutionPlan(stats),
                 logical_plan,
             )
         )
@@ -1601,10 +1595,7 @@ def split_at_indices(self, indices: List[int]) -> List["MaterializedDataset"]:
 
             splits.append(
                 MaterializedDataset(
-                    ExecutionPlan(
-                        stats,
-                        run_by_consumer=bundle.owns_blocks,
-                    ),
+                    ExecutionPlan(stats),
                     logical_plan,
                 )
             )
@@ -1793,7 +1784,7 @@ def union(self, *other: List["Dataset"]) -> "Dataset":
         )
         stats.time_total_s = time.perf_counter() - start_time
         return Dataset(
-            ExecutionPlan(stats, run_by_consumer=False),
+            ExecutionPlan(stats),
             logical_plan,
         )
@@ -4526,10 +4517,7 @@ def materialize(self) -> "MaterializedDataset":
         ]
         logical_plan = LogicalPlan(InputData(input_data=ref_bundles))
         output = MaterializedDataset(
-            ExecutionPlan(
-                copy._plan.stats(),
-                run_by_consumer=False,
-            ),
+            ExecutionPlan(copy._plan.stats()),
             logical_plan,
         )
         # Metrics are tagged with `copy`s uuid, update the output uuid with
diff --git a/python/ray/data/iterator.py b/python/ray/data/iterator.py
index fb20286e945a08..2e7e24435e4cdc 100644
--- a/python/ray/data/iterator.py
+++ b/python/ray/data/iterator.py
@@ -875,10 +875,7 @@ def materialize(self) -> "MaterializedDataset":
         ]
         logical_plan = LogicalPlan(InputData(input_data=ref_bundles))
         return MaterializedDataset(
-            ExecutionPlan(
-                stats,
-                run_by_consumer=owned_by_consumer,
-            ),
+            ExecutionPlan(stats),
             logical_plan,
         )
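Reviewer note: every `MaterializedDataset` construction site in dataset.py and
iterator.py collapses to the same one-liner. The recurring pattern, sketched
with `stats` and `ref_bundles` assumed from the surrounding methods:

    logical_plan = LogicalPlan(InputData(input_data=ref_bundles))
    dataset = MaterializedDataset(
        ExecutionPlan(stats),  # ownership flags are no longer threaded through
        logical_plan,
    )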
diff --git a/python/ray/data/read_api.py b/python/ray/data/read_api.py
index bf4d1df67d62b4..d3de42969fcf9a 100644
--- a/python/ray/data/read_api.py
+++ b/python/ray/data/read_api.py
@@ -124,10 +124,7 @@ def from_blocks(blocks: List[Block]):
     from_blocks_op = FromBlocks(block_refs, metadata)
     logical_plan = LogicalPlan(from_blocks_op)
     return MaterializedDataset(
-        ExecutionPlan(
-            DatasetStats(metadata={"FromBlocks": metadata}, parent=None),
-            run_by_consumer=False,
-        ),
+        ExecutionPlan(DatasetStats(metadata={"FromBlocks": metadata}, parent=None)),
         logical_plan,
     )
@@ -207,10 +204,7 @@ def from_items(
     from_items_op = FromItems(blocks, metadata)
     logical_plan = LogicalPlan(from_items_op)
     return MaterializedDataset(
-        ExecutionPlan(
-            DatasetStats(metadata={"FromItems": metadata}, parent=None),
-            run_by_consumer=False,
-        ),
+        ExecutionPlan(DatasetStats(metadata={"FromItems": metadata}, parent=None)),
         logical_plan,
     )
@@ -441,7 +435,7 @@ def read_datasource(
     )
     logical_plan = LogicalPlan(read_op)
     return Dataset(
-        plan=ExecutionPlan(stats, run_by_consumer=False),
+        plan=ExecutionPlan(stats),
         logical_plan=logical_plan,
     )
@@ -2481,10 +2475,7 @@ def from_pandas_refs(
         metadata = ray.get([get_metadata.remote(df) for df in dfs])
         logical_plan = LogicalPlan(FromPandas(dfs, metadata))
         return MaterializedDataset(
-            ExecutionPlan(
-                DatasetStats(metadata={"FromPandas": metadata}, parent=None),
-                run_by_consumer=False,
-            ),
+            ExecutionPlan(DatasetStats(metadata={"FromPandas": metadata}, parent=None)),
             logical_plan,
         )
@@ -2495,10 +2486,7 @@ def from_pandas_refs(
     metadata = ray.get(metadata)
     logical_plan = LogicalPlan(FromPandas(blocks, metadata))
     return MaterializedDataset(
-        ExecutionPlan(
-            DatasetStats(metadata={"FromPandas": metadata}, parent=None),
-            run_by_consumer=False,
-        ),
+        ExecutionPlan(DatasetStats(metadata={"FromPandas": metadata}, parent=None)),
         logical_plan,
     )
@@ -2581,10 +2569,7 @@ def from_numpy_refs(
     logical_plan = LogicalPlan(FromNumpy(blocks, metadata))
 
     return MaterializedDataset(
-        ExecutionPlan(
-            DatasetStats(metadata={"FromNumpy": metadata}, parent=None),
-            run_by_consumer=False,
-        ),
+        ExecutionPlan(DatasetStats(metadata={"FromNumpy": metadata}, parent=None)),
         logical_plan,
     )
@@ -2660,10 +2645,7 @@ def from_arrow_refs(
     logical_plan = LogicalPlan(FromArrow(tables, metadata))
 
     return MaterializedDataset(
-        ExecutionPlan(
-            DatasetStats(metadata={"FromArrow": metadata}, parent=None),
-            run_by_consumer=False,
-        ),
+        ExecutionPlan(DatasetStats(metadata={"FromArrow": metadata}, parent=None)),
         logical_plan,
     )
diff --git a/python/ray/data/tests/test_split.py b/python/ray/data/tests/test_split.py
index fa773cda1bfc3f..2cd81cd2f6ae7a 100644
--- a/python/ray/data/tests/test_split.py
+++ b/python/ray/data/tests/test_split.py
@@ -101,7 +101,7 @@ def _test_equal_split_balanced(block_sizes, num_splits):
     logical_plan = LogicalPlan(InputData(input_data=ref_bundles))
     stats = DatasetStats(metadata={"TODO": []}, parent=None)
     ds = Dataset(
-        ExecutionPlan(stats, run_by_consumer=True),
+        ExecutionPlan(stats),
         logical_plan,
     )
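Reviewer note: a before/after recap for anyone carrying internal patches
against these modules (a sketch, not a public-API change; `stats` stands in
for a `DatasetStats` built as in the test above):

    # Before this PR:
    #     plan = ExecutionPlan(stats, run_by_consumer=True)
    #     bundle = plan.execute(allow_clear_input_blocks=True, preserve_order=False)
    # After this PR:
    plan = ExecutionPlan(stats)
    bundle = plan.execute(preserve_order=False)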