From 729e51fdb30d86e9bad5513bb22d3fbd78a4d5bb Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Tue, 3 Jan 2023 15:54:08 +0000 Subject: [PATCH] Enable PhysicalOptimizerRule lazily (#4806) --- datafusion/core/src/execution/context.rs | 101 +++++++++--------- .../physical_optimizer/coalesce_batches.rs | 17 +-- .../src/physical_optimizer/repartition.rs | 3 +- 3 files changed, 59 insertions(+), 62 deletions(-) diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index 076d9e0632a8..93981e905cff 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -1449,59 +1449,54 @@ impl SessionState { } // We need to take care of the rule ordering. They may influence each other. - let mut physical_optimizers: Vec> = - vec![Arc::new(AggregateStatistics::new())]; - - // - In order to increase the parallelism, it will change the output partitioning - // of some operators in the plan tree, which will influence other rules. - // Therefore, it should be run as soon as possible. - // - The reason to make it optional is - // - it's not used for the distributed engine, Ballista. - // - it's conflicted with some parts of the BasicEnforcement, since it will - // introduce additional repartitioning while the BasicEnforcement aims at - // reducing unnecessary repartitioning. - if config.options.optimizer.enable_round_robin_repartition { - physical_optimizers.push(Arc::new(Repartition::new())); - } - //- Currently it will depend on the partition number to decide whether to change the - // single node sort to parallel local sort and merge. Therefore, it should be run - // after the Repartition. - // - Since it will change the output ordering of some operators, it should be run - // before JoinSelection and BasicEnforcement, which may depend on that. - physical_optimizers.push(Arc::new(GlobalSortSelection::new())); - // Statistics-base join selection will change the Auto mode to real join implementation, - // like collect left, or hash join, or future sort merge join, which will - // influence the BasicEnforcement to decide whether to add additional repartition - // and local sort to meet the distribution and ordering requirements. - // Therefore, it should be run before BasicEnforcement - physical_optimizers.push(Arc::new(JoinSelection::new())); - // If the query is processing infinite inputs, the PipelineFixer rule applies the - // necessary transformations to make the query runnable (if it is not already runnable). - // If the query can not be made runnable, the rule emits an error with a diagnostic message. - // Since the transformations it applies may alter output partitioning properties of operators - // (e.g. by swapping hash join sides), this rule runs before BasicEnforcement. - physical_optimizers.push(Arc::new(PipelineFixer::new())); - // It's for adding essential repartition and local sorting operator to satisfy the - // required distribution and local sort. - // Please make sure that the whole plan tree is determined. - physical_optimizers.push(Arc::new(BasicEnforcement::new())); - // `BasicEnforcement` stage conservatively inserts `SortExec`s to satisfy ordering requirements. - // However, a deeper analysis may sometimes reveal that such a `SortExec` is actually unnecessary. - // These cases typically arise when we have reversible `WindowAggExec`s or deep subqueries. The - // rule below performs this analysis and removes unnecessary `SortExec`s. - physical_optimizers.push(Arc::new(OptimizeSorts::new())); - // It will not influence the distribution and ordering of the whole plan tree. - // Therefore, to avoid influencing other rules, it should be run at last. - if config.options.execution.coalesce_batches { - physical_optimizers.push(Arc::new(CoalesceBatches::new( - config.options.execution.batch_size, - ))); - } - // The PipelineChecker rule will reject non-runnable query plans that use - // pipeline-breaking operators on infinite input(s). The rule generates a - // diagnostic error message when this happens. It makes no changes to the - // given query plan; i.e. it only acts as a final gatekeeping rule. - physical_optimizers.push(Arc::new(PipelineChecker::new())); + let physical_optimizers: Vec> = vec![ + Arc::new(AggregateStatistics::new()), + // - In order to increase the parallelism, it will change the output partitioning + // of some operators in the plan tree, which will influence other rules. + // Therefore, it should be run as soon as possible. + // - The reason to make it optional is + // - it's not used for the distributed engine, Ballista. + // - it's conflicted with some parts of the BasicEnforcement, since it will + // introduce additional repartitioning while the BasicEnforcement aims at + // reducing unnecessary repartitioning. + Arc::new(Repartition::new()), + //- Currently it will depend on the partition number to decide whether to change the + // single node sort to parallel local sort and merge. Therefore, it should be run + // after the Repartition. + // - Since it will change the output ordering of some operators, it should be run + // before JoinSelection and BasicEnforcement, which may depend on that. + Arc::new(GlobalSortSelection::new()), + // Statistics-base join selection will change the Auto mode to real join implementation, + // like collect left, or hash join, or future sort merge join, which will + // influence the BasicEnforcement to decide whether to add additional repartition + // and local sort to meet the distribution and ordering requirements. + // Therefore, it should be run before BasicEnforcement + Arc::new(JoinSelection::new()), + // If the query is processing infinite inputs, the PipelineFixer rule applies the + // necessary transformations to make the query runnable (if it is not already runnable). + // If the query can not be made runnable, the rule emits an error with a diagnostic message. + // Since the transformations it applies may alter output partitioning properties of operators + // (e.g. by swapping hash join sides), this rule runs before BasicEnforcement. + Arc::new(PipelineFixer::new()), + // It's for adding essential repartition and local sorting operator to satisfy the + // required distribution and local sort. + // Please make sure that the whole plan tree is determined. + Arc::new(BasicEnforcement::new()), + // `BasicEnforcement` stage conservatively inserts `SortExec`s to satisfy ordering requirements. + // However, a deeper analysis may sometimes reveal that such a `SortExec` is actually unnecessary. + // These cases typically arise when we have reversible `WindowAggExec`s or deep subqueries. The + // rule below performs this analysis and removes unnecessary `SortExec`s. + Arc::new(OptimizeSorts::new()), + // It will not influence the distribution and ordering of the whole plan tree. + // Therefore, to avoid influencing other rules, it should be run at last. + Arc::new(CoalesceBatches::new()), + // The PipelineChecker rule will reject non-runnable query plans that use + // pipeline-breaking operators on infinite input(s). The rule generates a + // diagnostic error message when this happens. It makes no changes to the + // given query plan; i.e. it only acts as a final gatekeeping rule. + Arc::new(PipelineChecker::new()), + ]; + SessionState { session_id, optimizer: Optimizer::new(), diff --git a/datafusion/core/src/physical_optimizer/coalesce_batches.rs b/datafusion/core/src/physical_optimizer/coalesce_batches.rs index 40de861ecc38..a1566c37cd2d 100644 --- a/datafusion/core/src/physical_optimizer/coalesce_batches.rs +++ b/datafusion/core/src/physical_optimizer/coalesce_batches.rs @@ -32,24 +32,25 @@ use std::sync::Arc; /// Optimizer rule that introduces CoalesceBatchesExec to avoid overhead with small batches that /// are produced by highly selective filters #[derive(Default)] -pub struct CoalesceBatches { - /// Target batch size - target_batch_size: usize, -} +pub struct CoalesceBatches {} impl CoalesceBatches { #[allow(missing_docs)] - pub fn new(target_batch_size: usize) -> Self { - Self { target_batch_size } + pub fn new() -> Self { + Self::default() } } impl PhysicalOptimizerRule for CoalesceBatches { fn optimize( &self, plan: Arc, - _config: &ConfigOptions, + config: &ConfigOptions, ) -> Result> { - let target_batch_size = self.target_batch_size; + if !config.execution.coalesce_batches { + return Ok(plan); + } + + let target_batch_size = config.execution.batch_size; plan.transform_up(&|plan| { let plan_any = plan.as_any(); // The goal here is to detect operators that could produce small batches and only diff --git a/datafusion/core/src/physical_optimizer/repartition.rs b/datafusion/core/src/physical_optimizer/repartition.rs index 9320de41e032..2044b2aaac10 100644 --- a/datafusion/core/src/physical_optimizer/repartition.rs +++ b/datafusion/core/src/physical_optimizer/repartition.rs @@ -211,8 +211,9 @@ impl PhysicalOptimizerRule for Repartition { config: &ConfigOptions, ) -> Result> { let target_partitions = config.execution.target_partitions; + let enabled = config.optimizer.enable_round_robin_repartition; // Don't run optimizer if target_partitions == 1 - if target_partitions == 1 { + if !enabled || target_partitions == 1 { Ok(plan) } else { optimize_partitions(