Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Datasets] [Operator Fusion - 1/2] Add operator fusion to new execution planner. #32095

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions python/ray/data/_internal/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,9 @@ def _apply(
owned_by_consumer=in_block_owned_by_consumer,
)

def __eq__(self, other: Any) -> bool:
    """Return True when ``other`` is also a ``TaskPoolStrategy``.

    Only the type of ``other`` is checked; no attributes are compared.
    """
    # NOTE(review): defining __eq__ without __hash__ makes instances
    # unhashable unless the class defines __hash__ elsewhere -- confirm
    # that this is intended.
    return isinstance(other, TaskPoolStrategy)


@PublicAPI
class ActorPoolStrategy(ComputeStrategy):
Expand Down Expand Up @@ -449,6 +452,14 @@ def map_block_nosplit(
finally:
raise e from None

def __eq__(self, other: Any) -> bool:
    """Return True when ``other`` is an equivalent ``ActorPoolStrategy``.

    Two strategies compare equal when ``other`` is an ``ActorPoolStrategy``
    and ``min_size``, ``max_size`` and ``max_tasks_in_flight_per_actor``
    are all equal.
    """
    return isinstance(other, ActorPoolStrategy) and (
        self.min_size == other.min_size
        and self.max_size == other.max_size
        and self.max_tasks_in_flight_per_actor
        == other.max_tasks_in_flight_per_actor
    )


def get_compute(compute_spec: Union[str, ComputeStrategy]) -> ComputeStrategy:
if not compute_spec or compute_spec == "tasks":
Expand Down
10 changes: 8 additions & 2 deletions python/ray/data/_internal/execution/interfaces.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from dataclasses import dataclass
from typing import Dict, List, Optional, Iterable, Tuple
from typing import Dict, List, Optional, Iterable, Tuple, Callable

import ray
from ray.data._internal.logical.interfaces import Operator
Expand All @@ -8,7 +8,6 @@
from ray.data.block import Block, BlockMetadata
from ray.data.context import DatasetContext
from ray.types import ObjectRef
from typing import Callable


@dataclass
Expand Down Expand Up @@ -237,6 +236,13 @@ def get_metrics(self) -> Dict[str, int]:
"""
return {}

def get_transformation_fn(self) -> Callable:
    """Returns the underlying transformation function for this operator.

    This is used by the physical plan optimizer for e.g. operator fusion.

    Raises:
        NotImplementedError: Always; this implementation provides no
            transformation function.
    """
    raise NotImplementedError

def progress_str(self) -> str:
"""Return any extra status to be displayed in the operator progress bar.

Expand Down
4 changes: 2 additions & 2 deletions python/ray/data/_internal/execution/legacy_compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from typing import Iterator, Tuple, Any

import ray
from ray.data._internal.logical.optimizers import get_execution_dag
from ray.data._internal.logical.optimizers import get_execution_plan
from ray.data.context import DatasetContext
from ray.types import ObjectRef
from ray.data.block import Block, BlockMetadata, List
Expand Down Expand Up @@ -78,7 +78,7 @@ def execute_to_legacy_block_list(
The output as a legacy block list.
"""
if DatasetContext.get_current().optimizer_enabled:
dag, stats = get_execution_dag(plan._logical_plan.dag), None
dag, stats = get_execution_plan(plan._logical_plan).dag, None
else:
dag, stats = _to_operator_dag(plan, allow_clear_input_blocks)
bundles = executor.execute(dag, initial_stats=stats)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,6 @@ def get_next(self) -> RefBundle:

def get_stats(self) -> StatsDict:
return self._stats

def get_transformation_fn(self) -> AllToAllTransformFn:
    """Return the bulk transformation function stored on this operator."""
    return self._bulk_fn
10 changes: 7 additions & 3 deletions python/ray/data/_internal/execution/operators/map_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,7 @@ def __init__(
# instead.
# NOTE: This constructor must be called by subclasses.

# Put the function def in the object store to avoid repeated serialization
# in case it's large (i.e., closure captures large objects).
self._transform_fn_ref = ray.put(transform_fn)
self._transform_fn = transform_fn
self._ray_remote_args = _canonicalize_ray_remote_args(ray_remote_args or {})

# Bundles block references up to the min_rows_per_bundle target.
Expand Down Expand Up @@ -142,6 +140,9 @@ def start(self, options: "ExecutionOptions"):
ray.get_runtime_context().get_node_id(),
soft=True,
)
# Put the function def in the object store to avoid repeated serialization
# in case it's large (i.e., closure captures large objects).
self._transform_fn_ref = ray.put(self._transform_fn)
super().start(options)

def add_input(self, refs: RefBundle, input_index: int):
Expand Down Expand Up @@ -261,6 +262,9 @@ def get_metrics(self) -> Dict[str, int]:
def get_stats(self) -> StatsDict:
return {self._name: self._output_metadata}

def get_transformation_fn(self) -> MapTransformFn:
    """Return the transformation function stored on this operator."""
    return self._transform_fn

@abstractmethod
def shutdown(self):
# NOTE: This must be implemented by subclasses, and those overriding methods
Expand Down
61 changes: 55 additions & 6 deletions python/ray/data/_internal/logical/interfaces.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
from typing import List
from typing import List, Dict, TYPE_CHECKING

if TYPE_CHECKING:
from ray.data._internal.execution.interfaces import PhysicalOperator


class Operator:
Expand Down Expand Up @@ -51,11 +54,57 @@ def __init__(self, name: str, input_dependencies: List["LogicalOperator"]):
assert isinstance(x, LogicalOperator), x


class Plan:
    """Abstract class for logical/physical execution plans.

    This plan should hold an operator representing the plan DAG and any
    auxiliary data that's useful for plan optimization or execution.
    """

    @property
    def dag(self) -> Operator:
        """Get the DAG of operators for this plan.

        Raises:
            NotImplementedError: Always; this base property has no
                implementation.
        """
        raise NotImplementedError


class LogicalPlan(Plan):
    """The plan with a DAG of logical operators."""

    def __init__(self, dag: LogicalOperator):
        """Create a logical plan.

        Args:
            dag: The DAG of logical operators held by this plan.
        """
        self._dag = dag

    @property
    def dag(self) -> LogicalOperator:
        """Get the DAG of logical operators."""
        return self._dag


class PhysicalPlan(Plan):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The file path is "logical/interfaces.py", but we are now introducing physical pieces. Shall we rename the path to "optimizer"? That would be consistent with the three components of query processing (planner, optimizer, execution).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am fine with the renaming if others have no objections, but I want to make sure we address the renaming in a separate PR, for easier review.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is already a "planner/" directory; why don't "Plan" and the related classes belong in "planner/"?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it's fine to put Plan/LogicalPlan/PhysicalPlan here, as Rule depends on Plan, and Optimizer depends on Rule. In the future, we may generalize some graph traversal logic into Plan.

"""The plan with a DAG of physical operators."""

def __init__(
    self, dag: "PhysicalOperator", op_map: Dict["PhysicalOperator", LogicalOperator]
):
    """Create a physical plan.

    Args:
        dag: The DAG of physical operators held by this plan.
        op_map: Mapping from physical operators to their corresponding
            logical operator.
    """
    self._dag = dag
    self._op_map = op_map

@property
def dag(self) -> "PhysicalOperator":
    """Get the DAG of physical operators."""
    return self._dag

@property
def op_map(self) -> Dict["PhysicalOperator", LogicalOperator]:
    """Get a mapping from physical operators to their logical operator.

    Each key is a physical operator and each value is its corresponding
    logical operator.
    """
    return self._op_map


class Rule:
"""Abstract class for optimization rule."""

def apply(dag: Operator) -> Operator:
"""Apply the optimization rule to the DAG of operators."""
def apply(self, plan: Plan) -> Plan:
    """Apply the optimization rule to the execution plan.

    Args:
        plan: The plan to apply this rule to.

    Returns:
        The plan produced by applying the rule.

    Raises:
        NotImplementedError: Always; this base method has no implementation.
    """
    # `self` is required: the rule is invoked on an instance as
    # `rule.apply(plan)`, which would otherwise fail with a TypeError
    # instead of reaching this NotImplementedError.
    raise NotImplementedError


Expand All @@ -70,8 +119,8 @@ def rules(self) -> List[Rule]:
"""List of predefined rules for this optimizer."""
raise NotImplementedError

def optimize(self, dag: Operator) -> Operator:
def optimize(self, plan: Plan) -> Plan:
"""Optimize operators with a list of rules."""
for rule in self.rules:
dag = rule.apply(dag)
return dag
plan = rule.apply(plan)
return plan
34 changes: 13 additions & 21 deletions python/ray/data/_internal/logical/optimizers.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
from typing import List

from ray.data._internal.execution.interfaces import PhysicalOperator
from ray.data._internal.logical.interfaces import Rule, Optimizer, LogicalOperator
from ray.data._internal.logical.interfaces import (
Rule,
Optimizer,
LogicalPlan,
PhysicalPlan,
)
from ray.data._internal.logical.rules import OperatorFusionRule
from ray.data._internal.planner.planner import Planner


Expand All @@ -19,30 +24,17 @@ class PhysicalOptimizer(Optimizer):

@property
def rules(self) -> List["Rule"]:
# TODO: Add physical optimizer rules.
return []


class LogicalPlan:
"""The plan with a DAG of logical operators."""

def __init__(self, dag: LogicalOperator):
self._dag = dag

@property
def dag(self) -> LogicalOperator:
"""Get the DAG of logical operators."""
return self._dag
return [OperatorFusionRule()]


def get_execution_dag(logical_dag: LogicalOperator) -> PhysicalOperator:
"""Get the DAG of physical operators to execute.
def get_execution_plan(logical_plan: LogicalPlan) -> PhysicalPlan:
"""Get the physical execution plan for the provided logical plan.

This process has 3 steps:
(1) logical optimization: optimize logical operators.
(2) planning: convert logical to physical operators.
(3) physical optimization: optimize physical operators.
"""
optimized_logical_dag = LogicalOptimizer().optimize(logical_dag)
physical_dag = Planner().plan(optimized_logical_dag)
return PhysicalOptimizer().optimize(physical_dag)
logical_plan = LogicalOptimizer().optimize(logical_plan)
physical_plan = Planner().plan(logical_plan)
return PhysicalOptimizer().optimize(physical_plan)
3 changes: 3 additions & 0 deletions python/ray/data/_internal/logical/rules/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from ray.data._internal.logical.rules.operator_fusion import OperatorFusionRule

__all__ = ["OperatorFusionRule"]
Loading