
[Datasets] [Operator Fusion - 1/2] Add operator fusion to new execution planner. #32095

Conversation

@clarkzinzow (Contributor) commented Jan 31, 2023

This PR adds operator fusion to the new execution planner. A follow-up PR will add a further optimization: zero-copy batching between the block transforms of fused operators (a new optimization, not status quo).

In an effort to avoid expanding the PhysicalOperator API or further complicating the physical MapOperator, while still expressing this optimization as a physical plan optimization (as I believe it should be), this PR introduces a PhysicalPlan, which the execution layer is ignorant of, that holds auxiliary data enabling physical plan optimizations such as stage fusion (in this case, a PhysicalOperator -> LogicalOperator map).

Long-term, we should probably have one last planning layer consisting of stateless "physical operators" on which we can perform these last-mile optimizations without muddying the operator execution API (the typical "what" vs. "how" distinction).
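
To make the shape of this concrete, here is a minimal sketch of such a plan wrapper; the class, attribute, and property names below are illustrative assumptions, not the exact implementation:

from typing import Any, Dict

class PhysicalPlanSketch:
    """Sketch only: a physical DAG plus auxiliary data for physical optimization rules.

    The executor only ever receives the DAG; the PhysicalOperator -> LogicalOperator
    map is consumed by physical rules such as operator fusion and never reaches the
    execution layer.
    """

    def __init__(self, dag: Any, op_map: Dict[Any, Any]):
        self._dag = dag
        # Maps each generated PhysicalOperator back to the LogicalOperator it was
        # planned from, so rules can consult logical-level info (compute strategy,
        # UDF args, target block size, ...) when deciding whether to fuse.
        self._op_map = op_map

    @property
    def dag(self) -> Any:
        return self._dag

    @property
    def op_map(self) -> Dict[Any, Any]:
        return self._op_map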

Related issue number

Closes #31893

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(


# Build a map logical operator to be used as a reference for further fusion.
# TODO(Clark): This is hacky, remove this once we push fusion to be purely based
# on a lower-level operator spec.
clarkzinzow (author):

Given that we need information both at the LogicalOperator and PhysicalOperator level to perform this fusion, not sure what else we can do here other than:

  1. Push the remaining requisite information into the physical MapOperator (constructor args, lift target_block_size out of the bundler, etc.).
  2. Push the block transform function into the logical AbstractMap operator and do the fusion at the logical operator level.

Each have their complications/cons, e.g. for (1) we'd need to clear the constructor args at op.start() in order to keep us from needlessly hanging on to object references, and for (2) we'd be muddying the logical layer with an execution-level concept (block transformations).
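
To illustrate the wrinkle with option (1), here is a toy sketch; the class below is hypothetical and only demonstrates the lifecycle concern, not the real MapOperator:

from typing import Any, Optional, Tuple

class MapOperatorOption1Sketch:
    def __init__(
        self, fn_args: Tuple[Any, ...], fn_constructor_args: Tuple[Any, ...]
    ):
        # Option (1): keep the constructor args on the physical operator so a
        # physical fusion rule can read them directly...
        self._fn_args: Optional[Tuple[Any, ...]] = fn_args
        self._fn_constructor_args: Optional[Tuple[Any, ...]] = fn_constructor_args

    def start(self) -> None:
        # ...but they then have to be cleared here, otherwise the operator keeps
        # pinning object references for the rest of execution.
        self._fn_args = None
        self._fn_constructor_args = None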

@@ -18,7 +18,7 @@ def generate_map_batches_fn(
     batch_format: Literal["default", "pandas", "pyarrow", "numpy"] = "default",
     prefetch_batches: int = 0,
     zero_copy_batch: bool = False,
-) -> Callable[[Iterator[Block]], Iterator[Block]]:
+) -> Callable[[Iterator[Block], BatchUDF], Iterator[Block]]:
clarkzinzow (author) commented Jan 31, 2023:

@c21 This returned callable is technically def transform(blocks: Iterator[Block], fn: BatchUDF, *args, **kwargs) -> Iterator[Block]; I'm thinking about updating this and others with a typing.Protocol to capture this. https://docs.python.org/3/library/typing.html#protocols
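
For example, a minimal sketch of such a Protocol; the Protocol name and the Any stand-ins for Block/BatchUDF are assumptions for illustration:

from typing import Any, Iterator, Protocol

Block = Any     # stand-in for ray.data.block.Block
BatchUDF = Any  # stand-in for the user-facing batch UDF type

class MapBatchesTransformFn(Protocol):
    # Captures the actual calling convention of the returned transform: it takes
    # the block iterator plus the UDF and its args, and yields output blocks.
    def __call__(
        self, blocks: Iterator[Block], fn: BatchUDF, *args: Any, **kwargs: Any
    ) -> Iterator[Block]:
        ...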

Contributor:

Yeah, I noticed it today; let's do a follow-up PR to fix these types. Seems minor.

@ericl (Contributor) commented Jan 31, 2023

Could we introduce a logical fused node (with multiple logical nodes as children) that would be generated by the logical fusion rule? Then, the planner just needs to know how to generate code for this type of node.

# not the other way around. The latter (downstream op) will be used as the
# compute if fused.
if (
is_task_compute(down_logical_op._compute)
Contributor:

Shouldn't we check isinstance(down_logical_op, AbstractMap) before is_task_compute(down_logical_op._compute)?

clarkzinzow (author):

Hmm I think that's a good defensive check to add. We already have an isinstance(down_op, MapOperator) check above, which limits the logical op to Read() or AbstractMap(), and you currently can't have a Read() as the downstream op (it's always a source op), but that's a lot of assumptions. And there's a good chance that we'll introduce more logical operations that will result in a MapOperator physical operator, so that's a good defensive check for guarding against future failures.

Actually, how about we add an upfront logical op check like this right after fetching the logical ops?

if not isinstance(down_logical_op, AbstractMap) or not isinstance(up_logical_op, (Read, AbstractMap)):
   return False

Then we can assume that constraint for the rest of the function and tweak that condition as we add more logical ops that result in physical MapOperators.

Contributor:

yeah SGTM.

Comment on lines 154 to 155
if isinstance(up_logical_op, AbstractMap)
else None
Contributor:

We should check down_logical_op is AbstractMap before accessing down_logical_op._target_block_size, right?

clarkzinzow (author):

Yep, see my above comment about allow-listing downstream and upstream logical ops!

python/ray/data/_internal/logical/optimizers.py (outdated comment thread; resolved)
Comment on lines 203 to 208
input_op = up_logical_op
fn = down_logical_op._fn
fn_args = down_logical_op._fn_args
fn_kwargs = down_logical_op._fn_kwargs
fn_constructor_args = down_logical_op._fn_constructor_args
fn_constructor_kwargs = down_logical_op._fn_constructor_kwargs
Contributor:

Just for my understanding, when can this case happen? Is it for Read? And why do we set all the fn-related args from down_logical_op instead of from up_logical_op?

clarkzinzow (author):

@c21 Yep, it's for when the upstream logical op is a Read, in which case it won't have any of those fn-related args. But I also just realized that this isn't quite right: we should be adopting the downstream op's UDF and associated args, not the upstream op's, and we should only be taking the upstream op's input dependency as the input op (where we bottom out at the source/read op, which isn't quite correct but should be fine for providing info to physical optimization rules).


from ray.data.block import Block
from ray.data._internal.compute import is_task_compute, CallableClass, get_compute
Contributor:

It seems we have to depend on ray.data._internal.compute here. Let's add a TODO to refactor the needed methods out of ray.data._internal.compute so we don't keep a dependency on it going forward, i.e., we plan to delete compute.py eventually.

clarkzinzow (author):

Yep, I decided to accept this dependency for now, thinking we could remove it when we delete compute.py; otherwise we start to accumulate repeated definitions and run the risk of drift. Some of these, like CallableClass, should be moved to a user-facing interfaces file (since the type is user-facing), while is_task_compute() and get_compute() could probably be moved into this module, but I held off on that for now to keep this PR from getting too large.

"""Convert logical to physical operators recursively in post-order."""
physical_dag = self._plan(logical_plan.dag)
return PhysicalPlan(physical_dag, self._physical_op_to_logical_op)
Contributor:

Let's make a defensive copy of self._physical_op_to_logical_op before passing it to PhysicalPlan?

clarkzinzow (author):

Hmm, this map is generated specifically for this PhysicalPlan and shouldn't be used anywhere else, and we are already defensively copying it in the optimization rule, which I believe is the correct place? https://github.com/ray-project/ray/blob/9afbdcaffdcfa560f08bf6938d9f9fa80d711b44/python/ray/data/_internal/logical/optimizers.py#L53
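
For reference, a sketch of where that copy lives; the rule and attribute names below are assumptions, see the linked optimizers.py line for the actual code:

from typing import Any, Dict

class OperatorFusionRuleSketch:
    def apply(self, plan: Any) -> Any:
        # Copy the PhysicalOperator -> LogicalOperator map up front so the rule
        # can mutate its own view while fusing, without touching the planner's map.
        self._op_map: Dict[Any, Any] = dict(plan.op_map)
        # ... fusion logic would then rewrite plan.dag using self._op_map ...
        return plan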

@c21 (Contributor) commented Jan 31, 2023

> Could we introduce a logical fused node (with multiple logical nodes as children) that would be generated by the logical fusion rule? Then, the planner just needs to know how to generate code for this type of node.

@ericl - I thought about it and don't think it can work well. E.g., currently a Read logical operator is planned into MapOperator(InputDataBuffer). We never know whether the Read logical operator can be fused with a downstream operator (e.g., an actor-based MapBatches) until Planner.plan(). If we do the operator fusion in a purely logical rule, it makes the planner defunct.

Another example is aggregation: currently we have a sort-based aggregate physical operator (an AllToAllOperator). It cannot be fused with other operators because sort sampling is a stop-the-world event (all input needs to be materialized before sampling). However, nothing stops us from implementing a hash-based aggregate physical operator in the future, which would support fusion by design. Doing operator fusion in a logical rule would make that fusion optimization impossible.

@clarkzinzow will soon add a PR to fuse batch functions together, and the current approach (as a physical rule) plays well with the planner framework (we basically fuse multiple generated functions together, and generated functions are a physical-layer concept).

As evidence from another system, Spark does operator fusion (combining multiple physical operators into a code-gen operator) after planning, in the physical optimization phase. Historically Spark does not have a formal PhysicalOptimizer and just uses a method called preparations() (the naming is not great), but it actually applies quite a few optimization rules (Seq[Rule[SparkPlan]]), such as RemoveRedundantSorts, ReplaceHashWithSortAgg, etc. Disclaimer: I added a few rules there, so I am pretty sure my understanding is up to date.
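
As a toy illustration of what "fusing generated functions together" means at the physical layer (the names and the simplified signature below are assumptions; the real transforms also take the UDF and task context):

from typing import Any, Callable, Iterator

Block = Any  # stand-in for ray.data.block.Block
BlockTransformFn = Callable[[Iterator[Block]], Iterator[Block]]

def fuse_transforms(up_fn: BlockTransformFn, down_fn: BlockTransformFn) -> BlockTransformFn:
    # Chain the two generators so blocks stream from the upstream transform
    # straight into the downstream one, with no intermediate materialization.
    def fused(blocks: Iterator[Block]) -> Iterator[Block]:
        return down_fn(up_fn(blocks))
    return fused

# Usage with trivial stand-in transforms:
double = lambda blocks: (b * 2 for b in blocks)
add_one = lambda blocks: (b + 1 for b in blocks)
fused = fuse_transforms(double, add_one)
assert list(fused(iter([1, 2, 3]))) == [3, 5, 7]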

@clarkzinzow force-pushed the datasets/feat/operator-fusion-new-optimizer branch from 9afbdca to a6dc8ca on January 31, 2023 16:17
@clarkzinzow requested a review from c21 on January 31, 2023 16:17
return self._dag


class PhysicalPlan(Plan):
Contributor:

The file path is "logical/interfaces.py", but we are now introducing physical pieces. Shall we rename the path to optimizer? That would be consistent with the three components of query processing (planner, optimizer, execution).

Contributor:

I am fine with the renaming if others have no objection, but let's make sure we address the renaming in a separate PR, for easier review.

Contributor:

There is already a "planner/" directory; why don't "Plan" and related classes belong in "planner/"?

Contributor:

I guess it's fine to put Plan/LogicalPlan/PhysicalPlan here, as Rule depends on Plan, and Optimizer depends on Rule. In the future, we may generalize some graph traversal logic into Plan.

@c21 (Contributor) left a comment:

LGTM, thanks @clarkzinzow!

Comment on lines +140 to +141
down_transform_fn = down_op.get_transformation_fn()
up_transform_fn = up_op.get_transformation_fn()
Contributor:

let's assert isinstance(down_transform_fn, MapTransformFn) and isinstance(up_transform_fn, MapTransformFn)?

clarkzinzow (author):

Python doesn't support isinstance checks with subscripted generics.
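
A small self-contained demonstration; the alias below is a simplified stand-in for the real MapTransformFn:

from typing import Callable, Iterator

BlockTransformFn = Callable[[Iterator[int]], Iterator[int]]  # simplified stand-in

fn = lambda blocks: (b for b in blocks)

try:
    isinstance(fn, BlockTransformFn)
except TypeError:
    # Raised by CPython: subscripted generics cannot be used with isinstance checks.
    pass

# A weaker runtime check that does work:
assert callable(fn)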

@clarkzinzow force-pushed the datasets/feat/operator-fusion-new-optimizer branch from 708f47b to 39fd63f on January 31, 2023 19:54
@clarkzinzow (author):
Failures are unrelated (tensor extension break in master), merging!

@clarkzinzow merged commit 2137945 into ray-project:master on Jan 31, 2023
edoakes pushed a commit to edoakes/ray that referenced this pull request Mar 22, 2023
…on planner. (ray-project#32095)

This PR adds operation fusion to the new execution planner.

Signed-off-by: Edward Oakes <ed.nmi.oakes@gmail.com>

Successfully merging this pull request may close these issues.

[Datasets] Implement stages fusion as optimizer rule
4 participants