Skip to content

Commit ff9938b

Browse files
committed
refine optimizer
1 parent ddc5067 commit ff9938b

35 files changed

+1024
-78
lines changed

.licenserc.yaml

+1
Original file line numberDiff line numberDiff line change
@@ -34,5 +34,6 @@ header:
3434
- "**/*.toml"
3535
- "**/*.lock"
3636
- "**/*.yapf"
37+
- "**/*.test"
3738

3839
comment: on-failure

Cargo.lock

+3-2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

query/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ walkdir = "2.3.2"
127127
[dev-dependencies]
128128
clickhouse-driver = { git = "https://github.com/datafuse-extras/clickhouse_driver", rev = "cf978da" }
129129
criterion = "0.3.5"
130+
goldenfile = "1.3.0"
130131
jwt-simple = "0.10.9"
131132
maplit = "1.0.2"
132133
mysql_async = "0.29.0"

query/src/servers/http/v1/query/execute_state.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ impl ExecuteState {
194194
executor: executor.clone(),
195195
block_buffer,
196196
});
197-
interpreter.execute(None).await.unwrap();
197+
interpreter.execute(None).await?;
198198

199199
Ok(executor)
200200
} else {

query/src/sql/exec/mod.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ use crate::sql::plans::AggregatePlan;
8282
use crate::sql::plans::AndExpr;
8383
use crate::sql::plans::CrossApply;
8484
use crate::sql::plans::EvalScalar;
85-
use crate::sql::plans::FilterPlan;
85+
use crate::sql::plans::Filter;
8686
use crate::sql::plans::LimitPlan;
8787
use crate::sql::plans::PhysicalHashJoin;
8888
use crate::sql::plans::PhysicalScan;
@@ -315,7 +315,7 @@ impl PipelineBuilder {
315315
fn build_filter(
316316
&mut self,
317317
ctx: Arc<QueryContext>,
318-
filter: &FilterPlan,
318+
filter: &Filter,
319319
input_schema: DataSchemaRef,
320320
pipeline: &mut NewPipeline,
321321
) -> Result<DataSchemaRef> {

query/src/sql/exec/util.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ pub fn check_physical(expression: &SExpr) -> bool {
2828
}
2929

3030
for child in expression.children() {
31-
if !child.plan().is_physical() {
31+
if !check_physical(child) {
3232
return false;
3333
}
3434
}

query/src/sql/optimizer/heuristic/mod.rs

+52-19
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,27 @@ mod implement;
1616
mod rule_list;
1717

1818
use common_exception::Result;
19+
use lazy_static::lazy_static;
1920

21+
use super::rule::RuleID;
2022
use crate::sql::optimizer::heuristic::implement::HeuristicImplementor;
21-
use crate::sql::optimizer::heuristic::rule_list::RuleList;
23+
pub use crate::sql::optimizer::heuristic::rule_list::RuleList;
2224
use crate::sql::optimizer::rule::TransformState;
2325
use crate::sql::optimizer::SExpr;
2426

27+
lazy_static! {
28+
pub static ref DEFAULT_REWRITE_RULES: Vec<RuleID> = vec![
29+
RuleID::EliminateFilter,
30+
RuleID::EliminateEvalScalar,
31+
RuleID::EliminateProject,
32+
RuleID::MergeFilter,
33+
RuleID::MergeEvalScalar,
34+
RuleID::MergeProject,
35+
RuleID::PushDownFilterEvalScalar,
36+
RuleID::PushDownFilterProject,
37+
];
38+
}
39+
2540
/// A heuristic query optimizer. It will apply specific transformation rules in order and
2641
/// implement the logical plans with default implementation rules.
2742
pub struct HeuristicOptimizer {
@@ -30,15 +45,16 @@ pub struct HeuristicOptimizer {
3045
}
3146

3247
impl HeuristicOptimizer {
33-
pub fn create() -> Result<Self> {
34-
Ok(HeuristicOptimizer {
35-
rules: RuleList::create(vec![])?,
48+
pub fn new(rules: RuleList) -> Self {
49+
HeuristicOptimizer {
50+
rules,
3651
implementor: HeuristicImplementor::new(),
37-
})
52+
}
3853
}
3954

4055
pub fn optimize(&mut self, expression: SExpr) -> Result<SExpr> {
41-
let result = self.optimize_expression(&expression)?;
56+
let optimized = self.optimize_expression(&expression)?;
57+
let result = self.implement_expression(&optimized)?;
4258
Ok(result)
4359
}
4460

@@ -53,24 +69,41 @@ impl HeuristicOptimizer {
5369
Ok(result)
5470
}
5571

56-
fn apply_transform_rules(&self, s_expr: &SExpr, rule_list: &RuleList) -> Result<SExpr> {
57-
let mut result = s_expr.clone();
72+
fn implement_expression(&self, s_expr: &SExpr) -> Result<SExpr> {
73+
let mut implemented_children = Vec::with_capacity(s_expr.arity());
74+
for expr in s_expr.children() {
75+
implemented_children.push(self.implement_expression(expr)?);
76+
}
77+
let implemented_expr = SExpr::create(s_expr.plan().clone(), implemented_children, None);
78+
// Implement expression with Implementor
79+
let mut state = TransformState::new();
80+
self.implementor.implement(&implemented_expr, &mut state)?;
81+
let result = if !state.results().is_empty() {
82+
state.results()[0].clone()
83+
} else {
84+
implemented_expr
85+
};
86+
Ok(result)
87+
}
5888

89+
// Returns the input expression itself if no rule produced a result
90+
fn apply_transform_rules(&self, s_expr: &SExpr, rule_list: &RuleList) -> Result<SExpr> {
91+
let mut s_expr = s_expr.clone();
5992
for rule in rule_list.iter() {
6093
let mut state = TransformState::new();
61-
rule.apply(&result, &mut state)?;
62-
if !state.results().is_empty() {
63-
result = state.results()[0].clone();
64-
}
65-
}
94+
if s_expr.match_pattern(rule.pattern()) && !s_expr.applied_rule(&rule.id()) {
95+
rule.apply(&s_expr, &mut state)?;
96+
s_expr.apply_rule(&rule.id());
97+
if !state.results().is_empty() {
98+
// Recursively optimize the result
99+
let result = &state.results()[0];
100+
let optimized_result = self.optimize_expression(result)?;
66101

67-
// Implement expression with Implementor
68-
let mut state = TransformState::new();
69-
self.implementor.implement(s_expr, &mut state)?;
70-
if !state.results().is_empty() {
71-
result = state.results()[0].clone();
102+
return Ok(optimized_result);
103+
}
104+
}
72105
}
73106

74-
Ok(result)
107+
Ok(s_expr.clone())
75108
}
76109
}

query/src/sql/optimizer/mod.rs

+9-4
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ mod s_expr;
2525

2626
use common_exception::Result;
2727
pub use heuristic::HeuristicOptimizer;
28+
pub use heuristic::DEFAULT_REWRITE_RULES;
2829
pub use m_expr::MExpr;
2930
pub use memo::Memo;
3031
pub use optimize_context::OptimizeContext;
@@ -34,11 +35,14 @@ pub use property::PhysicalProperty;
3435
pub use property::RelExpr;
3536
pub use property::RelationalProperty;
3637
pub use property::RequiredProperty;
38+
pub use rule::RuleFactory;
3739
pub use s_expr::SExpr;
3840

3941
use super::plans::Plan;
40-
use crate::sql::optimizer::rule::RuleID;
42+
pub use crate::sql::optimizer::heuristic::RuleList;
43+
pub use crate::sql::optimizer::rule::RuleID;
4144
use crate::sql::optimizer::rule::RuleSet;
45+
use crate::sql::MetadataRef;
4246

4347
pub fn optimize(plan: Plan) -> Result<Plan> {
4448
match plan {
@@ -47,7 +51,7 @@ pub fn optimize(plan: Plan) -> Result<Plan> {
4751
bind_context,
4852
metadata,
4953
} => Ok(Plan::Query {
50-
s_expr: optimize_query(s_expr)?,
54+
s_expr: optimize_query(s_expr, metadata.clone())?,
5155
bind_context,
5256
metadata,
5357
}),
@@ -74,8 +78,9 @@ pub fn optimize(plan: Plan) -> Result<Plan> {
7478
}
7579
}
7680

77-
pub fn optimize_query(expression: SExpr) -> Result<SExpr> {
78-
let mut heuristic = HeuristicOptimizer::create()?;
81+
pub fn optimize_query(expression: SExpr, _metadata: MetadataRef) -> Result<SExpr> {
82+
let rules = RuleList::create(DEFAULT_REWRITE_RULES.clone())?;
83+
let mut heuristic = HeuristicOptimizer::new(rules);
7984
let s_expr = heuristic.optimize(expression)?;
8085
// TODO: enable cascades optimizer
8186
// let mut cascades = CascadesOptimizer::create(ctx);

query/src/sql/optimizer/rule/factory.rs

+18-2
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,14 @@
1414

1515
use common_exception::Result;
1616

17+
use super::rewrite::RuleEliminateEvalScalar;
18+
use super::rewrite::RulePushDownFilterEvalScalar;
19+
use super::rewrite::RulePushDownFilterProject;
20+
use crate::sql::optimizer::rule::rewrite::RuleEliminateFilter;
21+
use crate::sql::optimizer::rule::rewrite::RuleEliminateProject;
22+
use crate::sql::optimizer::rule::rewrite::RuleMergeEvalScalar;
23+
use crate::sql::optimizer::rule::rewrite::RuleMergeFilter;
24+
use crate::sql::optimizer::rule::rewrite::RuleMergeProject;
1725
use crate::sql::optimizer::rule::rule_implement_get::RuleImplementGet;
1826
use crate::sql::optimizer::rule::rule_implement_hash_join::RuleImplementHashJoin;
1927
use crate::sql::optimizer::rule::RuleID;
@@ -28,8 +36,16 @@ impl RuleFactory {
2836

2937
pub fn create_rule(&self, id: RuleID) -> Result<RulePtr> {
3038
match id {
31-
RuleID::ImplementGet => Ok(Box::new(RuleImplementGet::create())),
32-
RuleID::ImplementHashJoin => Ok(Box::new(RuleImplementHashJoin::create())),
39+
RuleID::ImplementGet => Ok(Box::new(RuleImplementGet::new())),
40+
RuleID::ImplementHashJoin => Ok(Box::new(RuleImplementHashJoin::new())),
41+
RuleID::EliminateEvalScalar => Ok(Box::new(RuleEliminateEvalScalar::new())),
42+
RuleID::PushDownFilterProject => Ok(Box::new(RulePushDownFilterProject::new())),
43+
RuleID::PushDownFilterEvalScalar => Ok(Box::new(RulePushDownFilterEvalScalar::new())),
44+
RuleID::EliminateFilter => Ok(Box::new(RuleEliminateFilter::new())),
45+
RuleID::EliminateProject => Ok(Box::new(RuleEliminateProject::new())),
46+
RuleID::MergeProject => Ok(Box::new(RuleMergeProject::new())),
47+
RuleID::MergeEvalScalar => Ok(Box::new(RuleMergeEvalScalar::new())),
48+
RuleID::MergeFilter => Ok(Box::new(RuleMergeFilter::new())),
3349
}
3450
}
3551
}

query/src/sql/optimizer/rule/mod.rs

+28-16
Original file line numberDiff line numberDiff line change
@@ -12,17 +12,22 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::fmt::Display;
16+
use std::fmt::Formatter;
17+
1518
use common_exception::Result;
1619

1720
use crate::sql::optimizer::SExpr;
1821

1922
mod factory;
23+
mod rewrite;
2024
mod rule_implement_get;
2125
mod rule_implement_hash_join;
2226
mod rule_set;
2327
mod transform_state;
2428

2529
pub use factory::RuleFactory;
30+
pub use rule_set::AppliedRules;
2631
pub use rule_set::RuleSet;
2732
pub use transform_state::TransformState;
2833

@@ -31,32 +36,39 @@ pub type RulePtr = Box<dyn Rule>;
3136
pub trait Rule {
3237
fn id(&self) -> RuleID;
3338

34-
fn apply(&self, expression: &SExpr, state: &mut TransformState) -> Result<()>;
39+
fn apply(&self, s_expr: &SExpr, state: &mut TransformState) -> Result<()>;
3540

3641
fn pattern(&self) -> &SExpr;
3742
}
3843

39-
#[derive(Copy, Clone, Eq, PartialEq, Hash)]
44+
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
4045
pub enum RuleID {
46+
PushDownFilterProject,
47+
PushDownFilterEvalScalar,
48+
EliminateEvalScalar,
49+
EliminateFilter,
50+
EliminateProject,
51+
MergeProject,
52+
MergeEvalScalar,
53+
MergeFilter,
54+
4155
ImplementGet,
4256
ImplementHashJoin,
4357
}
4458

45-
impl RuleID {
46-
pub fn name(&self) -> &'static str {
47-
match self {
48-
RuleID::ImplementGet => "ImplementGet",
49-
RuleID::ImplementHashJoin => "ImplementHashJoin",
50-
}
51-
}
52-
53-
/// Unique integral id
54-
/// TODO: maybe use a macro like https://docs.rs/iota/0.2.2/iota/ to implement this?
55-
#[allow(dead_code)]
56-
pub fn uid(&self) -> u32 {
59+
impl Display for RuleID {
60+
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
5761
match self {
58-
RuleID::ImplementGet => 0,
59-
RuleID::ImplementHashJoin => 1,
62+
RuleID::ImplementGet => write!(f, "ImplementGet"),
63+
RuleID::ImplementHashJoin => write!(f, "ImplementHashJoin"),
64+
RuleID::PushDownFilterProject => write!(f, "PushDownFilterProject"),
65+
RuleID::PushDownFilterEvalScalar => write!(f, "PushDownFilterEvalScalar"),
66+
RuleID::EliminateEvalScalar => write!(f, "EliminateEvalScalar"),
67+
RuleID::EliminateFilter => write!(f, "EliminateFilter"),
68+
RuleID::EliminateProject => write!(f, "EliminateProject"),
69+
RuleID::MergeProject => write!(f, "MergeProject"),
70+
RuleID::MergeEvalScalar => write!(f, "MergeEvalScalar"),
71+
RuleID::MergeFilter => write!(f, "MergeFilter"),
6072
}
6173
}
6274
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
// Copyright 2022 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
mod rule_eliminate_eval_scalar;
16+
mod rule_eliminate_filter;
17+
mod rule_eliminate_project;
18+
mod rule_merge_eval_scalar;
19+
mod rule_merge_filter;
20+
mod rule_merge_project;
21+
mod rule_push_down_filter_eval_scalar;
22+
mod rule_push_down_filter_project;
23+
24+
pub use rule_eliminate_eval_scalar::RuleEliminateEvalScalar;
25+
pub use rule_eliminate_filter::RuleEliminateFilter;
26+
pub use rule_eliminate_project::RuleEliminateProject;
27+
pub use rule_merge_eval_scalar::RuleMergeEvalScalar;
28+
pub use rule_merge_filter::RuleMergeFilter;
29+
pub use rule_merge_project::RuleMergeProject;
30+
pub use rule_push_down_filter_eval_scalar::RulePushDownFilterEvalScalar;
31+
pub use rule_push_down_filter_project::RulePushDownFilterProject;

0 commit comments

Comments
 (0)