Skip to content

Commit

Permalink
Merge ec77e2d into eed77a2
Browse files Browse the repository at this point in the history
  • Loading branch information
avantgardnerio authored Jul 12, 2022
2 parents eed77a2 + ec77e2d commit 9442f07
Show file tree
Hide file tree
Showing 15 changed files with 42 additions and 30 deletions.
5 changes: 3 additions & 2 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1381,7 +1381,7 @@ impl SessionState {
// optimize the child plan, capturing the output of each optimizer
let plan = self.optimizer.optimize(
e.plan.as_ref(),
&optimizer_config,
&mut optimizer_config,
|optimized_plan, optimizer| {
let optimizer_name = optimizer.name().to_string();
let plan_type = PlanType::OptimizedLogicalPlan { optimizer_name };
Expand All @@ -1396,7 +1396,8 @@ impl SessionState {
schema: e.schema.clone(),
}))
} else {
self.optimizer.optimize(plan, &optimizer_config, |_, _| {})
self.optimizer
.optimize(plan, &mut optimizer_config, |_, _| {})
}
}

Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/tests/user_defined_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ impl OptimizerRule for TopKOptimizerRule {
fn optimize(
&self,
plan: &LogicalPlan,
optimizer_config: &OptimizerConfig,
optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
// Note: this code simply looks for the pattern of a Limit followed by a
// Sort and replaces it by a TopK node. It does not handle many
Expand Down
4 changes: 2 additions & 2 deletions datafusion/optimizer/src/common_subexpr_eliminate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ impl OptimizerRule for CommonSubexprEliminate {
fn optimize(
&self,
plan: &LogicalPlan,
optimizer_config: &OptimizerConfig,
optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
optimize(plan, optimizer_config)
}
Expand Down Expand Up @@ -708,7 +708,7 @@ mod test {
fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
let optimizer = CommonSubexprEliminate {};
let optimized_plan = optimizer
.optimize(plan, &OptimizerConfig::new())
.optimize(plan, &mut OptimizerConfig::new())
.expect("failed to optimize plan");
let formatted_plan = format!("{:?}", optimized_plan);
assert_eq!(formatted_plan, expected);
Expand Down
4 changes: 2 additions & 2 deletions datafusion/optimizer/src/eliminate_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ impl OptimizerRule for EliminateFilter {
fn optimize(
&self,
plan: &LogicalPlan,
optimizer_config: &OptimizerConfig,
optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
match plan {
LogicalPlan::Filter(Filter {
Expand Down Expand Up @@ -84,7 +84,7 @@ mod tests {
fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
let rule = EliminateFilter::new();
let optimized_plan = rule
.optimize(plan, &OptimizerConfig::new())
.optimize(plan, &mut OptimizerConfig::new())
.expect("failed to optimize plan");
let formatted_plan = format!("{:?}", optimized_plan);
assert_eq!(formatted_plan, expected);
Expand Down
4 changes: 2 additions & 2 deletions datafusion/optimizer/src/eliminate_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ impl OptimizerRule for EliminateLimit {
fn optimize(
&self,
plan: &LogicalPlan,
optimizer_config: &OptimizerConfig,
optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
eliminate_limit(self, &Ancestor::NotRelevant, plan, optimizer_config)
}
Expand All @@ -152,7 +152,7 @@ mod tests {
fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
let rule = EliminateLimit::new();
let optimized_plan = rule
.optimize(plan, &OptimizerConfig::new())
.optimize(plan, &mut OptimizerConfig::new())
.expect("failed to optimize plan");
let formatted_plan = format!("{:?}", optimized_plan);
assert_eq!(formatted_plan, expected);
Expand Down
4 changes: 2 additions & 2 deletions datafusion/optimizer/src/filter_null_join_keys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl OptimizerRule for FilterNullJoinKeys {
fn optimize(
&self,
plan: &LogicalPlan,
optimizer_config: &OptimizerConfig,
optimizer_config: &mut OptimizerConfig,
) -> datafusion_common::Result<LogicalPlan> {
match plan {
LogicalPlan::Join(join) if join.join_type == JoinType::Inner => {
Expand Down Expand Up @@ -145,7 +145,7 @@ mod tests {

fn optimize_plan(plan: &LogicalPlan) -> LogicalPlan {
let rule = FilterNullJoinKeys::default();
rule.optimize(plan, &OptimizerConfig::new())
rule.optimize(plan, &mut OptimizerConfig::new())
.expect("failed to optimize plan")
}

Expand Down
8 changes: 6 additions & 2 deletions datafusion/optimizer/src/filter_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -616,7 +616,11 @@ impl OptimizerRule for FilterPushDown {
"filter_push_down"
}

fn optimize(&self, plan: &LogicalPlan, _: &OptimizerConfig) -> Result<LogicalPlan> {
fn optimize(
&self,
plan: &LogicalPlan,
_: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
optimize(plan, State::default())
}
}
Expand Down Expand Up @@ -666,7 +670,7 @@ mod tests {

fn optimize_plan(plan: &LogicalPlan) -> LogicalPlan {
let rule = FilterPushDown::new();
rule.optimize(plan, &OptimizerConfig::new())
rule.optimize(plan, &mut OptimizerConfig::new())
.expect("failed to optimize plan")
}

Expand Down
4 changes: 2 additions & 2 deletions datafusion/optimizer/src/limit_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ impl OptimizerRule for LimitPushDown {
fn optimize(
&self,
plan: &LogicalPlan,
optimizer_config: &OptimizerConfig,
optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
limit_push_down(self, Ancestor::NotRelevant, plan, optimizer_config)
}
Expand All @@ -358,7 +358,7 @@ mod test {
fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
let rule = LimitPushDown::new();
let optimized_plan = rule
.optimize(plan, &OptimizerConfig::new())
.optimize(plan, &mut OptimizerConfig::new())
.expect("failed to optimize plan");
let formatted_plan = format!("{:?}", optimized_plan);
assert_eq!(formatted_plan, expected);
Expand Down
11 changes: 9 additions & 2 deletions datafusion/optimizer/src/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub trait OptimizerRule {
fn optimize(
&self,
plan: &LogicalPlan,
optimizer_config: &OptimizerConfig,
optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan>;

/// A human readable name for this optimizer rule
Expand All @@ -44,15 +44,22 @@ pub struct OptimizerConfig {
/// Query execution start time that can be used to rewrite expressions such as `now()`
/// to use a literal value instead
pub query_execution_start_time: DateTime<Utc>,
next_id: usize,
}

impl OptimizerConfig {
/// Create optimizer config
pub fn new() -> Self {
Self {
query_execution_start_time: chrono::Utc::now(),
next_id: 0, // useful for generating things like unique subquery aliases
}
}

pub fn next_id(&mut self) -> usize {
self.next_id += 1;
self.next_id
}
}

impl Default for OptimizerConfig {
Expand Down Expand Up @@ -80,7 +87,7 @@ impl Optimizer {
pub fn optimize<F>(
&self,
plan: &LogicalPlan,
optimizer_config: &OptimizerConfig,
optimizer_config: &mut OptimizerConfig,
mut observer: F,
) -> Result<LogicalPlan>
where
Expand Down
4 changes: 2 additions & 2 deletions datafusion/optimizer/src/projection_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ impl OptimizerRule for ProjectionPushDown {
fn optimize(
&self,
plan: &LogicalPlan,
optimizer_config: &OptimizerConfig,
optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
// set of all columns refered by the plan (and thus considered required by the root)
let required_columns = plan
Expand Down Expand Up @@ -1011,6 +1011,6 @@ mod tests {

fn optimize(plan: &LogicalPlan) -> Result<LogicalPlan> {
let rule = ProjectionPushDown::new();
rule.optimize(plan, &OptimizerConfig::new())
rule.optimize(plan, &mut OptimizerConfig::new())
}
}
4 changes: 2 additions & 2 deletions datafusion/optimizer/src/reduce_outer_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ impl OptimizerRule for ReduceOuterJoin {
fn optimize(
&self,
plan: &LogicalPlan,
optimizer_config: &OptimizerConfig,
optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
let mut nonnullable_cols: Vec<Column> = vec![];

Expand Down Expand Up @@ -367,7 +367,7 @@ mod tests {
fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
let rule = ReduceOuterJoin::new();
let optimized_plan = rule
.optimize(plan, &OptimizerConfig::new())
.optimize(plan, &mut OptimizerConfig::new())
.expect("failed to optimize plan");
let formatted_plan = format!("{:?}", optimized_plan);
assert_eq!(formatted_plan, expected);
Expand Down
8 changes: 4 additions & 4 deletions datafusion/optimizer/src/simplify_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ impl OptimizerRule for SimplifyExpressions {
fn optimize(
&self,
plan: &LogicalPlan,
optimizer_config: &OptimizerConfig,
optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
let mut execution_props = ExecutionProps::new();
execution_props.query_execution_start_time =
Expand Down Expand Up @@ -1545,7 +1545,7 @@ mod tests {
fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
let rule = SimplifyExpressions::new();
let optimized_plan = rule
.optimize(plan, &OptimizerConfig::new())
.optimize(plan, &mut OptimizerConfig::new())
.expect("failed to optimize plan");
let formatted_plan = format!("{:?}", optimized_plan);
assert_eq!(formatted_plan, expected);
Expand Down Expand Up @@ -1768,7 +1768,7 @@ mod tests {
let rule = SimplifyExpressions::new();

let err = rule
.optimize(plan, &config)
.optimize(plan, &mut config)
.expect_err("expected optimization to fail");

err.to_string()
Expand All @@ -1783,7 +1783,7 @@ mod tests {
let rule = SimplifyExpressions::new();

let optimized_plan = rule
.optimize(plan, &config)
.optimize(plan, &mut config)
.expect("failed to optimize plan");
return format!("{:?}", optimized_plan);
}
Expand Down
4 changes: 2 additions & 2 deletions datafusion/optimizer/src/single_distinct_to_groupby.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ impl OptimizerRule for SingleDistinctToGroupBy {
fn optimize(
&self,
plan: &LogicalPlan,
_optimizer_config: &OptimizerConfig,
_optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
optimize(plan)
}
Expand All @@ -221,7 +221,7 @@ mod tests {
fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
let rule = SingleDistinctToGroupBy::new();
let optimized_plan = rule
.optimize(plan, &OptimizerConfig::new())
.optimize(plan, &mut OptimizerConfig::new())
.expect("failed to optimize plan");

let formatted_plan = format!("{}", optimized_plan.display_indent_schema());
Expand Down
4 changes: 2 additions & 2 deletions datafusion/optimizer/src/subquery_filter_to_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ impl OptimizerRule for SubqueryFilterToJoin {
fn optimize(
&self,
plan: &LogicalPlan,
optimizer_config: &OptimizerConfig,
optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
match plan {
LogicalPlan::Filter(Filter { predicate, input }) => {
Expand Down Expand Up @@ -207,7 +207,7 @@ mod tests {
fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
let rule = SubqueryFilterToJoin::new();
let optimized_plan = rule
.optimize(plan, &OptimizerConfig::new())
.optimize(plan, &mut OptimizerConfig::new())
.expect("failed to optimize plan");
let formatted_plan = format!("{}", optimized_plan.display_indent_schema());
assert_eq!(formatted_plan, expected);
Expand Down
2 changes: 1 addition & 1 deletion datafusion/optimizer/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use std::sync::Arc;
pub fn optimize_children(
optimizer: &impl OptimizerRule,
plan: &LogicalPlan,
optimizer_config: &OptimizerConfig,
optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
let new_exprs = plan.expressions();
let new_inputs = plan
Expand Down

0 comments on commit 9442f07

Please sign in to comment.