Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: filter null value before join #16722

Merged
merged 8 commits into from
Nov 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 21 additions & 22 deletions src/query/sql/src/planner/optimizer/cascades/cascade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

use std::sync::Arc;

use databend_common_catalog::table_context::TableContext;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use log::debug;
Expand All @@ -30,56 +29,56 @@ use crate::optimizer::format::display_memo;
use crate::optimizer::memo::Memo;
use crate::optimizer::rule::TransformResult;
use crate::optimizer::Distribution;
use crate::optimizer::OptimizerContext;
use crate::optimizer::RequiredProperty;
use crate::optimizer::RuleSet;
use crate::optimizer::SExpr;
use crate::IndexType;
use crate::MetadataRef;

/// A cascades-style search engine to enumerate possible alternations of a relational expression and
/// find the optimal one.
pub struct CascadesOptimizer {
pub(crate) ctx: Arc<dyn TableContext>,
pub(crate) opt_ctx: OptimizerContext,
pub(crate) memo: Memo,
pub(crate) cost_model: Box<dyn CostModel>,
pub(crate) explore_rule_set: RuleSet,
pub(crate) metadata: MetadataRef,
pub(crate) enforce_distribution: bool,
}

impl CascadesOptimizer {
pub fn new(
ctx: Arc<dyn TableContext>,
metadata: MetadataRef,
mut optimized: bool,
enforce_distribution: bool,
) -> Result<Self> {
let explore_rule_set = if ctx.get_settings().get_enable_cbo()? {
if unsafe { ctx.get_settings().get_disable_join_reorder()? } {
pub fn new(opt_ctx: OptimizerContext, mut optimized: bool) -> Result<Self> {
let explore_rule_set = if opt_ctx.table_ctx.get_settings().get_enable_cbo()? {
if unsafe {
opt_ctx
.table_ctx
.get_settings()
.get_disable_join_reorder()?
} {
optimized = true;
}
get_explore_rule_set(optimized)
} else {
RuleSet::create()
};

let cluster_peers = ctx.get_cluster().nodes.len();
let dop = ctx.get_settings().get_max_threads()? as usize;
let cluster_peers = opt_ctx.table_ctx.get_cluster().nodes.len();
let dop = opt_ctx.table_ctx.get_settings().get_max_threads()? as usize;
let cost_model = Box::new(
DefaultCostModel::new(ctx.clone())?
DefaultCostModel::new(opt_ctx.table_ctx.clone())?
.with_cluster_peers(cluster_peers)
.with_degree_of_parallelism(dop),
);
Ok(CascadesOptimizer {
ctx,
opt_ctx,
memo: Memo::create(),
cost_model,
explore_rule_set,
metadata,
enforce_distribution,
})
}

/// Whether the optimizer should enforce distribution properties during search.
/// Reads the `enable_distributed_optimization` flag carried by the shared
/// `OptimizerContext` (replaces the former standalone `enforce_distribution` field).
pub(crate) fn enforce_distribution(&self) -> bool {
self.opt_ctx.enable_distributed_optimization
}

fn init(&mut self, expression: SExpr) -> Result<()> {
self.memo.init(expression)?;

Expand All @@ -97,7 +96,7 @@ impl CascadesOptimizer {
.ok_or_else(|| ErrorCode::Internal("Root group cannot be None after initialization"))?
.group_index;

let root_required_prop = if self.enforce_distribution {
let root_required_prop = if self.enforce_distribution() {
RequiredProperty {
distribution: Distribution::Serial,
}
Expand All @@ -106,13 +105,13 @@ impl CascadesOptimizer {
};

let root_task = OptimizeGroupTask::new(
self.ctx.clone(),
self.opt_ctx.table_ctx.clone(),
None,
root_index,
root_required_prop.clone(),
);

let task_limit = if self.ctx.get_settings().get_enable_cbo()? {
let task_limit = if self.opt_ctx.table_ctx.get_settings().get_enable_cbo()? {
DEFAULT_TASK_LIMIT
} else {
0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ impl ApplyRuleTask {
let group = optimizer.memo.group(self.target_group_index)?;
let m_expr = group.m_expr(self.m_expr_index)?;
let mut state = TransformResult::new();
let rule = RuleFactory::create_rule(self.rule_id, optimizer.metadata.clone())?;
let rule = RuleFactory::create_rule(self.rule_id, optimizer.opt_ctx.clone())?;
m_expr.apply_rule(&optimizer.memo, &rule, &mut state)?;
optimizer.insert_from_transform_state(self.target_group_index, state)?;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ impl OptimizeExprTask {
let should_enforce = {
let mut should_enforce = true;

if optimizer.enforce_distribution
if optimizer.enforce_distribution()
&& physical_prop.distribution == Distribution::Serial
&& !matches!(
self.required_prop.distribution,
Expand All @@ -367,7 +367,7 @@ impl OptimizeExprTask {
should_enforce = false;
}

if optimizer.enforce_distribution
if optimizer.enforce_distribution()
&& children_best_props
.iter()
.any(|prop| prop.distribution == Distribution::Serial)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ impl OptimizeGroupTask {
}

let rel_expr = RelExpr::with_m_expr(m_expr, &optimizer.memo);
let children_required_props = if optimizer.enforce_distribution {
let children_required_props = if optimizer.enforce_distribution() {
rel_expr.compute_required_prop_children(self.ctx.clone(), &self.required_prop)?
} else {
vec![vec![RequiredProperty::default(); m_expr.plan.arity()]]
Expand Down
71 changes: 37 additions & 34 deletions src/query/sql/src/planner/optimizer/hyper_dp/dphyp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use crate::optimizer::hyper_dp::query_graph::QueryGraph;
use crate::optimizer::hyper_dp::util::intersect;
use crate::optimizer::hyper_dp::util::union;
use crate::optimizer::rule::TransformResult;
use crate::optimizer::OptimizerContext;
use crate::optimizer::RuleFactory;
use crate::optimizer::RuleID;
use crate::optimizer::SExpr;
Expand All @@ -45,9 +46,7 @@ const RELATION_THRESHOLD: usize = 10;
// The join reorder algorithm follows the paper: Dynamic Programming Strikes Back
// See the paper for more details.
pub struct DPhpy {
ctx: Arc<dyn TableContext>,
sample_executor: Option<Arc<dyn QueryExecutor>>,
metadata: MetadataRef,
opt_ctx: OptimizerContext,
join_relations: Vec<JoinRelation>,
// base table index -> index of join_relations
table_index_map: HashMap<IndexType, IndexType>,
Expand All @@ -61,15 +60,9 @@ pub struct DPhpy {
}

impl DPhpy {
pub fn new(
ctx: Arc<dyn TableContext>,
metadata: MetadataRef,
sample_executor: Option<Arc<dyn QueryExecutor>>,
) -> Self {
pub fn new(opt_ctx: OptimizerContext) -> Self {
Self {
ctx,
sample_executor,
metadata,
opt_ctx,
join_relations: vec![],
table_index_map: Default::default(),
dp_table: Default::default(),
Expand All @@ -80,22 +73,30 @@ impl DPhpy {
}
}

/// Convenience accessor: hand back an owned, reference-counted handle to the
/// table context stored on the optimizer context.
fn table_ctx(&self) -> Arc<dyn TableContext> {
    let ctx = &self.opt_ctx.table_ctx;
    Arc::clone(ctx)
}

/// Convenience accessor: return a cloned handle to the metadata reference
/// stored on the optimizer context.
fn metadata(&self) -> MetadataRef {
    let meta = &self.opt_ctx.metadata;
    meta.clone()
}

/// Convenience accessor: return a cloned copy of the optional sample executor
/// held by the optimizer context (`None` when sampling is not configured).
fn sample_executor(&self) -> Option<Arc<dyn QueryExecutor>> {
    let exec = self.opt_ctx.sample_executor.as_ref();
    exec.cloned()
}

async fn new_children(&mut self, s_expr: &SExpr) -> Result<SExpr> {
// Parallel process children: start a new dphyp for each child.
let ctx = self.ctx.clone();
let metadata = self.metadata.clone();
let sample_executor = self.sample_executor.clone();
let left_expr = s_expr.children[0].clone();
let opt_ctx = self.opt_ctx.clone();
let left_res = spawn(async move {
let mut dphyp = DPhpy::new(ctx, metadata, sample_executor);
let mut dphyp = DPhpy::new(opt_ctx.clone());
(dphyp.optimize(&left_expr).await, dphyp.table_index_map)
});
let ctx = self.ctx.clone();
let metadata = self.metadata.clone();
let sample_executor = self.sample_executor.clone();
let right_expr = s_expr.children[1].clone();
let opt_ctx = self.opt_ctx.clone();
let right_res = spawn(async move {
let mut dphyp = DPhpy::new(ctx, metadata, sample_executor);
let mut dphyp = DPhpy::new(opt_ctx.clone());
(dphyp.optimize(&right_expr).await, dphyp.table_index_map)
});
let left_res = left_res
Expand Down Expand Up @@ -130,19 +131,17 @@ impl DPhpy {
) -> Result<(Arc<SExpr>, bool)> {
if is_subquery {
// If it's a subquery, start a new dphyp
let mut dphyp = DPhpy::new(
self.ctx.clone(),
self.metadata.clone(),
self.sample_executor.clone(),
);
let mut dphyp = DPhpy::new(self.opt_ctx.clone());
let (new_s_expr, _) = dphyp.optimize(s_expr).await?;
// Merge `table_index_map` of subquery into current `table_index_map`.
let relation_idx = self.join_relations.len() as IndexType;
for table_index in dphyp.table_index_map.keys() {
self.table_index_map.insert(*table_index, relation_idx);
}
self.join_relations
.push(JoinRelation::new(&new_s_expr, self.sample_executor.clone()));
self.join_relations.push(JoinRelation::new(
&new_s_expr,
self.sample_executor().clone(),
));
return Ok((new_s_expr, true));
}

Expand All @@ -152,9 +151,9 @@ impl DPhpy {
// Check if relation contains filter, if exists, check if the filter in `filters`
// If exists, remove it from `filters`
self.check_filter(relation);
JoinRelation::new(relation, self.sample_executor.clone())
JoinRelation::new(relation, self.sample_executor().clone())
} else {
JoinRelation::new(s_expr, self.sample_executor.clone())
JoinRelation::new(s_expr, self.sample_executor().clone())
};
self.table_index_map
.insert(op.table_index, self.join_relations.len() as IndexType);
Expand Down Expand Up @@ -217,8 +216,10 @@ impl DPhpy {
}
if !is_inner_join {
let new_s_expr = self.new_children(s_expr).await?;
self.join_relations
.push(JoinRelation::new(&new_s_expr, self.sample_executor.clone()));
self.join_relations.push(JoinRelation::new(
&new_s_expr,
self.sample_executor().clone(),
));
Ok((Arc::new(new_s_expr), true))
} else {
let left_res = self
Expand Down Expand Up @@ -278,8 +279,10 @@ impl DPhpy {
}
RelOperator::UnionAll(_) => {
let new_s_expr = self.new_children(s_expr).await?;
self.join_relations
.push(JoinRelation::new(&new_s_expr, self.sample_executor.clone()));
self.join_relations.push(JoinRelation::new(
&new_s_expr,
self.sample_executor().clone(),
));
Ok((Arc::new(new_s_expr), true))
}
RelOperator::Exchange(_) => {
Expand Down Expand Up @@ -383,7 +386,7 @@ impl DPhpy {
// Get nodes in `relation_set_tree`
let nodes = self.relation_set_tree.get_relation_set_by_index(idx)?;
let ce = relation
.cardinality(self.ctx.clone(), self.metadata.clone())
.cardinality(self.table_ctx().clone(), self.metadata().clone())
.await?;
let join = JoinNode {
join_type: JoinType::Inner,
Expand Down Expand Up @@ -822,7 +825,7 @@ impl DPhpy {

fn apply_rule(&self, s_expr: &SExpr) -> Result<SExpr> {
let mut s_expr = s_expr.clone();
let rule = RuleFactory::create_rule(RuleID::PushDownFilterJoin, self.metadata.clone())?;
let rule = RuleFactory::create_rule(RuleID::PushDownFilterJoin, self.opt_ctx.clone())?;
let mut state = TransformResult::new();
if rule
.matchers()
Expand Down
Loading
Loading