From 321a22b9f28707669ae1a437e8b44d66576541ad Mon Sep 17 00:00:00 2001 From: Clark Zinzow Date: Tue, 31 Jan 2023 02:12:46 +0000 Subject: [PATCH 1/5] Add operator fusion to new execution planner. --- python/ray/data/_internal/compute.py | 11 + .../data/_internal/execution/interfaces.py | 9 +- .../data/_internal/execution/legacy_compat.py | 4 +- .../operators/all_to_all_operator.py | 5 + .../execution/operators/map_operator.py | 10 +- .../ray/data/_internal/logical/interfaces.py | 61 +++- .../ray/data/_internal/logical/optimizers.py | 234 +++++++++++-- python/ray/data/_internal/planner/planner.py | 38 ++- python/ray/data/tests/conftest.py | 10 +- .../data/tests/test_execution_optimizer.py | 310 +++++++++++++++++- 10 files changed, 633 insertions(+), 59 deletions(-) diff --git a/python/ray/data/_internal/compute.py b/python/ray/data/_internal/compute.py index 997a1af92a1e..b5b3b733e00e 100644 --- a/python/ray/data/_internal/compute.py +++ b/python/ray/data/_internal/compute.py @@ -176,6 +176,9 @@ def _apply( owned_by_consumer=in_block_owned_by_consumer, ) + def __eq__(self, other: Any) -> bool: + return isinstance(other, TaskPoolStrategy) + @PublicAPI class ActorPoolStrategy(ComputeStrategy): @@ -449,6 +452,14 @@ def map_block_nosplit( finally: raise e from None + def __eq__(self, other: Any) -> bool: + return isinstance(other, ActorPoolStrategy) and ( + self.min_size == other.min_size + and self.max_size == other.max_size + and self.max_tasks_in_flight_per_actor + == other.max_tasks_in_flight_per_actor + ) + def get_compute(compute_spec: Union[str, ComputeStrategy]) -> ComputeStrategy: if not compute_spec or compute_spec == "tasks": diff --git a/python/ray/data/_internal/execution/interfaces.py b/python/ray/data/_internal/execution/interfaces.py index 821662e257c1..f5ce3263bcc6 100644 --- a/python/ray/data/_internal/execution/interfaces.py +++ b/python/ray/data/_internal/execution/interfaces.py @@ -1,5 +1,5 @@ from dataclasses import dataclass -from typing import Dict, List, Optional, Iterable, Tuple +from typing import Dict, List, Optional, Iterable, Tuple, Callable import ray from ray.data._internal.logical.interfaces import Operator @@ -237,6 +237,13 @@ def get_metrics(self) -> Dict[str, int]: """ return {} + def get_transformation_fn(self) -> Callable: + """Returns the underlying transformation function for this operator. + + This is used by the physical plan optimizer for e.g. operator fusion. + """ + raise NotImplementedError + def progress_str(self) -> str: """Return any extra status to be displayed in the operator progress bar. diff --git a/python/ray/data/_internal/execution/legacy_compat.py b/python/ray/data/_internal/execution/legacy_compat.py index 5bc7e3435035..c1cfaf8a4934 100644 --- a/python/ray/data/_internal/execution/legacy_compat.py +++ b/python/ray/data/_internal/execution/legacy_compat.py @@ -7,7 +7,7 @@ from typing import Iterator, Tuple, Any import ray -from ray.data._internal.logical.optimizers import get_execution_dag +from ray.data._internal.logical.optimizers import get_execution_plan from ray.data.context import DatasetContext from ray.types import ObjectRef from ray.data.block import Block, BlockMetadata, List @@ -78,7 +78,7 @@ def execute_to_legacy_block_list( The output as a legacy block list. 
""" if DatasetContext.get_current().optimizer_enabled: - dag, stats = get_execution_dag(plan._logical_plan.dag), None + dag, stats = get_execution_plan(plan._logical_plan).dag, None else: dag, stats = _to_operator_dag(plan, allow_clear_input_blocks) bundles = executor.execute(dag, initial_stats=stats) diff --git a/python/ray/data/_internal/execution/operators/all_to_all_operator.py b/python/ray/data/_internal/execution/operators/all_to_all_operator.py index fdc1d850e711..eac95e54a788 100644 --- a/python/ray/data/_internal/execution/operators/all_to_all_operator.py +++ b/python/ray/data/_internal/execution/operators/all_to_all_operator.py @@ -67,3 +67,8 @@ def get_next(self) -> RefBundle: def get_stats(self) -> StatsDict: return self._stats + + def get_transformation_fn( + self, + ) -> Callable[[List[RefBundle]], Tuple[List[RefBundle], StatsDict]]: + return self._bulk_fn diff --git a/python/ray/data/_internal/execution/operators/map_operator.py b/python/ray/data/_internal/execution/operators/map_operator.py index 545b5ed5058c..23430764ca14 100644 --- a/python/ray/data/_internal/execution/operators/map_operator.py +++ b/python/ray/data/_internal/execution/operators/map_operator.py @@ -44,9 +44,7 @@ def __init__( # instead. # NOTE: This constructor must be called by subclasses. - # Put the function def in the object store to avoid repeated serialization - # in case it's large (i.e., closure captures large objects). - self._transform_fn_ref = ray.put(transform_fn) + self._transform_fn = transform_fn self._ray_remote_args = _canonicalize_ray_remote_args(ray_remote_args or {}) # Bundles block references up to the min_rows_per_bundle target. @@ -142,6 +140,9 @@ def start(self, options: "ExecutionOptions"): ray.get_runtime_context().get_node_id(), soft=True, ) + # Put the function def in the object store to avoid repeated serialization + # in case it's large (i.e., closure captures large objects). + self._transform_fn_ref = ray.put(self._transform_fn) super().start(options) def add_input(self, refs: RefBundle, input_index: int): @@ -261,6 +262,9 @@ def get_metrics(self) -> Dict[str, int]: def get_stats(self) -> StatsDict: return {self._name: self._output_metadata} + def get_transformation_fn(self) -> Callable[[Iterator[Block]], Iterator[Block]]: + return self._transform_fn + @abstractmethod def shutdown(self): # NOTE: This must be implemented by subclasses, and those overriding methods diff --git a/python/ray/data/_internal/logical/interfaces.py b/python/ray/data/_internal/logical/interfaces.py index 18304828e447..4e88d0b67e2e 100644 --- a/python/ray/data/_internal/logical/interfaces.py +++ b/python/ray/data/_internal/logical/interfaces.py @@ -1,4 +1,7 @@ -from typing import List +from typing import List, Dict, TYPE_CHECKING + +if TYPE_CHECKING: + from ray.data._internal.execution.interfaces import PhysicalOperator class Operator: @@ -51,11 +54,57 @@ def __init__(self, name: str, input_dependencies: List["LogicalOperator"]): assert isinstance(x, LogicalOperator), x +class Plan: + """Abstract class for logical/physical execution plans. + + This plan should hold an operator representing the plan DAG and any auxiliary data + that's useful for plan optimization or execution. 
+ """ + + @property + def dag(self) -> Operator: + raise NotImplementedError + + +class LogicalPlan(Plan): + """The plan with a DAG of logical operators.""" + + def __init__(self, dag: LogicalOperator): + self._dag = dag + + @property + def dag(self) -> LogicalOperator: + """Get the DAG of logical operators.""" + return self._dag + + +class PhysicalPlan(Plan): + """The plan with a DAG of physical operators.""" + + def __init__( + self, dag: "PhysicalOperator", op_map: Dict["PhysicalOperator", LogicalOperator] + ): + self._dag = dag + self._op_map = op_map + + @property + def dag(self) -> "PhysicalOperator": + """Get the DAG of physical operators.""" + return self._dag + + @property + def op_map(self) -> Dict["PhysicalOperator", LogicalOperator]: + """ + Get a mapping from physical operators to their corresponding logical operator. + """ + return self._op_map + + class Rule: """Abstract class for optimization rule.""" - def apply(dag: Operator) -> Operator: - """Apply the optimization rule to the DAG of operators.""" + def apply(plan: Plan) -> Plan: + """Apply the optimization rule to the execution plan.""" raise NotImplementedError @@ -70,8 +119,8 @@ def rules(self) -> List[Rule]: """List of predefined rules for this optimizer.""" raise NotImplementedError - def optimize(self, dag: Operator) -> Operator: + def optimize(self, plan: Plan) -> Plan: """Optimize operators with a list of rules.""" for rule in self.rules: - dag = rule.apply(dag) - return dag + plan = rule.apply(plan) + return plan diff --git a/python/ray/data/_internal/logical/optimizers.py b/python/ray/data/_internal/logical/optimizers.py index eaced07affc0..f56f1b39e86f 100644 --- a/python/ray/data/_internal/logical/optimizers.py +++ b/python/ray/data/_internal/logical/optimizers.py @@ -1,10 +1,21 @@ -from typing import List +from typing import List, Iterator +from ray.data.block import Block +from ray.data._internal.compute import is_task_compute, CallableClass, get_compute from ray.data._internal.execution.interfaces import PhysicalOperator -from ray.data._internal.logical.interfaces import Rule, Optimizer, LogicalOperator +from ray.data._internal.logical.interfaces import ( + Rule, + Optimizer, + LogicalPlan, + PhysicalPlan, +) from ray.data._internal.planner.planner import Planner +# Scheduling strategy can be inherited from upstream operator if not specified. +INHERITABLE_REMOTE_ARGS = ["scheduling_strategy"] + + class LogicalOptimizer(Optimizer): """The optimizer for logical operators.""" @@ -19,30 +30,211 @@ class PhysicalOptimizer(Optimizer): @property def rules(self) -> List["Rule"]: - # TODO: Add physical optimizer rules. - return [] + return [OperatorFusionRule()] -class LogicalPlan: - """The plan with a DAG of logical operators.""" - - def __init__(self, dag: LogicalOperator): - self._dag = dag - - @property - def dag(self) -> LogicalOperator: - """Get the DAG of logical operators.""" - return self._dag - - -def get_execution_dag(logical_dag: LogicalOperator) -> PhysicalOperator: - """Get the DAG of physical operators to execute. +def get_execution_plan(logical_plan: LogicalPlan) -> PhysicalPlan: + """Get the physical execution plan for the provided logical plan. This process has 3 steps: (1) logical optimization: optimize logical operators. (2) planning: convert logical to physical operators. (3) physical optimization: optimize physical operators. 
""" - optimized_logical_dag = LogicalOptimizer().optimize(logical_dag) - physical_dag = Planner().plan(optimized_logical_dag) - return PhysicalOptimizer().optimize(physical_dag) + logical_plan = LogicalOptimizer().optimize(logical_plan) + physical_plan = Planner().plan(logical_plan) + return PhysicalOptimizer().optimize(physical_plan) + + +class OperatorFusionRule(Rule): + """Fuses linear chains of compatible physical operators.""" + + def apply(self, plan: PhysicalPlan) -> PhysicalPlan: + self._op_map = plan.op_map.copy() + # Do DFS fusion. + root = self._apply(plan.dag) + return PhysicalPlan(root, self._op_map) + + def _apply(self, op: PhysicalOperator) -> PhysicalOperator: + """Performs DFS fusion of linear chains of physical map operators, provided that + they are pairwise-compatible. + + Args: + op: The op that we're trying to fuse with its input. + """ + upstream_ops = op.input_dependencies + # Fuse with upstream ops while possible. + while len(upstream_ops) == 1 and self._can_fuse(op, upstream_ops[0]): + # Fuse operator with its upstream op. + op = self._fuse(op, upstream_ops[0]) + upstream_ops = op.input_dependencies + # Can no longer fuse with upstream ops, proceed up the DAG. + op._input_dependencies = [ + self._apply(upstream_op) for upstream_op in upstream_ops + ] + return op + + def _can_fuse(self, down_op: PhysicalOperator, up_op: PhysicalOperator) -> bool: + """Returns whether the provided downstream operator can be fused with the given + upstream operator. + + We currently support fusing two operators if the following are all true: + * They are both MapOperators. + * They either use the same compute configuration, or the upstream operator + uses a task pool while the downstream operator uses an actor pool. + * If both operators involve callable classes, the callable classes are + the same class AND constructor args are the same for both. + * They have compatible remote arguments. + """ + from ray.data._internal.execution.operators.map_operator import MapOperator + from ray.data._internal.logical.operators.map_operator import AbstractMap + + # We only support fusing MapOperators. + if not isinstance(down_op, MapOperator) or not isinstance(up_op, MapOperator): + return False + + down_logical_op = self._op_map[down_op] + up_logical_op = self._op_map[up_op] + + # Allow fusing tasks->actors if the resources are compatible (read->map), but + # not the other way around. The latter (downstream op) will be used as the + # compute if fused. + if ( + is_task_compute(down_logical_op._compute) + and isinstance(up_logical_op, AbstractMap) + and get_compute(up_logical_op._compute) + != get_compute(down_logical_op._compute) + ): + return False + + # Fusing callable classes is only supported if they are the same function AND + # their construction arguments are the same. + # TODO(Clark): Support multiple callable classes instantiating in the same actor + # worker. + if ( + isinstance(down_logical_op._fn, CallableClass) + and isinstance(up_logical_op, AbstractMap) + and isinstance(up_logical_op._fn, CallableClass) + and ( + up_logical_op._fn != down_logical_op._fn + or ( + up_logical_op._fn_constructor_args + != down_logical_op._fn_constructor_args + or up_logical_op._fn_constructor_kwargs + != down_logical_op._fn_constructor_kwargs + ) + ) + ): + return False + + # Only fuse if the ops' remote arguments are compatible. 
+ if not _are_remote_args_compatible( + up_logical_op._ray_remote_args or {}, down_logical_op._ray_remote_args or {} + ): + return False + + # Otherwise, ops are compatible for fusion. + return True + + def _fuse(self, down_op: PhysicalOperator, up_op: PhysicalOperator): + """Fuse the downstream operator with its upstream operator.""" + from ray.data._internal.execution.operators.map_operator import MapOperator + from ray.data._internal.logical.operators.map_operator import AbstractMap + + # Fuse operator names. + name = up_op.name + "->" + down_op.name + + down_logical_op = self._op_map.pop(down_op) + up_logical_op = self._op_map.pop(up_op) + + # Merge target block sizes. + down_target_block_size = down_logical_op._target_block_size + up_target_block_size = ( + up_logical_op._target_block_size + if isinstance(up_logical_op, AbstractMap) + else None + ) + if down_target_block_size is not None and up_target_block_size is not None: + target_block_size = max(down_target_block_size, up_target_block_size) + elif up_target_block_size is not None: + target_block_size = up_target_block_size + else: + target_block_size = down_target_block_size + + # Fuse transformation functions. + down_transform_fn = down_op.get_transformation_fn() + up_transform_fn = up_op.get_transformation_fn() + + def transform_fn(blocks: Iterator[Block]) -> Iterator[Block]: + blocks = up_transform_fn(blocks) + # TODO(Clark): Add zero-copy batching between transform functions. + return down_transform_fn(blocks) + + # We take the downstream op's compute in case we're fusing upstream tasks with a + # downstream actor pool (e.g. read->map). + compute = get_compute(down_logical_op._compute) + ray_remote_args = down_logical_op._ray_remote_args + # Make the upstream operator's inputs the new, fused operator's inputs. + input_deps = up_op.input_dependencies + assert len(input_deps) == 1 + input_op = input_deps[0] + + # Fused physical map operator. + op = MapOperator.create( + transform_fn, + input_op, + name=name, + compute_strategy=compute, + min_rows_per_bundle=target_block_size, + ray_remote_args=ray_remote_args, + ) + + # Build a map logical operator to be used as a reference for further fusion. + # TODO(Clark): This is hacky, remove this once we push fusion to be purely based + # on a lower-level operator spec. + if isinstance(up_logical_op, AbstractMap): + input_op = up_logical_op.input_dependencies[0] + fn = up_logical_op._fn + fn_args = up_logical_op._fn_args + fn_kwargs = up_logical_op._fn_kwargs + fn_constructor_args = up_logical_op._fn_constructor_args + fn_constructor_kwargs = up_logical_op._fn_constructor_kwargs + else: + input_op = up_logical_op + fn = down_logical_op._fn + fn_args = down_logical_op._fn_args + fn_kwargs = down_logical_op._fn_kwargs + fn_constructor_args = down_logical_op._fn_constructor_args + fn_constructor_kwargs = down_logical_op._fn_constructor_kwargs + logical_op = AbstractMap( + name, + input_op, + fn, + fn_args, + fn_kwargs, + fn_constructor_args, + fn_constructor_kwargs, + target_block_size, + compute, + ray_remote_args, + ) + self._op_map[op] = logical_op + # Return the fused physical operator. 
+ return op + + +def _are_remote_args_compatible(up_args, down_args): + """Check if Ray remote arguments are compatible for merging.""" + from ray.data._internal.execution.operators.map_operator import ( + _canonicalize_ray_remote_args, + ) + + up_args = _canonicalize_ray_remote_args(up_args) + down_args = _canonicalize_ray_remote_args(down_args) + remote_args = down_args.copy() + for key in INHERITABLE_REMOTE_ARGS: + if key in up_args: + remote_args[key] = up_args[key] + if up_args != remote_args: + return False + return True diff --git a/python/ray/data/_internal/planner/planner.py b/python/ray/data/_internal/planner/planner.py index 3f5084755944..8bac276fae46 100644 --- a/python/ray/data/_internal/planner/planner.py +++ b/python/ray/data/_internal/planner/planner.py @@ -1,5 +1,11 @@ +from typing import Dict + from ray.data._internal.execution.interfaces import PhysicalOperator -from ray.data._internal.logical.interfaces import LogicalOperator +from ray.data._internal.logical.interfaces import ( + LogicalOperator, + LogicalPlan, + PhysicalPlan, +) from ray.data._internal.logical.operators.all_to_all_operator import AbstractAllToAll from ray.data._internal.logical.operators.read_operator import Read from ray.data._internal.logical.operators.map_operator import AbstractMap @@ -15,24 +21,32 @@ class Planner: done by physical optimizer. """ - def plan(self, logical_dag: LogicalOperator) -> PhysicalOperator: + def __init__(self): + self._physical_op_to_logical_op: Dict[PhysicalOperator, LogicalOperator] = {} + + def plan(self, logical_plan: LogicalPlan) -> PhysicalPlan: """Convert logical to physical operators recursively in post-order.""" + physical_dag = self._plan(logical_plan.dag) + return PhysicalPlan(physical_dag, self._physical_op_to_logical_op) + + def _plan(self, logical_op: LogicalOperator) -> PhysicalOperator: # Plan the input dependencies first. 
physical_children = [] - for child in logical_dag.input_dependencies: - physical_children.append(self.plan(child)) + for child in logical_op.input_dependencies: + physical_children.append(self._plan(child)) - if isinstance(logical_dag, Read): + if isinstance(logical_op, Read): assert not physical_children - physical_dag = _plan_read_op(logical_dag) - elif isinstance(logical_dag, AbstractMap): + physical_op = _plan_read_op(logical_op) + elif isinstance(logical_op, AbstractMap): assert len(physical_children) == 1 - physical_dag = _plan_map_op(logical_dag, physical_children[0]) - elif isinstance(logical_dag, AbstractAllToAll): + physical_op = _plan_map_op(logical_op, physical_children[0]) + elif isinstance(logical_op, AbstractAllToAll): assert len(physical_children) == 1 - physical_dag = _plan_all_to_all_op(logical_dag, physical_children[0]) + physical_op = _plan_all_to_all_op(logical_op, physical_children[0]) else: raise ValueError( - f"Found unknown logical operator during planning: {logical_dag}" + f"Found unknown logical operator during planning: {logical_op}" ) - return physical_dag + self._physical_op_to_logical_op[physical_op] = logical_op + return physical_op diff --git a/python/ray/data/tests/conftest.py b/python/ray/data/tests/conftest.py index de089750c052..0d1cb84deb92 100644 --- a/python/ray/data/tests/conftest.py +++ b/python/ray/data/tests/conftest.py @@ -326,14 +326,14 @@ def target_max_block_size(request): ctx.target_max_block_size = original -@pytest.fixture(params=[True]) -def enable_optimizer(request): +@pytest.fixture +def enable_optimizer(): ctx = ray.data.context.DatasetContext.get_current() original_backend = ctx.new_execution_backend original_optimizer = ctx.optimizer_enabled - ctx.new_execution_backend = request.param - ctx.optimizer_enabled = request.param - yield request.param + ctx.new_execution_backend = True + ctx.optimizer_enabled = True + yield ctx.new_execution_backend = original_backend ctx.optimizer_enabled = original_optimizer diff --git a/python/ray/data/tests/test_execution_optimizer.py b/python/ray/data/tests/test_execution_optimizer.py index b4c73a1ae122..d313754e7f66 100644 --- a/python/ray/data/tests/test_execution_optimizer.py +++ b/python/ray/data/tests/test_execution_optimizer.py @@ -4,6 +4,8 @@ from ray.data._internal.execution.operators.map_operator import MapOperator from ray.data._internal.execution.operators.all_to_all_operator import AllToAllOperator from ray.data._internal.execution.operators.input_data_buffer import InputDataBuffer +from ray.data._internal.logical.interfaces import LogicalPlan +from ray.data._internal.logical.optimizers import PhysicalOptimizer from ray.data._internal.logical.operators.all_to_all_operator import ( RandomShuffle, RandomizeBlocks, @@ -25,7 +27,8 @@ def test_read_operator(ray_start_cluster_enabled, enable_optimizer): planner = Planner() op = Read(ParquetDatasource()) - physical_op = planner.plan(op) + plan = LogicalPlan(op) + physical_op = planner.plan(plan).dag assert op.name == "Read" assert isinstance(physical_op, MapOperator) @@ -38,10 +41,10 @@ def test_map_batches_operator(ray_start_cluster_enabled, enable_optimizer): read_op = Read(ParquetDatasource()) op = MapBatches( read_op, - lambda it: (x for x in it), lambda x: x, ) - physical_op = planner.plan(op) + plan = LogicalPlan(op) + physical_op = planner.plan(plan).dag assert op.name == "MapBatches" assert isinstance(physical_op, MapOperator) @@ -62,7 +65,8 @@ def test_map_rows_operator(ray_start_cluster_enabled, enable_optimizer): read_op, lambda x: x, 
) - physical_op = planner.plan(op) + plan = LogicalPlan(op) + physical_op = planner.plan(plan).dag assert op.name == "MapRows" assert isinstance(physical_op, MapOperator) @@ -83,7 +87,8 @@ def test_filter_operator(ray_start_cluster_enabled, enable_optimizer): read_op, lambda x: x, ) - physical_op = planner.plan(op) + plan = LogicalPlan(op) + physical_op = planner.plan(plan).dag assert op.name == "Filter" assert isinstance(physical_op, MapOperator) @@ -104,7 +109,8 @@ def test_flat_map(ray_start_cluster_enabled, enable_optimizer): read_op, lambda x: x, ) - physical_op = planner.plan(op) + plan = LogicalPlan(op) + physical_op = planner.plan(plan).dag assert op.name == "FlatMap" assert isinstance(physical_op, MapOperator) @@ -156,7 +162,8 @@ def test_randomize_blocks_operator(ray_start_cluster_enabled, enable_optimizer): read_op, seed=0, ) - physical_op = planner.plan(op) + plan = LogicalPlan(op) + physical_op = planner.plan(plan).dag assert op.name == "RandomizeBlocks" assert isinstance(physical_op, AllToAllOperator) @@ -177,7 +184,8 @@ def test_random_shuffle_operator(ray_start_cluster_enabled, enable_optimizer): read_op, seed=0, ) - physical_op = planner.plan(op) + plan = LogicalPlan(op) + physical_op = planner.plan(plan).dag assert op.name == "RandomShuffle" assert isinstance(physical_op, AllToAllOperator) @@ -204,7 +212,8 @@ def test_repartition_operator(ray_start_cluster_enabled, enable_optimizer): num_outputs=5, shuffle=True, ) - physical_op = planner.plan(op) + plan = LogicalPlan(op) + physical_op = planner.plan(plan).dag assert op.name == "Repartition" assert isinstance(physical_op, AllToAllOperator) @@ -234,6 +243,289 @@ def test_repartition_e2e( ds.repartition(20, shuffle=False).take_all() +def test_read_map_batches_operator_fusion(ray_start_cluster_enabled, enable_optimizer): + # Test that Read is fused with MapBatches. + planner = Planner() + read_op = Read(ParquetDatasource()) + op = MapBatches( + read_op, + lambda x: x, + ) + logical_plan = LogicalPlan(op) + physical_plan = planner.plan(logical_plan) + physical_plan = PhysicalOptimizer().optimize(physical_plan) + physical_op = physical_plan.dag + + assert op.name == "MapBatches" + assert physical_op.name == "DoRead->MapBatches" + assert isinstance(physical_op, MapOperator) + assert len(physical_op.input_dependencies) == 1 + assert isinstance(physical_op.input_dependencies[0], InputDataBuffer) + + +def test_read_map_chain_operator_fusion(ray_start_cluster_enabled, enable_optimizer): + # Test that a chain of different map operators are fused. 
+ planner = Planner() + read_op = Read(ParquetDatasource()) + op = MapRows(read_op, lambda x: x) + op = MapBatches(op, lambda x: x) + op = FlatMap(op, lambda x: x) + op = Filter(op, lambda x: x) + logical_plan = LogicalPlan(op) + physical_plan = planner.plan(logical_plan) + physical_plan = PhysicalOptimizer().optimize(physical_plan) + physical_op = physical_plan.dag + + assert op.name == "Filter" + assert physical_op.name == "DoRead->MapRows->MapBatches->FlatMap->Filter" + assert isinstance(physical_op, MapOperator) + assert len(physical_op.input_dependencies) == 1 + assert isinstance(physical_op.input_dependencies[0], InputDataBuffer) + + +def test_read_map_chain_operator_fusion_e2e( + ray_start_cluster_enabled, enable_optimizer +): + ds = ray.data.range(10, parallelism=2) + ds = ds.filter(lambda x: x % 2 == 0) + ds = ds.map(lambda x: x + 1) + ds = ds.map_batches(lambda batch: [2 * x for x in batch], batch_size=None) + ds = ds.flat_map(lambda x: [-x, x]) + assert ds.take_all() == [-2, 2, -6, 6, -10, 10, -14, 14, -18, 18] + name = "DoRead->Filter->MapRows->MapBatches->FlatMap:" + assert name in ds.stats() + + +def test_read_map_batches_operator_fusion_compatible_remote_args( + ray_start_cluster_enabled, enable_optimizer +): + # Test that map operators are stilled fused when remote args are compatible. + planner = Planner() + read_op = Read( + ParquetDatasource(), + ray_remote_args={"num_cpus": 1, "scheduling_strategy": "SPREAD"}, + ) + op = MapBatches(read_op, lambda x: x, ray_remote_args={"num_cpus": 1}) + op = MapBatches(op, lambda x: x, ray_remote_args={"num_cpus": 1}) + logical_plan = LogicalPlan(op) + physical_plan = planner.plan(logical_plan) + physical_plan = PhysicalOptimizer().optimize(physical_plan) + physical_op = physical_plan.dag + + assert op.name == "MapBatches" + assert physical_op.name == "DoRead->MapBatches->MapBatches" + assert isinstance(physical_op, MapOperator) + assert len(physical_op.input_dependencies) == 1 + assert isinstance(physical_op.input_dependencies[0], InputDataBuffer) + + +def test_read_map_batches_operator_fusion_incompatible_remote_args( + ray_start_cluster_enabled, enable_optimizer +): + # Test that map operators are not fused when remote args are incompatible. + planner = Planner() + read_op = Read(ParquetDatasource()) + op = MapBatches(read_op, lambda x: x, ray_remote_args={"num_cpus": 2}) + op = MapBatches(op, lambda x: x, ray_remote_args={"num_cpus": 3}) + logical_plan = LogicalPlan(op) + physical_plan = planner.plan(logical_plan) + physical_plan = PhysicalOptimizer().optimize(physical_plan) + physical_op = physical_plan.dag + + assert op.name == "MapBatches" + assert physical_op.name == "MapBatches" + assert isinstance(physical_op, MapOperator) + assert len(physical_op.input_dependencies) == 1 + upstream_physical_op = physical_op.input_dependencies[0] + assert isinstance(upstream_physical_op, MapOperator) + # Read shouldn't fuse into first MapBatches either, due to the differing CPU + # request. + assert upstream_physical_op.name == "MapBatches" + + +def test_read_map_batches_operator_fusion_compute_tasks_to_actors( + ray_start_cluster_enabled, enable_optimizer +): + # Test that a task-based map operator is fused into an actor-based map operator when + # the former comes before the latter. 
+ planner = Planner() + read_op = Read(ParquetDatasource()) + op = MapBatches(read_op, lambda x: x, compute="tasks") + op = MapBatches(op, lambda x: x, compute="actors") + logical_plan = LogicalPlan(op) + physical_plan = planner.plan(logical_plan) + physical_plan = PhysicalOptimizer().optimize(physical_plan) + physical_op = physical_plan.dag + + assert op.name == "MapBatches" + assert physical_op.name == "DoRead->MapBatches->MapBatches" + assert isinstance(physical_op, MapOperator) + assert len(physical_op.input_dependencies) == 1 + assert isinstance(physical_op.input_dependencies[0], InputDataBuffer) + + +def test_read_map_batches_operator_fusion_compute_read_to_actors( + ray_start_cluster_enabled, enable_optimizer +): + # Test that reads fuse into an actor-based map operator. + planner = Planner() + read_op = Read(ParquetDatasource()) + op = MapBatches(read_op, lambda x: x, compute="actors") + logical_plan = LogicalPlan(op) + physical_plan = planner.plan(logical_plan) + physical_plan = PhysicalOptimizer().optimize(physical_plan) + physical_op = physical_plan.dag + + assert op.name == "MapBatches" + assert physical_op.name == "DoRead->MapBatches" + assert isinstance(physical_op, MapOperator) + assert len(physical_op.input_dependencies) == 1 + assert isinstance(physical_op.input_dependencies[0], InputDataBuffer) + + +def test_read_map_batches_operator_fusion_incompatible_compute( + ray_start_cluster_enabled, enable_optimizer +): + # Test that map operators are not fused when compute strategies are incompatible. + planner = Planner() + read_op = Read(ParquetDatasource()) + op = MapBatches(read_op, lambda x: x, compute="actors") + op = MapBatches(op, lambda x: x, compute="tasks") + logical_plan = LogicalPlan(op) + physical_plan = planner.plan(logical_plan) + physical_plan = PhysicalOptimizer().optimize(physical_plan) + physical_op = physical_plan.dag + + assert op.name == "MapBatches" + assert physical_op.name == "MapBatches" + assert isinstance(physical_op, MapOperator) + assert len(physical_op.input_dependencies) == 1 + upstream_physical_op = physical_op.input_dependencies[0] + assert isinstance(upstream_physical_op, MapOperator) + # Reads should fuse into actor compute. + assert upstream_physical_op.name == "DoRead->MapBatches" + + +def test_read_map_batches_operator_fusion_target_block_size( + ray_start_cluster_enabled, enable_optimizer +): + # Test that fusion of map operators merges their block sizes in the expected way + # (taking the max). + planner = Planner() + read_op = Read(ParquetDatasource()) + op = MapBatches(read_op, lambda x: x, target_block_size=2) + op = MapBatches(op, lambda x: x, target_block_size=5) + op = MapBatches(op, lambda x: x, target_block_size=3) + logical_plan = LogicalPlan(op) + physical_plan = planner.plan(logical_plan) + physical_plan = PhysicalOptimizer().optimize(physical_plan) + physical_op = physical_plan.dag + + assert op.name == "MapBatches" + # Ops are still fused. + assert physical_op.name == "DoRead->MapBatches->MapBatches->MapBatches" + assert isinstance(physical_op, MapOperator) + # Target block size is set to max. + assert physical_op._block_ref_bundler._min_rows_per_bundle == 5 + assert len(physical_op.input_dependencies) == 1 + assert isinstance(physical_op.input_dependencies[0], InputDataBuffer) + + +def test_read_map_batches_operator_fusion_callable_classes( + ray_start_cluster_enabled, enable_optimizer +): + # Test that callable classes can still be fused if they're the same function. 
+ planner = Planner() + read_op = Read(ParquetDatasource()) + + class UDF: + def __call__(self, x): + return x + + op = MapBatches(read_op, UDF, compute="actors") + op = MapBatches(op, UDF, compute="actors") + logical_plan = LogicalPlan(op) + physical_plan = planner.plan(logical_plan) + physical_plan = PhysicalOptimizer().optimize(physical_plan) + physical_op = physical_plan.dag + + assert op.name == "MapBatches" + assert physical_op.name == "DoRead->MapBatches->MapBatches" + assert isinstance(physical_op, MapOperator) + assert len(physical_op.input_dependencies) == 1 + assert isinstance(physical_op.input_dependencies[0], InputDataBuffer) + + +def test_read_map_batches_operator_fusion_incompatible_callable_classes( + ray_start_cluster_enabled, enable_optimizer +): + # Test that map operators are not fused when different callable classes are used. + planner = Planner() + read_op = Read(ParquetDatasource()) + + class UDF: + def __call__(self, x): + return x + + class UDF2: + def __call__(self, x): + return x + 1 + + op = MapBatches(read_op, UDF, compute="actors") + op = MapBatches(op, UDF2, compute="actors") + logical_plan = LogicalPlan(op) + physical_plan = planner.plan(logical_plan) + physical_plan = PhysicalOptimizer().optimize(physical_plan) + physical_op = physical_plan.dag + + assert op.name == "MapBatches" + assert physical_op.name == "MapBatches" + assert isinstance(physical_op, MapOperator) + assert len(physical_op.input_dependencies) == 1 + upstream_physical_op = physical_op.input_dependencies[0] + assert isinstance(upstream_physical_op, MapOperator) + # Reads should still fuse with first map. + assert upstream_physical_op.name == "DoRead->MapBatches" + + +def test_read_map_batches_operator_fusion_incompatible_constructor_args( + ray_start_cluster_enabled, enable_optimizer +): + # Test that map operators are not fused when callable classes have different + # constructor args. + planner = Planner() + read_op = Read(ParquetDatasource()) + + class UDF: + def __init__(self, a): + self._a + + def __call__(self, x): + return x + self._a + + op = MapBatches(read_op, UDF, compute="actors", fn_constructor_args=(1,)) + op = MapBatches(op, UDF, compute="actors", fn_constructor_args=(2,)) + op = MapBatches(op, UDF, compute="actors", fn_constructor_kwargs={"a": 1}) + op = MapBatches(op, UDF, compute="actors", fn_constructor_kwargs={"a": 2}) + logical_plan = LogicalPlan(op) + physical_plan = planner.plan(logical_plan) + physical_plan = PhysicalOptimizer().optimize(physical_plan) + physical_op = physical_plan.dag + + assert op.name == "MapBatches" + # Last 3 physical map operators are unfused. + for _ in range(3): + assert isinstance(physical_op, MapOperator) + assert physical_op.name == "MapBatches" + assert len(physical_op.input_dependencies) == 1 + physical_op = physical_op.input_dependencies[0] + # First physical map operator is fused with read. + assert isinstance(physical_op, MapOperator) + assert physical_op.name == "DoRead->MapBatches" + assert len(physical_op.input_dependencies) == 1 + assert isinstance(physical_op.input_dependencies[0], InputDataBuffer) + + if __name__ == "__main__": import sys From f54bde2853b7f1a9f5382a55b06285f04e4a8940 Mon Sep 17 00:00:00 2001 From: Clark Zinzow Date: Tue, 31 Jan 2023 16:09:33 +0000 Subject: [PATCH 2/5] PR feedback. 
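This moves OperatorFusionRule out of optimizers.py into its own
logical/rules/operator_fusion.py module and tightens _can_fuse to only
consider upstream Read/AbstractMap logical ops. As a rough mental model of
what the rule does, the DFS pass reduces to the following self-contained
sketch (Op, can_fuse and fuse are toy stand-ins for the real
PhysicalOperator machinery; the real _can_fuse also checks compute
strategies, callable classes and remote args):

    class Op:
        def __init__(self, name, inputs=()):
            self.name = name
            self.input_dependencies = list(inputs)

    def can_fuse(down, up):
        # Toy check: anything but the input buffer is fusable.
        return up.name != "Input"

    def fuse(down, up):
        # The fused op chains the names and adopts the upstream op's inputs.
        return Op(up.name + "->" + down.name, up.input_dependencies)

    def apply(op):
        # DFS fusion of linear chains, mirroring OperatorFusionRule._apply:
        # keep folding the single upstream op into this op while compatible,
        # then recurse up the remaining DAG.
        ups = op.input_dependencies
        while len(ups) == 1 and can_fuse(op, ups[0]):
            op = fuse(op, ups[0])
            ups = op.input_dependencies
        op.input_dependencies = [apply(u) for u in ups]
        return op

    dag = Op("Filter", [Op("MapBatches", [Op("DoRead", [Op("Input")])])])
    assert apply(dag).name == "DoRead->MapBatches->Filter"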
--- .../ray/data/_internal/logical/optimizers.py | 204 +---------------- .../data/_internal/logical/rules/__init__.py | 3 + .../logical/rules/operator_fusion.py | 206 ++++++++++++++++++ .../data/tests/test_execution_optimizer.py | 26 +-- 4 files changed, 224 insertions(+), 215 deletions(-) create mode 100644 python/ray/data/_internal/logical/rules/__init__.py create mode 100644 python/ray/data/_internal/logical/rules/operator_fusion.py diff --git a/python/ray/data/_internal/logical/optimizers.py b/python/ray/data/_internal/logical/optimizers.py index f56f1b39e86f..df14f99ba5c0 100644 --- a/python/ray/data/_internal/logical/optimizers.py +++ b/python/ray/data/_internal/logical/optimizers.py @@ -1,21 +1,15 @@ -from typing import List, Iterator +from typing import List -from ray.data.block import Block -from ray.data._internal.compute import is_task_compute, CallableClass, get_compute -from ray.data._internal.execution.interfaces import PhysicalOperator from ray.data._internal.logical.interfaces import ( Rule, Optimizer, LogicalPlan, PhysicalPlan, ) +from ray.data._internal.logical.rules import OperatorFusionRule from ray.data._internal.planner.planner import Planner -# Scheduling strategy can be inherited from upstream operator if not specified. -INHERITABLE_REMOTE_ARGS = ["scheduling_strategy"] - - class LogicalOptimizer(Optimizer): """The optimizer for logical operators.""" @@ -44,197 +38,3 @@ def get_execution_plan(logical_plan: LogicalPlan) -> PhysicalPlan: logical_plan = LogicalOptimizer().optimize(logical_plan) physical_plan = Planner().plan(logical_plan) return PhysicalOptimizer().optimize(physical_plan) - - -class OperatorFusionRule(Rule): - """Fuses linear chains of compatible physical operators.""" - - def apply(self, plan: PhysicalPlan) -> PhysicalPlan: - self._op_map = plan.op_map.copy() - # Do DFS fusion. - root = self._apply(plan.dag) - return PhysicalPlan(root, self._op_map) - - def _apply(self, op: PhysicalOperator) -> PhysicalOperator: - """Performs DFS fusion of linear chains of physical map operators, provided that - they are pairwise-compatible. - - Args: - op: The op that we're trying to fuse with its input. - """ - upstream_ops = op.input_dependencies - # Fuse with upstream ops while possible. - while len(upstream_ops) == 1 and self._can_fuse(op, upstream_ops[0]): - # Fuse operator with its upstream op. - op = self._fuse(op, upstream_ops[0]) - upstream_ops = op.input_dependencies - # Can no longer fuse with upstream ops, proceed up the DAG. - op._input_dependencies = [ - self._apply(upstream_op) for upstream_op in upstream_ops - ] - return op - - def _can_fuse(self, down_op: PhysicalOperator, up_op: PhysicalOperator) -> bool: - """Returns whether the provided downstream operator can be fused with the given - upstream operator. - - We currently support fusing two operators if the following are all true: - * They are both MapOperators. - * They either use the same compute configuration, or the upstream operator - uses a task pool while the downstream operator uses an actor pool. - * If both operators involve callable classes, the callable classes are - the same class AND constructor args are the same for both. - * They have compatible remote arguments. - """ - from ray.data._internal.execution.operators.map_operator import MapOperator - from ray.data._internal.logical.operators.map_operator import AbstractMap - - # We only support fusing MapOperators. 
- if not isinstance(down_op, MapOperator) or not isinstance(up_op, MapOperator): - return False - - down_logical_op = self._op_map[down_op] - up_logical_op = self._op_map[up_op] - - # Allow fusing tasks->actors if the resources are compatible (read->map), but - # not the other way around. The latter (downstream op) will be used as the - # compute if fused. - if ( - is_task_compute(down_logical_op._compute) - and isinstance(up_logical_op, AbstractMap) - and get_compute(up_logical_op._compute) - != get_compute(down_logical_op._compute) - ): - return False - - # Fusing callable classes is only supported if they are the same function AND - # their construction arguments are the same. - # TODO(Clark): Support multiple callable classes instantiating in the same actor - # worker. - if ( - isinstance(down_logical_op._fn, CallableClass) - and isinstance(up_logical_op, AbstractMap) - and isinstance(up_logical_op._fn, CallableClass) - and ( - up_logical_op._fn != down_logical_op._fn - or ( - up_logical_op._fn_constructor_args - != down_logical_op._fn_constructor_args - or up_logical_op._fn_constructor_kwargs - != down_logical_op._fn_constructor_kwargs - ) - ) - ): - return False - - # Only fuse if the ops' remote arguments are compatible. - if not _are_remote_args_compatible( - up_logical_op._ray_remote_args or {}, down_logical_op._ray_remote_args or {} - ): - return False - - # Otherwise, ops are compatible for fusion. - return True - - def _fuse(self, down_op: PhysicalOperator, up_op: PhysicalOperator): - """Fuse the downstream operator with its upstream operator.""" - from ray.data._internal.execution.operators.map_operator import MapOperator - from ray.data._internal.logical.operators.map_operator import AbstractMap - - # Fuse operator names. - name = up_op.name + "->" + down_op.name - - down_logical_op = self._op_map.pop(down_op) - up_logical_op = self._op_map.pop(up_op) - - # Merge target block sizes. - down_target_block_size = down_logical_op._target_block_size - up_target_block_size = ( - up_logical_op._target_block_size - if isinstance(up_logical_op, AbstractMap) - else None - ) - if down_target_block_size is not None and up_target_block_size is not None: - target_block_size = max(down_target_block_size, up_target_block_size) - elif up_target_block_size is not None: - target_block_size = up_target_block_size - else: - target_block_size = down_target_block_size - - # Fuse transformation functions. - down_transform_fn = down_op.get_transformation_fn() - up_transform_fn = up_op.get_transformation_fn() - - def transform_fn(blocks: Iterator[Block]) -> Iterator[Block]: - blocks = up_transform_fn(blocks) - # TODO(Clark): Add zero-copy batching between transform functions. - return down_transform_fn(blocks) - - # We take the downstream op's compute in case we're fusing upstream tasks with a - # downstream actor pool (e.g. read->map). - compute = get_compute(down_logical_op._compute) - ray_remote_args = down_logical_op._ray_remote_args - # Make the upstream operator's inputs the new, fused operator's inputs. - input_deps = up_op.input_dependencies - assert len(input_deps) == 1 - input_op = input_deps[0] - - # Fused physical map operator. - op = MapOperator.create( - transform_fn, - input_op, - name=name, - compute_strategy=compute, - min_rows_per_bundle=target_block_size, - ray_remote_args=ray_remote_args, - ) - - # Build a map logical operator to be used as a reference for further fusion. 
- # TODO(Clark): This is hacky, remove this once we push fusion to be purely based - # on a lower-level operator spec. - if isinstance(up_logical_op, AbstractMap): - input_op = up_logical_op.input_dependencies[0] - fn = up_logical_op._fn - fn_args = up_logical_op._fn_args - fn_kwargs = up_logical_op._fn_kwargs - fn_constructor_args = up_logical_op._fn_constructor_args - fn_constructor_kwargs = up_logical_op._fn_constructor_kwargs - else: - input_op = up_logical_op - fn = down_logical_op._fn - fn_args = down_logical_op._fn_args - fn_kwargs = down_logical_op._fn_kwargs - fn_constructor_args = down_logical_op._fn_constructor_args - fn_constructor_kwargs = down_logical_op._fn_constructor_kwargs - logical_op = AbstractMap( - name, - input_op, - fn, - fn_args, - fn_kwargs, - fn_constructor_args, - fn_constructor_kwargs, - target_block_size, - compute, - ray_remote_args, - ) - self._op_map[op] = logical_op - # Return the fused physical operator. - return op - - -def _are_remote_args_compatible(up_args, down_args): - """Check if Ray remote arguments are compatible for merging.""" - from ray.data._internal.execution.operators.map_operator import ( - _canonicalize_ray_remote_args, - ) - - up_args = _canonicalize_ray_remote_args(up_args) - down_args = _canonicalize_ray_remote_args(down_args) - remote_args = down_args.copy() - for key in INHERITABLE_REMOTE_ARGS: - if key in up_args: - remote_args[key] = up_args[key] - if up_args != remote_args: - return False - return True diff --git a/python/ray/data/_internal/logical/rules/__init__.py b/python/ray/data/_internal/logical/rules/__init__.py new file mode 100644 index 000000000000..d138fe4b1193 --- /dev/null +++ b/python/ray/data/_internal/logical/rules/__init__.py @@ -0,0 +1,3 @@ +from ray.data._internal.logical.rules.operator_fusion import OperatorFusionRule + +__all__ = ["OperatorFusionRule"] diff --git a/python/ray/data/_internal/logical/rules/operator_fusion.py b/python/ray/data/_internal/logical/rules/operator_fusion.py new file mode 100644 index 000000000000..a3520b6ccf89 --- /dev/null +++ b/python/ray/data/_internal/logical/rules/operator_fusion.py @@ -0,0 +1,206 @@ +from typing import Iterator + +from ray.data.block import Block +# TODO(Clark): Remove compute dependency once we delete the legacy compute. +from ray.data._internal.compute import is_task_compute, CallableClass, get_compute +from ray.data._internal.execution.interfaces import PhysicalOperator +from ray.data._internal.logical.interfaces import Rule, PhysicalPlan + + +# Scheduling strategy can be inherited from upstream operator if not specified. +INHERITABLE_REMOTE_ARGS = ["scheduling_strategy"] + + +class OperatorFusionRule(Rule): + """Fuses linear chains of compatible physical operators.""" + + def apply(self, plan: PhysicalPlan) -> PhysicalPlan: + self._op_map = plan.op_map.copy() + # Do DFS fusion. + root = self._apply(plan.dag) + return PhysicalPlan(root, self._op_map) + + def _apply(self, op: PhysicalOperator) -> PhysicalOperator: + """Performs DFS fusion of linear chains of physical map operators, provided that + they are pairwise-compatible. + + Args: + op: The op that we're trying to fuse with its input. + """ + upstream_ops = op.input_dependencies + # Fuse with upstream ops while possible. + while len(upstream_ops) == 1 and self._can_fuse(op, upstream_ops[0]): + # Fuse operator with its upstream op. + op = self._fuse(op, upstream_ops[0]) + upstream_ops = op.input_dependencies + # Can no longer fuse with upstream ops, proceed up the DAG. 
+ op._input_dependencies = [ + self._apply(upstream_op) for upstream_op in upstream_ops + ] + return op + + def _can_fuse(self, down_op: PhysicalOperator, up_op: PhysicalOperator) -> bool: + """Returns whether the provided downstream operator can be fused with the given + upstream operator. + + We currently support fusing two operators if the following are all true: + * They are both MapOperators. + * They either use the same compute configuration, or the upstream operator + uses a task pool while the downstream operator uses an actor pool. + * If both operators involve callable classes, the callable classes are + the same class AND constructor args are the same for both. + * They have compatible remote arguments. + """ + from ray.data._internal.execution.operators.map_operator import MapOperator + from ray.data._internal.logical.operators.map_operator import AbstractMap + from ray.data._internal.logical.operators.read_operator import Read + + # We only support fusing MapOperators. + if not isinstance(down_op, MapOperator) or not isinstance(up_op, MapOperator): + return False + + down_logical_op = self._op_map[down_op] + up_logical_op = self._op_map[up_op] + + # We only support fusing upstream reads and maps with downstream maps. + if ( + not isinstance(down_logical_op, AbstractMap) + or not isinstance(up_logical_op, (Read, AbstractMap)) + ): + return False + + # Allow fusing tasks->actors if the resources are compatible (read->map), but + # not the other way around. The latter (downstream op) will be used as the + # compute if fused. + if ( + is_task_compute(down_logical_op._compute) + and isinstance(up_logical_op, AbstractMap) + and get_compute(up_logical_op._compute) + != get_compute(down_logical_op._compute) + ): + return False + + # Fusing callable classes is only supported if they are the same function AND + # their construction arguments are the same. + # TODO(Clark): Support multiple callable classes instantiating in the same actor + # worker. + if ( + isinstance(down_logical_op._fn, CallableClass) + and isinstance(up_logical_op, AbstractMap) + and isinstance(up_logical_op._fn, CallableClass) + and ( + up_logical_op._fn != down_logical_op._fn + or ( + up_logical_op._fn_constructor_args + != down_logical_op._fn_constructor_args + or up_logical_op._fn_constructor_kwargs + != down_logical_op._fn_constructor_kwargs + ) + ) + ): + return False + + # Only fuse if the ops' remote arguments are compatible. + if not _are_remote_args_compatible( + up_logical_op._ray_remote_args or {}, down_logical_op._ray_remote_args or {} + ): + return False + + # Otherwise, ops are compatible for fusion. + return True + + def _fuse(self, down_op: PhysicalOperator, up_op: PhysicalOperator): + """Fuse the downstream operator with its upstream operator.""" + from ray.data._internal.execution.operators.map_operator import MapOperator + from ray.data._internal.logical.operators.map_operator import AbstractMap + + assert self._can_fuse(down_op, up_op) + + # Fuse operator names. + name = up_op.name + "->" + down_op.name + + down_logical_op = self._op_map.pop(down_op) + up_logical_op = self._op_map.pop(up_op) + + # Merge target block sizes. 
+ down_target_block_size = down_logical_op._target_block_size + up_target_block_size = ( + up_logical_op._target_block_size + if isinstance(up_logical_op, AbstractMap) + else None + ) + if down_target_block_size is not None and up_target_block_size is not None: + target_block_size = max(down_target_block_size, up_target_block_size) + elif up_target_block_size is not None: + target_block_size = up_target_block_size + else: + target_block_size = down_target_block_size + + # Fuse transformation functions. + down_transform_fn = down_op.get_transformation_fn() + up_transform_fn = up_op.get_transformation_fn() + + def transform_fn(blocks: Iterator[Block]) -> Iterator[Block]: + blocks = up_transform_fn(blocks) + # TODO(Clark): Add zero-copy batching between transform functions. + return down_transform_fn(blocks) + + # We take the downstream op's compute in case we're fusing upstream tasks with a + # downstream actor pool (e.g. read->map). + compute = get_compute(down_logical_op._compute) + ray_remote_args = down_logical_op._ray_remote_args + # Make the upstream operator's inputs the new, fused operator's inputs. + input_deps = up_op.input_dependencies + assert len(input_deps) == 1 + input_op = input_deps[0] + + # Fused physical map operator. + op = MapOperator.create( + transform_fn, + input_op, + name=name, + compute_strategy=compute, + min_rows_per_bundle=target_block_size, + ray_remote_args=ray_remote_args, + ) + + # Build a map logical operator to be used as a reference for further fusion. + # TODO(Clark): This is hacky, remove this once we push fusion to be purely based + # on a lower-level operator spec. + if isinstance(up_logical_op, AbstractMap): + input_op = up_logical_op.input_dependencies[0] + else: + # Bottom out at the source logical op (e.g. Read()). + input_op = up_logical_op + logical_op = AbstractMap( + name, + input_op, + down_logical_op._fn, + down_logical_op._fn_args, + down_logical_op._fn_kwargs, + down_logical_op._fn_constructor_args, + down_logical_op._fn_constructor_kwargs, + target_block_size, + compute, + ray_remote_args, + ) + self._op_map[op] = logical_op + # Return the fused physical operator. 
+ return op + + +def _are_remote_args_compatible(up_args, down_args): + """Check if Ray remote arguments are compatible for merging.""" + from ray.data._internal.execution.operators.map_operator import ( + _canonicalize_ray_remote_args, + ) + + up_args = _canonicalize_ray_remote_args(up_args) + down_args = _canonicalize_ray_remote_args(down_args) + remote_args = down_args.copy() + for key in INHERITABLE_REMOTE_ARGS: + if key in up_args: + remote_args[key] = up_args[key] + if up_args != remote_args: + return False + return True diff --git a/python/ray/data/tests/test_execution_optimizer.py b/python/ray/data/tests/test_execution_optimizer.py index d313754e7f66..ea5848411a4b 100644 --- a/python/ray/data/tests/test_execution_optimizer.py +++ b/python/ray/data/tests/test_execution_optimizer.py @@ -283,19 +283,6 @@ def test_read_map_chain_operator_fusion(ray_start_cluster_enabled, enable_optimi assert isinstance(physical_op.input_dependencies[0], InputDataBuffer) -def test_read_map_chain_operator_fusion_e2e( - ray_start_cluster_enabled, enable_optimizer -): - ds = ray.data.range(10, parallelism=2) - ds = ds.filter(lambda x: x % 2 == 0) - ds = ds.map(lambda x: x + 1) - ds = ds.map_batches(lambda batch: [2 * x for x in batch], batch_size=None) - ds = ds.flat_map(lambda x: [-x, x]) - assert ds.take_all() == [-2, 2, -6, 6, -10, 10, -14, 14, -18, 18] - name = "DoRead->Filter->MapRows->MapBatches->FlatMap:" - assert name in ds.stats() - - def test_read_map_batches_operator_fusion_compatible_remote_args( ray_start_cluster_enabled, enable_optimizer ): @@ -526,6 +513,19 @@ def __call__(self, x): assert isinstance(physical_op.input_dependencies[0], InputDataBuffer) +def test_read_map_chain_operator_fusion_e2e( + ray_start_cluster_enabled, enable_optimizer +): + ds = ray.data.range(10, parallelism=2) + ds = ds.filter(lambda x: x % 2 == 0) + ds = ds.map(lambda x: x + 1) + ds = ds.map_batches(lambda batch: [2 * x for x in batch], batch_size=None) + ds = ds.flat_map(lambda x: [-x, x]) + assert ds.take_all() == [-2, 2, -6, 6, -10, 10, -14, 14, -18, 18] + name = "DoRead->Filter->MapRows->MapBatches->FlatMap:" + assert name in ds.stats() + + if __name__ == "__main__": import sys From 633eb4083382872d41ecfa098973b024d1f552a7 Mon Sep 17 00:00:00 2001 From: Clark Zinzow Date: Tue, 31 Jan 2023 16:16:47 +0000 Subject: [PATCH 3/5] Fixes from merge. 
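The fixups below only adjust the get_transformation_fn return annotations
and reformat the new Read/AbstractMap isinstance check; the remote-args
compatibility rule carried over from the previous commit is untouched. Its
merge semantics reduce to roughly the following (a standalone sketch that
skips the _canonicalize_ray_remote_args step; the example dicts mirror the
compatible/incompatible remote-args fusion tests above):

    INHERITABLE_REMOTE_ARGS = ["scheduling_strategy"]

    def are_remote_args_compatible(up_args: dict, down_args: dict) -> bool:
        # After the downstream args inherit any inheritable keys from the
        # upstream args, they must match the upstream args exactly.
        merged = dict(down_args)
        for key in INHERITABLE_REMOTE_ARGS:
            if key in up_args:
                merged[key] = up_args[key]
        return up_args == merged

    # The SPREAD scheduling strategy is inheritable, so these fuse...
    assert are_remote_args_compatible(
        {"num_cpus": 1, "scheduling_strategy": "SPREAD"}, {"num_cpus": 1}
    )
    # ...but differing CPU requests block fusion.
    assert not are_remote_args_compatible({"num_cpus": 2}, {"num_cpus": 3})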
--- python/ray/data/_internal/execution/interfaces.py | 1 - .../_internal/execution/operators/all_to_all_operator.py | 4 +--- .../ray/data/_internal/execution/operators/map_operator.py | 2 +- python/ray/data/_internal/logical/rules/operator_fusion.py | 6 +++--- 4 files changed, 5 insertions(+), 8 deletions(-) diff --git a/python/ray/data/_internal/execution/interfaces.py b/python/ray/data/_internal/execution/interfaces.py index f5ce3263bcc6..2a350e9efd18 100644 --- a/python/ray/data/_internal/execution/interfaces.py +++ b/python/ray/data/_internal/execution/interfaces.py @@ -8,7 +8,6 @@ from ray.data.block import Block, BlockMetadata from ray.data.context import DatasetContext from ray.types import ObjectRef -from typing import Callable @dataclass diff --git a/python/ray/data/_internal/execution/operators/all_to_all_operator.py b/python/ray/data/_internal/execution/operators/all_to_all_operator.py index eac95e54a788..5c6389a3fa5b 100644 --- a/python/ray/data/_internal/execution/operators/all_to_all_operator.py +++ b/python/ray/data/_internal/execution/operators/all_to_all_operator.py @@ -68,7 +68,5 @@ def get_next(self) -> RefBundle: def get_stats(self) -> StatsDict: return self._stats - def get_transformation_fn( - self, - ) -> Callable[[List[RefBundle]], Tuple[List[RefBundle], StatsDict]]: + def get_transformation_fn(self) -> AllToAllTransformFn: return self._bulk_fn diff --git a/python/ray/data/_internal/execution/operators/map_operator.py b/python/ray/data/_internal/execution/operators/map_operator.py index 23430764ca14..56e4951fdd90 100644 --- a/python/ray/data/_internal/execution/operators/map_operator.py +++ b/python/ray/data/_internal/execution/operators/map_operator.py @@ -262,7 +262,7 @@ def get_metrics(self) -> Dict[str, int]: def get_stats(self) -> StatsDict: return {self._name: self._output_metadata} - def get_transformation_fn(self) -> Callable[[Iterator[Block]], Iterator[Block]]: + def get_transformation_fn(self) -> MapTransformFn: return self._transform_fn @abstractmethod diff --git a/python/ray/data/_internal/logical/rules/operator_fusion.py b/python/ray/data/_internal/logical/rules/operator_fusion.py index a3520b6ccf89..18a23fe59ada 100644 --- a/python/ray/data/_internal/logical/rules/operator_fusion.py +++ b/python/ray/data/_internal/logical/rules/operator_fusion.py @@ -1,6 +1,7 @@ from typing import Iterator from ray.data.block import Block + # TODO(Clark): Remove compute dependency once we delete the legacy compute. from ray.data._internal.compute import is_task_compute, CallableClass, get_compute from ray.data._internal.execution.interfaces import PhysicalOperator @@ -63,9 +64,8 @@ def _can_fuse(self, down_op: PhysicalOperator, up_op: PhysicalOperator) -> bool: up_logical_op = self._op_map[up_op] # We only support fusing upstream reads and maps with downstream maps. - if ( - not isinstance(down_logical_op, AbstractMap) - or not isinstance(up_logical_op, (Read, AbstractMap)) + if not isinstance(down_logical_op, AbstractMap) or not isinstance( + up_logical_op, (Read, AbstractMap) ): return False From dceb521b1d53c5e36b4c108b4ee8036c319592b4 Mon Sep 17 00:00:00 2001 From: Clark Zinzow Date: Tue, 31 Jan 2023 18:35:48 +0000 Subject: [PATCH 4/5] Fix for TaskContext. 
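The change below threads the executor's TaskContext through the fused
transform: the fused function now takes (blocks, ctx) and passes the same
ctx to both the upstream and downstream transform functions. Stripped of
the Ray Data block types, the composition looks roughly like this
(a self-contained sketch; dict stands in for TaskContext):

    from typing import Callable, Iterator, TypeVar

    T = TypeVar("T")
    TransformFn = Callable[[Iterator[T], dict], Iterator[T]]

    def fuse_transforms(up_fn: TransformFn, down_fn: TransformFn) -> TransformFn:
        # Run the upstream transform, then feed its output and the same
        # task context to the downstream transform.
        def fused(items: Iterator[T], ctx: dict) -> Iterator[T]:
            return down_fn(up_fn(items, ctx), ctx)

        return fused

    double = lambda it, ctx: (2 * x for x in it)
    add_one = lambda it, ctx: (x + 1 for x in it)
    assert list(fuse_transforms(double, add_one)(iter([1, 2, 3]), {})) == [3, 5, 7]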
--- .../ray/data/_internal/logical/rules/operator_fusion.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/python/ray/data/_internal/logical/rules/operator_fusion.py b/python/ray/data/_internal/logical/rules/operator_fusion.py index 18a23fe59ada..c78e4f1cc488 100644 --- a/python/ray/data/_internal/logical/rules/operator_fusion.py +++ b/python/ray/data/_internal/logical/rules/operator_fusion.py @@ -4,7 +4,7 @@ # TODO(Clark): Remove compute dependency once we delete the legacy compute. from ray.data._internal.compute import is_task_compute, CallableClass, get_compute -from ray.data._internal.execution.interfaces import PhysicalOperator +from ray.data._internal.execution.interfaces import PhysicalOperator, TaskContext from ray.data._internal.logical.interfaces import Rule, PhysicalPlan @@ -140,10 +140,10 @@ def _fuse(self, down_op: PhysicalOperator, up_op: PhysicalOperator): down_transform_fn = down_op.get_transformation_fn() up_transform_fn = up_op.get_transformation_fn() - def transform_fn(blocks: Iterator[Block]) -> Iterator[Block]: - blocks = up_transform_fn(blocks) + def transform_fn(blocks: Iterator[Block], ctx: TaskContext) -> Iterator[Block]: + blocks = up_transform_fn(blocks, ctx) # TODO(Clark): Add zero-copy batching between transform functions. - return down_transform_fn(blocks) + return down_transform_fn(blocks, ctx) # We take the downstream op's compute in case we're fusing upstream tasks with a # downstream actor pool (e.g. read->map). From 39fd63fb6bb1693627bf8a0f6a234ad6f780fbe0 Mon Sep 17 00:00:00 2001 From: Clark Zinzow Date: Tue, 31 Jan 2023 19:54:04 +0000 Subject: [PATCH 5/5] Change to proper test fixture. --- .../data/tests/test_execution_optimizer.py | 76 ++++++++----------- 1 file changed, 33 insertions(+), 43 deletions(-) diff --git a/python/ray/data/tests/test_execution_optimizer.py b/python/ray/data/tests/test_execution_optimizer.py index ea5848411a4b..6228905c1aac 100644 --- a/python/ray/data/tests/test_execution_optimizer.py +++ b/python/ray/data/tests/test_execution_optimizer.py @@ -24,7 +24,7 @@ from ray.tests.conftest import * # noqa -def test_read_operator(ray_start_cluster_enabled, enable_optimizer): +def test_read_operator(ray_start_regular_shared, enable_optimizer): planner = Planner() op = Read(ParquetDatasource()) plan = LogicalPlan(op) @@ -36,7 +36,7 @@ def test_read_operator(ray_start_cluster_enabled, enable_optimizer): assert isinstance(physical_op.input_dependencies[0], InputDataBuffer) -def test_map_batches_operator(ray_start_cluster_enabled, enable_optimizer): +def test_map_batches_operator(ray_start_regular_shared, enable_optimizer): planner = Planner() read_op = Read(ParquetDatasource()) op = MapBatches( @@ -52,13 +52,13 @@ def test_map_batches_operator(ray_start_cluster_enabled, enable_optimizer): assert isinstance(physical_op.input_dependencies[0], MapOperator) -def test_map_batches_e2e(ray_start_cluster_enabled, enable_optimizer): +def test_map_batches_e2e(ray_start_regular_shared, enable_optimizer): ds = ray.data.range(5) ds = ds.map_batches(lambda x: x) assert ds.take_all() == list(range(5)), ds -def test_map_rows_operator(ray_start_cluster_enabled, enable_optimizer): +def test_map_rows_operator(ray_start_regular_shared, enable_optimizer): planner = Planner() read_op = Read(ParquetDatasource()) op = MapRows( @@ -74,13 +74,13 @@ def test_map_rows_operator(ray_start_cluster_enabled, enable_optimizer): assert isinstance(physical_op.input_dependencies[0], MapOperator) -def test_map_rows_e2e(ray_start_cluster_enabled, 
---
 .../data/tests/test_execution_optimizer.py | 76 ++++++++-----------
 1 file changed, 33 insertions(+), 43 deletions(-)

diff --git a/python/ray/data/tests/test_execution_optimizer.py b/python/ray/data/tests/test_execution_optimizer.py
index ea5848411a4b..6228905c1aac 100644
--- a/python/ray/data/tests/test_execution_optimizer.py
+++ b/python/ray/data/tests/test_execution_optimizer.py
@@ -24,7 +24,7 @@
 from ray.tests.conftest import * # noqa


-def test_read_operator(ray_start_cluster_enabled, enable_optimizer):
+def test_read_operator(ray_start_regular_shared, enable_optimizer):
     planner = Planner()
     op = Read(ParquetDatasource())
     plan = LogicalPlan(op)
@@ -36,7 +36,7 @@ def test_read_operator(ray_start_cluster_enabled, enable_optimizer):
     assert isinstance(physical_op.input_dependencies[0], InputDataBuffer)


-def test_map_batches_operator(ray_start_cluster_enabled, enable_optimizer):
+def test_map_batches_operator(ray_start_regular_shared, enable_optimizer):
     planner = Planner()
     read_op = Read(ParquetDatasource())
     op = MapBatches(
@@ -52,13 +52,13 @@ def test_map_batches_operator(ray_start_cluster_enabled, enable_optimizer):
     assert isinstance(physical_op.input_dependencies[0], MapOperator)


-def test_map_batches_e2e(ray_start_cluster_enabled, enable_optimizer):
+def test_map_batches_e2e(ray_start_regular_shared, enable_optimizer):
     ds = ray.data.range(5)
     ds = ds.map_batches(lambda x: x)
     assert ds.take_all() == list(range(5)), ds


-def test_map_rows_operator(ray_start_cluster_enabled, enable_optimizer):
+def test_map_rows_operator(ray_start_regular_shared, enable_optimizer):
     planner = Planner()
     read_op = Read(ParquetDatasource())
     op = MapRows(
@@ -74,13 +74,13 @@ def test_map_rows_operator(ray_start_cluster_enabled, enable_optimizer):
     assert isinstance(physical_op.input_dependencies[0], MapOperator)


-def test_map_rows_e2e(ray_start_cluster_enabled, enable_optimizer):
+def test_map_rows_e2e(ray_start_regular_shared, enable_optimizer):
     ds = ray.data.range(5)
     ds = ds.map(lambda x: x + 1)
     assert ds.take_all() == [1, 2, 3, 4, 5], ds


-def test_filter_operator(ray_start_cluster_enabled, enable_optimizer):
+def test_filter_operator(ray_start_regular_shared, enable_optimizer):
     planner = Planner()
     read_op = Read(ParquetDatasource())
     op = Filter(
@@ -96,13 +96,13 @@ def test_filter_operator(ray_start_cluster_enabled, enable_optimizer):
     assert isinstance(physical_op.input_dependencies[0], MapOperator)


-def test_filter_e2e(ray_start_cluster_enabled, enable_optimizer):
+def test_filter_e2e(ray_start_regular_shared, enable_optimizer):
     ds = ray.data.range(5)
     ds = ds.filter(fn=lambda x: x % 2 == 0)
     assert ds.take_all() == [0, 2, 4], ds


-def test_flat_map(ray_start_cluster_enabled, enable_optimizer):
+def test_flat_map(ray_start_regular_shared, enable_optimizer):
     planner = Planner()
     read_op = Read(ParquetDatasource())
     op = FlatMap(
@@ -118,13 +118,13 @@ def test_flat_map(ray_start_cluster_enabled, enable_optimizer):
     assert isinstance(physical_op.input_dependencies[0], MapOperator)


-def test_flat_map_e2e(ray_start_cluster_enabled, enable_optimizer):
+def test_flat_map_e2e(ray_start_regular_shared, enable_optimizer):
     ds = ray.data.range(2)
     ds = ds.flat_map(fn=lambda x: [x, x])
     assert ds.take_all() == [0, 0, 1, 1], ds


-def test_column_ops_e2e(ray_start_cluster_enabled, enable_optimizer):
+def test_column_ops_e2e(ray_start_regular_shared, enable_optimizer):
     ds = ray.data.range(2)
     ds = ds.add_column(fn=lambda df: df.iloc[:, 0], col="new_col")
     assert ds.take_all() == [{"value": 0, "new_col": 0}, {"value": 1, "new_col": 1}], ds
@@ -136,7 +136,7 @@ def test_column_ops_e2e(ray_start_cluster_enabled, enable_optimizer):
     assert ds.take_all() == [{"value": 0}, {"value": 1}], ds


-def test_random_sample_e2e(ray_start_cluster_enabled, enable_optimizer):
+def test_random_sample_e2e(ray_start_regular_shared, enable_optimizer):
     import math

     def ensure_sample_size_close(dataset, sample_percent=0.5):
@@ -155,7 +155,7 @@ def ensure_sample_size_close(dataset, sample_percent=0.5):
     ensure_sample_size_close(ds)


-def test_randomize_blocks_operator(ray_start_cluster_enabled, enable_optimizer):
+def test_randomize_blocks_operator(ray_start_regular_shared, enable_optimizer):
     planner = Planner()
     read_op = Read(ParquetDatasource())
     op = RandomizeBlocks(
@@ -171,13 +171,13 @@ def test_randomize_blocks_operator(ray_start_cluster_enabled, enable_optimizer):
     assert isinstance(physical_op.input_dependencies[0], MapOperator)


-def test_randomize_blocks_e2e(ray_start_cluster_enabled, enable_optimizer):
+def test_randomize_blocks_e2e(ray_start_regular_shared, enable_optimizer):
     ds = ray.data.range(12, parallelism=4)
     ds = ds.randomize_block_order(seed=0)
     assert ds.take_all() == [6, 7, 8, 0, 1, 2, 3, 4, 5, 9, 10, 11], ds


-def test_random_shuffle_operator(ray_start_cluster_enabled, enable_optimizer):
+def test_random_shuffle_operator(ray_start_regular_shared, enable_optimizer):
     planner = Planner()
     read_op = Read(ParquetDatasource())
     op = RandomShuffle(
@@ -194,7 +194,7 @@ def test_random_shuffle_operator(ray_start_cluster_enabled, enable_optimizer):


 def test_random_shuffle_e2e(
-    ray_start_cluster_enabled, enable_optimizer, use_push_based_shuffle
+    ray_start_regular_shared, enable_optimizer, use_push_based_shuffle
 ):
     ds = ray.data.range(12, parallelism=4)
     r1 = ds.random_shuffle(seed=0).take_all()
@@ -204,14 +204,10 @@ def test_random_shuffle_e2e(
     assert sorted(r2) == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11], r2


-def test_repartition_operator(ray_start_cluster_enabled, enable_optimizer):
+def test_repartition_operator(ray_start_regular_shared, enable_optimizer):
     planner = Planner()
     read_op = Read(ParquetDatasource())
-    op = Repartition(
-        read_op,
-        num_outputs=5,
-        shuffle=True,
-    )
+    op = Repartition(read_op, num_outputs=5, shuffle=True)
     plan = LogicalPlan(op)
     physical_op = planner.plan(plan).dag

@@ -221,18 +217,14 @@ def test_repartition_operator(ray_start_cluster_enabled, enable_optimizer):
     assert isinstance(physical_op.input_dependencies[0], MapOperator)

     # Check error is thrown for non-shuffle repartition.
+    op = Repartition(read_op, num_outputs=5, shuffle=False)
+    plan = LogicalPlan(op)
     with pytest.raises(AssertionError):
-        planner.plan(
-            Repartition(
-                read_op,
-                num_outputs=5,
-                shuffle=False,
-            )
-        )
+        planner.plan(plan)


 def test_repartition_e2e(
-    ray_start_cluster_enabled, enable_optimizer, use_push_based_shuffle
+    ray_start_regular_shared, enable_optimizer, use_push_based_shuffle
 ):
     ds = ray.data.range(10000, parallelism=10)
     ds1 = ds.repartition(20, shuffle=True)
@@ -243,7 +235,7 @@ def test_repartition_e2e(
         ds.repartition(20, shuffle=False).take_all()


-def test_read_map_batches_operator_fusion(ray_start_cluster_enabled, enable_optimizer):
+def test_read_map_batches_operator_fusion(ray_start_regular_shared, enable_optimizer):
     # Test that Read is fused with MapBatches.
     planner = Planner()
     read_op = Read(ParquetDatasource())
@@ -263,7 +255,7 @@ def test_read_map_batches_operator_fusion(ray_start_cluster_enabled, enable_opti
     assert isinstance(physical_op.input_dependencies[0], InputDataBuffer)


-def test_read_map_chain_operator_fusion(ray_start_cluster_enabled, enable_optimizer):
+def test_read_map_chain_operator_fusion(ray_start_regular_shared, enable_optimizer):
     # Test that a chain of different map operators are fused.
     planner = Planner()
     read_op = Read(ParquetDatasource())
@@ -284,7 +276,7 @@ def test_read_map_chain_operator_fusion(ray_start_cluster_enabled, enable_optimi


 def test_read_map_batches_operator_fusion_compatible_remote_args(
-    ray_start_cluster_enabled, enable_optimizer
+    ray_start_regular_shared, enable_optimizer
 ):
     # Test that map operators are stilled fused when remote args are compatible.
     planner = Planner()
@@ -307,7 +299,7 @@ def test_read_map_batches_operator_fusion_compatible_remote_args(


 def test_read_map_batches_operator_fusion_incompatible_remote_args(
-    ray_start_cluster_enabled, enable_optimizer
+    ray_start_regular_shared, enable_optimizer
 ):
     # Test that map operators are not fused when remote args are incompatible.
     planner = Planner()
@@ -331,7 +323,7 @@ def test_read_map_batches_operator_fusion_incompatible_remote_args(


 def test_read_map_batches_operator_fusion_compute_tasks_to_actors(
-    ray_start_cluster_enabled, enable_optimizer
+    ray_start_regular_shared, enable_optimizer
 ):
     # Test that a task-based map operator is fused into an actor-based map operator when
     # the former comes before the latter.
@@ -352,7 +344,7 @@ def test_read_map_batches_operator_fusion_compute_tasks_to_actors(


 def test_read_map_batches_operator_fusion_compute_read_to_actors(
-    ray_start_cluster_enabled, enable_optimizer
+    ray_start_regular_shared, enable_optimizer
 ):
     # Test that reads fuse into an actor-based map operator.
     planner = Planner()
@@ -371,7 +363,7 @@ def test_read_map_batches_operator_fusion_compute_read_to_actors(


 def test_read_map_batches_operator_fusion_incompatible_compute(
-    ray_start_cluster_enabled, enable_optimizer
+    ray_start_regular_shared, enable_optimizer
 ):
     # Test that map operators are not fused when compute strategies are incompatible.
     planner = Planner()
@@ -394,7 +386,7 @@ def test_read_map_batches_operator_fusion_incompatible_compute(


 def test_read_map_batches_operator_fusion_target_block_size(
-    ray_start_cluster_enabled, enable_optimizer
+    ray_start_regular_shared, enable_optimizer
 ):
     # Test that fusion of map operators merges their block sizes in the expected way
     # (taking the max).
@@ -419,7 +411,7 @@ def test_read_map_batches_operator_fusion_target_block_size(


 def test_read_map_batches_operator_fusion_callable_classes(
-    ray_start_cluster_enabled, enable_optimizer
+    ray_start_regular_shared, enable_optimizer
 ):
     # Test that callable classes can still be fused if they're the same function.
     planner = Planner()
@@ -444,7 +436,7 @@ def __call__(self, x):


 def test_read_map_batches_operator_fusion_incompatible_callable_classes(
-    ray_start_cluster_enabled, enable_optimizer
+    ray_start_regular_shared, enable_optimizer
 ):
     # Test that map operators are not fused when different callable classes are used.
     planner = Planner()
@@ -476,7 +468,7 @@ def __call__(self, x):


 def test_read_map_batches_operator_fusion_incompatible_constructor_args(
-    ray_start_cluster_enabled, enable_optimizer
+    ray_start_regular_shared, enable_optimizer
 ):
     # Test that map operators are not fused when callable classes have different
     # constructor args.
@@ -513,9 +505,7 @@ def __call__(self, x):
     assert isinstance(physical_op.input_dependencies[0], InputDataBuffer)


-def test_read_map_chain_operator_fusion_e2e(
-    ray_start_cluster_enabled, enable_optimizer
-):
+def test_read_map_chain_operator_fusion_e2e(ray_start_regular_shared, enable_optimizer):
     ds = ray.data.range(10, parallelism=2)
     ds = ds.filter(lambda x: x % 2 == 0)
     ds = ds.map(lambda x: x + 1)
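
For reference, the expected output of the fused read->filter->map chain above
can be checked against plain Python (a standalone, assumed example mirroring
the chained lambdas in the test; it is not part of the patch):

    import ray

    ds = ray.data.range(10, parallelism=2)
    ds = ds.filter(lambda x: x % 2 == 0)
    ds = ds.map(lambda x: x + 1)
    # The same chain computed eagerly in Python.
    expected = [x + 1 for x in range(10) if x % 2 == 0]
    assert ds.take_all() == expected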