Skip to content

Commit

Permalink
refactor(optimizer): move fd derive into core (risingwavelabs#8540)
Browse files Browse the repository at this point in the history
  • Loading branch information
st1page authored Mar 14, 2023
1 parent 305c864 commit d49a4c5
Show file tree
Hide file tree
Showing 28 changed files with 397 additions and 347 deletions.
55 changes: 36 additions & 19 deletions src/frontend/src/optimizer/plan_node/generic/agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use super::super::utils::TableCatalogBuilder;
use super::{stream, GenericPlanNode, GenericPlanRef};
use crate::expr::{Expr, ExprRewriter, InputRef, InputRefDisplay};
use crate::optimizer::optimizer_context::OptimizerContextRef;
use crate::optimizer::property::FunctionalDependencySet;
use crate::stream_fragmenter::BuildFragmentGraphState;
use crate::utils::{
ColIndexMapping, ColIndexMappingRewriteExt, Condition, ConditionDisplay, IndexRewriter,
Expand All @@ -46,12 +47,31 @@ pub struct Agg<PlanRef> {
pub input: PlanRef,
}

impl<PlanRef> Agg<PlanRef> {
impl<PlanRef: GenericPlanRef> Agg<PlanRef> {
/// Rewrites the `filter` condition of every aggregate call with the given rewriter.
///
/// Each filter is cloned, passed through `rewrite_expr`, and the result is stored back in
/// place of the original.
pub(crate) fn rewrite_exprs(&mut self, r: &mut dyn ExprRewriter) {
    for call in &mut self.agg_calls {
        let rewritten = call.filter.clone().rewrite_expr(r);
        call.filter = rewritten;
    }
}

/// Number of output columns, i.e. the number of group keys plus the number of aggregate
/// calls.
fn output_len(&self) -> usize {
    let group_key_cnt = self.group_key.len();
    let agg_call_cnt = self.agg_calls.len();
    group_key_cnt + agg_call_cnt
}

/// Builds the mapping from output column index to input column index.
///
/// Output column `i` (for `i < group_key.len()`) maps to the input column `group_key[i]`;
/// the remaining output columns, one per aggregate call, are left unmapped. The target size
/// of the mapping is the number of input columns.
pub fn o2i_col_mapping(&self) -> ColIndexMapping {
    // Group keys occupy the leading output positions; pad the rest with `None`.
    let mut map: Vec<_> = self.group_key.iter().copied().map(Some).collect();
    map.resize(self.output_len(), None);
    ColIndexMapping::with_target_size(map, self.input.schema().len())
}

/// Returns the mapping from input column index to output column index, obtained by
/// inverting the output-to-input mapping built by `o2i_col_mapping`.
pub fn i2o_col_mapping(&self) -> ColIndexMapping {
self.o2i_col_mapping().inverse()
}
}

impl<PlanRef: GenericPlanRef> GenericPlanNode for Agg<PlanRef> {
Expand Down Expand Up @@ -80,6 +100,21 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for Agg<PlanRef> {
/// Returns the optimizer context, taken from the input plan.
fn ctx(&self) -> OptimizerContextRef {
self.input.ctx()
}

/// Derives the functional dependencies of the aggregation's output.
///
/// The group-key columns (output indices `0..group_key.len()`) are registered as a key of
/// the output. Each functional dependency of the input is then rewritten through the
/// input-to-output column mapping and added to the set when the rewrite yields a result.
fn functional_dependency(&self) -> FunctionalDependencySet {
    let output_len = self.output_len();
    let mut fd_set =
        FunctionalDependencySet::with_key(output_len, &(0..self.group_key.len()).collect_vec());
    // Translate input dependencies into output column indices; dependencies for which the
    // rewrite returns `None` are skipped.
    let i2o = self.i2o_col_mapping();
    for fd in self.input.functional_dependency().as_dependencies() {
        if let Some(fd) = i2o.rewrite_functional_dependency(fd) {
            fd_set.add_functional_dependency(fd);
        }
    }
    fd_set
}
}

pub enum AggCallState {
Expand Down Expand Up @@ -318,24 +353,6 @@ impl<PlanRef: stream::StreamPlanRef> Agg<PlanRef> {
.collect()
}

/// Builds the mapping from output column index to input column index: output column `i`
/// (for `i < group_key.len()`) maps to input column `group_key[i]`, while the output columns
/// of the aggregate calls are left unmapped. The target size is the number of input columns.
pub fn o2i_col_mapping(&self) -> ColIndexMapping {
let input_len = self.input.schema().len();
let agg_cal_num = self.agg_calls.len();
let group_key = &self.group_key;
let mut map = vec![None; agg_cal_num + group_key.len()];
for (i, key) in group_key.iter().enumerate() {
map[i] = Some(*key);
}
ColIndexMapping::with_target_size(map, input_len)
}

/// Returns the mapping from input column index to output column index, obtained by
/// inverting the output-to-input mapping built by `o2i_col_mapping`.
pub fn i2o_col_mapping(&self) -> ColIndexMapping {
self.o2i_col_mapping().inverse()
}

pub fn infer_result_table(
&self,
me: &impl GenericPlanRef,
Expand Down
49 changes: 49 additions & 0 deletions src/frontend/src/optimizer/plan_node/generic/expand.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
use itertools::Itertools;
use risingwave_common::catalog::{Field, FieldDisplay, Schema};
use risingwave_common::types::DataType;
use risingwave_common::util::column_index_mapping::ColIndexMapping;

use super::{GenericPlanNode, GenericPlanRef};
use crate::optimizer::optimizer_context::OptimizerContextRef;
use crate::optimizer::property::FunctionalDependencySet;

/// [`Expand`] expands one row multiple times according to `column_subsets` and also keeps the
/// original columns of the input. It can be used to implement distinct aggregation and
/// grouping sets.
Expand All @@ -35,6 +37,16 @@ pub struct Expand<PlanRef> {
pub input: PlanRef,
}

impl<PlanRef: GenericPlanRef> Expand<PlanRef> {
    /// Number of output columns: twice the number of input columns, plus one.
    fn output_len(&self) -> usize {
        let input_len = self.input.schema().len();
        input_len * 2 + 1
    }

    /// Index of the flag column, computed as the index of the last output column.
    fn flag_index(&self) -> usize {
        let output_len = self.output_len();
        output_len - 1
    }
}

impl<PlanRef: GenericPlanRef> GenericPlanNode for Expand<PlanRef> {
fn schema(&self) -> Schema {
let mut fields = self.input.schema().clone().into_fields();
Expand All @@ -59,6 +71,31 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for Expand<PlanRef> {
/// Returns the optimizer context, taken from the input plan.
fn ctx(&self) -> OptimizerContextRef {
self.input.ctx()
}

/// Derives the functional dependencies of the expanded output.
///
/// Every functional dependency of the input is grown to the output width and has the flag
/// column set on its `from` side (`set_from(flag_index, true)`) before it is added to the
/// resulting set.
fn functional_dependency(&self) -> FunctionalDependencySet {
    let output_len = self.output_len();
    let flag_index = self.flag_index();
    let input_fds = self
        .input
        .functional_dependency()
        .clone()
        .into_dependencies();

    let mut fd_set = FunctionalDependencySet::new(output_len);
    for mut fd in input_fds {
        fd.grow(output_len);
        fd.set_from(flag_index, true);
        fd_set.add_functional_dependency(fd);
    }
    fd_set
}
}

impl<PlanRef: GenericPlanRef> Expand<PlanRef> {
Expand All @@ -73,4 +110,16 @@ impl<PlanRef: GenericPlanRef> Expand<PlanRef> {
})
.collect_vec()
}

/// Builds the mapping from input column index to output column index: input column `i`
/// maps to output column `i + input_len`, and the target size is the output width.
pub fn i2o_col_mapping(&self) -> ColIndexMapping {
    let input_cnt = self.input.schema().len();
    let shifted = (0..input_cnt)
        .map(|idx| idx + input_cnt)
        .map(Some)
        .collect_vec();
    ColIndexMapping::with_target_size(shifted, self.output_len())
}

/// Returns the mapping from output column index to input column index, obtained by
/// inverting the input-to-output mapping built by `i2o_col_mapping`.
pub fn o2i_col_mapping(&self) -> ColIndexMapping {
self.i2o_col_mapping().inverse()
}
}
17 changes: 17 additions & 0 deletions src/frontend/src/optimizer/plan_node/generic/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use risingwave_common::catalog::Schema;
use super::{GenericPlanNode, GenericPlanRef};
use crate::expr::ExprRewriter;
use crate::optimizer::optimizer_context::OptimizerContextRef;
use crate::optimizer::property::FunctionalDependencySet;
use crate::utils::Condition;

/// [`Filter`] iterates over its input and returns elements for which `predicate` evaluates to
Expand All @@ -29,6 +30,7 @@ pub struct Filter<PlanRef> {
pub input: PlanRef,
}

// NOTE(review): this inherent impl is empty and adds no items; candidate for removal.
impl<PlanRef: GenericPlanRef> Filter<PlanRef> {}
impl<PlanRef: GenericPlanRef> GenericPlanNode for Filter<PlanRef> {
fn schema(&self) -> Schema {
self.input.schema().clone()
Expand All @@ -41,6 +43,21 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for Filter<PlanRef> {
/// Returns the optimizer context, taken from the input plan.
fn ctx(&self) -> OptimizerContextRef {
self.input.ctx()
}

/// Derives the functional dependencies of the filter's output.
///
/// Starts from a copy of the input's dependencies and refines it with each conjunction of
/// the predicate: a conjunction recognised by `as_eq_const` registers its column as a
/// constant column; otherwise, a conjunction recognised by `as_eq_cond` adds a dependency
/// in both directions between its two columns. Other conjunctions contribute nothing.
fn functional_dependency(&self) -> FunctionalDependencySet {
    let mut fd_set = self.input.functional_dependency().clone();
    for conjunction in &self.predicate.conjunctions {
        if let Some((col, _)) = conjunction.as_eq_const() {
            fd_set.add_constant_columns(&[col.index()]);
            continue;
        }
        if let Some((lhs, rhs)) = conjunction.as_eq_cond() {
            let (lhs_idx, rhs_idx) = (lhs.index(), rhs.index());
            fd_set.add_functional_dependency_by_column_indices(&[lhs_idx], &[rhs_idx]);
            fd_set.add_functional_dependency_by_column_indices(&[rhs_idx], &[lhs_idx]);
        }
    }
    fd_set
}
}

impl<PlanRef> Filter<PlanRef> {
Expand Down
34 changes: 24 additions & 10 deletions src/frontend/src/optimizer/plan_node/generic/hop_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ use super::super::utils::IndicesDisplay;
use super::{GenericPlanNode, GenericPlanRef};
use crate::expr::{ExprImpl, ExprType, FunctionCall, InputRef, InputRefDisplay, Literal};
use crate::optimizer::optimizer_context::OptimizerContextRef;
use crate::optimizer::property::FunctionalDependencySet;
use crate::utils::ColIndexMappingRewriteExt;

/// [`HopWindow`] implements Hop Table Function.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
Expand Down Expand Up @@ -95,6 +97,24 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for HopWindow<PlanRef> {
/// Returns the optimizer context, taken from the input plan.
fn ctx(&self) -> OptimizerContextRef {
self.input.ctx()
}

fn functional_dependency(&self) -> FunctionalDependencySet {
let mut fd_set = self
.i2o_col_mapping()
.rewrite_functional_dependency_set(self.input.functional_dependency().clone());
let (start_idx_in_output, end_idx_in_output) = {
let internal2output = self.internal2output_col_mapping();
(
internal2output.try_map(self.internal_window_start_col_idx()),
internal2output.try_map(self.internal_window_end_col_idx()),
)
};
if let Some(start_idx) = start_idx_in_output && let Some(end_idx) = end_idx_in_output {
fd_set.add_functional_dependency_by_column_indices(&[start_idx], &[end_idx]);
fd_set.add_functional_dependency_by_column_indices(&[end_idx], &[start_idx]);
}
fd_set
}
}

impl<PlanRef: GenericPlanRef> HopWindow<PlanRef> {
Expand All @@ -113,7 +133,7 @@ impl<PlanRef: GenericPlanRef> HopWindow<PlanRef> {
}

pub fn internal_window_end_col_idx(&self) -> usize {
self.internal_window_start_col_idx() + 1
self.input.schema().len() + 1
}

pub fn o2i_col_mapping(&self) -> ColIndexMapping {
Expand All @@ -127,7 +147,7 @@ impl<PlanRef: GenericPlanRef> HopWindow<PlanRef> {
}

pub fn internal_column_num(&self) -> usize {
self.internal_window_start_col_idx() + 2
self.input.schema().len() + 2
}

pub fn output2internal_col_mapping(&self) -> ColIndexMapping {
Expand All @@ -139,17 +159,11 @@ impl<PlanRef: GenericPlanRef> HopWindow<PlanRef> {
}

pub fn input2internal_col_mapping(&self) -> ColIndexMapping {
ColIndexMapping::identity_or_none(
self.internal_window_start_col_idx(),
self.internal_column_num(),
)
ColIndexMapping::identity_or_none(self.input.schema().len(), self.internal_column_num())
}

pub fn internal2input_col_mapping(&self) -> ColIndexMapping {
ColIndexMapping::identity_or_none(
self.internal_column_num(),
self.internal_window_start_col_idx(),
)
ColIndexMapping::identity_or_none(self.internal_column_num(), self.input.schema().len())
}

pub fn derive_window_start_and_end_exprs(&self) -> Result<(Vec<ExprImpl>, Vec<ExprImpl>)> {
Expand Down
59 changes: 58 additions & 1 deletion src/frontend/src/optimizer/plan_node/generic/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ use risingwave_pb::plan_common::JoinType;
use super::{EqJoinPredicate, GenericPlanNode, GenericPlanRef};
use crate::expr::ExprRewriter;
use crate::optimizer::optimizer_context::OptimizerContextRef;
use crate::utils::{ColIndexMapping, Condition};
use crate::optimizer::property::FunctionalDependencySet;
use crate::utils::{ColIndexMapping, ColIndexMappingRewriteExt, Condition};

/// [`Join`] combines two relations according to some condition.
///
Expand Down Expand Up @@ -141,6 +142,62 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for Join<PlanRef> {
/// Returns the optimizer context, taken from the left input plan.
fn ctx(&self) -> OptimizerContextRef {
self.left.ctx()
}

/// Derives the functional dependencies of the join's output.
///
/// Dependencies are first computed over the internal columns (`internal_column_num()` wide)
/// according to the join type, and are finally rewritten through the mapping built from
/// `output_indices`:
///
/// * `Inner`: dependencies implied by the `on` conjunctions (constant columns from
///   `as_eq_const`, bidirectional dependencies from `as_eq_cond`), followed by the rewritten
///   dependencies of the left and then the right input.
/// * `LeftOuter` / `RightOuter`: only the rewritten dependencies of the left / right input.
/// * `FullOuter`: an empty set.
/// * `LeftSemi` / `LeftAnti` and `RightSemi` / `RightAnti`: the left / right input's set as is.
fn functional_dependency(&self) -> FunctionalDependencySet {
    let left_len = self.left.schema().len();
    let right_len = self.right.schema().len();
    let left_fds = self.left.functional_dependency().clone();
    let right_fds = self.right.functional_dependency().clone();

    let internal_len = self.internal_column_num();

    // Left columns keep their indices; the mapping is widened to the internal column count.
    let lift_left = |fds: FunctionalDependencySet| {
        ColIndexMapping::with_shift_offset(left_len, 0)
            .composite(&ColIndexMapping::identity(internal_len))
            .rewrite_functional_dependency_set(fds)
    };
    // Right columns are shifted by the number of left columns.
    let lift_right = |fds: FunctionalDependencySet| {
        ColIndexMapping::with_shift_offset(right_len, left_len.try_into().unwrap())
            .rewrite_functional_dependency_set(fds)
    };

    let internal_fds = match self.join_type {
        JoinType::Inner => {
            let mut merged = FunctionalDependencySet::new(internal_len);
            for conjunction in &self.on.conjunctions {
                if let Some((col, _)) = conjunction.as_eq_const() {
                    merged.add_constant_columns(&[col.index()]);
                } else if let Some((lhs, rhs)) = conjunction.as_eq_cond() {
                    let (lhs_idx, rhs_idx) = (lhs.index(), rhs.index());
                    merged.add_functional_dependency_by_column_indices(&[lhs_idx], &[rhs_idx]);
                    merged.add_functional_dependency_by_column_indices(&[rhs_idx], &[lhs_idx]);
                }
            }
            for fd in lift_left(left_fds).into_dependencies() {
                merged.add_functional_dependency(fd);
            }
            for fd in lift_right(right_fds).into_dependencies() {
                merged.add_functional_dependency(fd);
            }
            merged
        }
        JoinType::LeftOuter => lift_left(left_fds),
        JoinType::RightOuter => lift_right(right_fds),
        JoinType::FullOuter => FunctionalDependencySet::new(internal_len),
        JoinType::LeftSemi | JoinType::LeftAnti => left_fds,
        JoinType::RightSemi | JoinType::RightAnti => right_fds,
        JoinType::Unspecified => unreachable!(),
    };

    ColIndexMapping::with_remaining_columns(&self.output_indices, internal_len)
        .rewrite_functional_dependency_set(internal_fds)
}
}

impl<PlanRef> Join<PlanRef> {
Expand Down
11 changes: 11 additions & 0 deletions src/frontend/src/optimizer/plan_node/generic/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use risingwave_common::catalog::Schema;

use super::{stream, EqJoinPredicate};
use crate::optimizer::optimizer_context::OptimizerContextRef;
use crate::optimizer::property::FunctionalDependencySet;

pub mod dynamic_filter;
pub use dynamic_filter::*;
Expand Down Expand Up @@ -47,10 +48,20 @@ pub use share::*;
/// Accessors that a plan reference must provide to be used as the input of a generic plan
/// node.
pub trait GenericPlanRef {
/// Returns a reference to the plan's [`Schema`].
fn schema(&self) -> &Schema;
/// Returns the plan's logical pk as a slice of `usize` (presumably column indices —
/// confirm against implementors).
fn logical_pk(&self) -> &[usize];
/// Returns a reference to the plan's [`FunctionalDependencySet`].
fn functional_dependency(&self) -> &FunctionalDependencySet;
/// Returns the optimizer context associated with the plan.
fn ctx(&self) -> OptimizerContextRef;
}

pub trait GenericPlanNode {
/// Returns the logical properties of this node as a
/// `(schema, logical_pk, functional dependencies)` tuple, computed in that order.
fn logical_properties(&self) -> (Schema, Option<Vec<usize>>, FunctionalDependencySet) {
    let schema = self.schema();
    let logical_pk = self.logical_pk();
    let fds = self.functional_dependency();
    (schema, logical_pk, fds)
}
fn functional_dependency(&self) -> FunctionalDependencySet;
fn schema(&self) -> Schema;
fn logical_pk(&self) -> Option<Vec<usize>>;
fn ctx(&self) -> OptimizerContextRef;
Expand Down
8 changes: 7 additions & 1 deletion src/frontend/src/optimizer/plan_node/generic/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ use risingwave_common::util::iter_util::ZipEqFast;
use super::{GenericPlanNode, GenericPlanRef};
use crate::expr::{assert_input_ref, Expr, ExprDisplay, ExprImpl, ExprRewriter, InputRef};
use crate::optimizer::optimizer_context::OptimizerContextRef;
use crate::utils::ColIndexMapping;
use crate::optimizer::property::FunctionalDependencySet;
use crate::utils::{ColIndexMapping, ColIndexMappingRewriteExt};

fn check_expr_type(expr: &ExprImpl) -> std::result::Result<(), &'static str> {
if expr.has_subquery() {
Expand Down Expand Up @@ -109,6 +110,11 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for Project<PlanRef> {
/// Returns the optimizer context, taken from the input plan.
fn ctx(&self) -> OptimizerContextRef {
self.input.ctx()
}

/// Derives the functional dependencies of the projection's output by rewriting a copy of
/// the input's dependencies through the input-to-output column mapping.
fn functional_dependency(&self) -> FunctionalDependencySet {
    let input_fds = self.input.functional_dependency().clone();
    self.i2o_col_mapping()
        .rewrite_functional_dependency_set(input_fds)
}
}

impl<PlanRef: GenericPlanRef> Project<PlanRef> {
Expand Down
Loading

0 comments on commit d49a4c5

Please sign in to comment.