Skip to content

Commit

Permalink
Merge pull request #9633 from xudong963/update_stats
Browse files Browse the repository at this point in the history
feat: update statistics during query execution
  • Loading branch information
xudong963 authored Jan 17, 2023
2 parents 693a2f0 + 6047883 commit e80d713
Show file tree
Hide file tree
Showing 17 changed files with 292 additions and 156 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,8 @@ impl HashJoinState for JoinHashTable {
}

if input_block.is_empty() {
return Ok(vec![]);
let null_block = self.null_blocks_for_right_join(&unmatched_build_indexes)?;
return Ok(vec![null_block]);
}

let (bm, all_true, all_false) = self.get_other_filters(
Expand Down
7 changes: 7 additions & 0 deletions src/query/sql/src/planner/optimizer/property/column_stat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,10 @@ pub struct ColumnStat {
/// Histogram of column
pub histogram: Option<Histogram>,
}

/// Newly derived statistics for a join-key column, produced while
/// estimating join cardinality (see `update_statistic` in `join.rs`).
/// A `None` field means "no new value was derived; keep the old one".
#[derive(Debug, Clone)]
pub struct NewStatistic {
/// New minimum value of the column, if one was derived.
pub min: Option<Datum>,
/// New maximum value of the column, if one was derived.
pub max: Option<Datum>,
/// New number of distinct values (NDV), if one was derived.
pub ndv: Option<f64>,
}
25 changes: 25 additions & 0 deletions src/query/sql/src/planner/optimizer/property/histogram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,31 @@ impl UniformSampleSet {
(Ordering::Greater, _) | (_, Ordering::Less)
))
}

/// Computes the overlap of this sample set's `[min, max]` range with
/// `other`'s.
///
/// String and boolean columns are not treated as numeric intervals
/// here, so `(None, None)` is returned for them. For the remaining
/// (numeric) cases the result is the tighter bound on each side: the
/// larger of the two minimums and the smaller of the two maximums.
pub fn intersection(&self, other: &UniformSampleSet) -> Result<(Option<Datum>, Option<Datum>)> {
    if matches!(
        (&self.min, &other.min),
        (Datum::Bytes(_), Datum::Bytes(_)) | (Datum::Bool(_), Datum::Bool(_))
    ) {
        return Ok((None, None));
    }

    // Compare via f64 projections, but hand back the original `Datum`s.
    let (lo_self, hi_self) = (self.min.to_double()?, self.max.to_double()?);
    let (lo_other, hi_other) = (other.min.to_double()?, other.max.to_double()?);

    // Lower bound of the intersection: the larger minimum.
    let merged_min = if lo_self <= lo_other {
        other.min.clone()
    } else {
        self.min.clone()
    };
    // Upper bound of the intersection: the smaller maximum.
    let merged_max = if hi_self >= hi_other {
        other.max.clone()
    } else {
        self.max.clone()
    };
    Ok((Some(merged_min), Some(merged_max)))
}
}

impl SampleSet for UniformSampleSet {
Expand Down
1 change: 1 addition & 0 deletions src/query/sql/src/planner/optimizer/property/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ mod selectivity;
pub use builder::RelExpr;
pub use column_stat::ColumnStat;
pub use column_stat::ColumnStatSet;
pub use column_stat::NewStatistic;
pub use datum::Datum;
pub use enforcer::require_property;
pub use histogram::histogram_from_ndv;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ impl<'a> SelectivityEstimator<'a> {
let mut num_greater = 0.0;
for bucket in col_hist.buckets_iter() {
if let Ok(ord) = bucket.upper_bound().compare(&const_datum) {
if ord == Ordering::Less {
if ord == Ordering::Less || ord == Ordering::Equal {
num_greater += bucket.num_values();
} else {
break;
Expand Down
19 changes: 16 additions & 3 deletions src/query/sql/src/planner/plans/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;

use common_catalog::table_context::TableContext;
Expand Down Expand Up @@ -67,7 +68,7 @@ impl Operator for Filter {
}

fn derive_relational_prop(&self, rel_expr: &RelExpr) -> Result<RelationalProperty> {
let input_prop = rel_expr.derive_relational_prop_child(0)?;
let mut input_prop = rel_expr.derive_relational_prop_child(0)?;
let output_columns = input_prop.output_columns;

// Derive outer columns
Expand All @@ -90,11 +91,23 @@ impl Operator for Filter {
selectivity *= sb.compute_selectivity(pred);
}
let cardinality = input_prop.cardinality * selectivity;

// Derive used columns
let mut used_columns = self.used_columns()?;
used_columns.extend(input_prop.used_columns);

// Derive column statistics
let column_stats = if cardinality == 0.0 {
HashMap::new()
} else {
for (_, column_stat) in input_prop.statistics.column_stats.iter_mut() {
if cardinality < input_prop.cardinality {
column_stat.ndv =
(column_stat.ndv * cardinality / input_prop.cardinality).ceil();
}
}
input_prop.statistics.column_stats
};

Ok(RelationalProperty {
output_columns,
outer_columns,
Expand All @@ -104,7 +117,7 @@ impl Operator for Filter {
// precise cardinality
statistics: Statistics {
precise_cardinality: None,
column_stats: Default::default(),
column_stats,
is_accurate: false,
},
})
Expand Down
165 changes: 127 additions & 38 deletions src/query/sql/src/planner/plans/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::fmt::Display;
use std::fmt::Formatter;
use std::sync::Arc;
Expand All @@ -26,6 +27,7 @@ use crate::optimizer::Datum;
use crate::optimizer::Distribution;
use crate::optimizer::Histogram;
use crate::optimizer::InterleavedBucket;
use crate::optimizer::NewStatistic;
use crate::optimizer::PhysicalProperty;
use crate::optimizer::RelExpr;
use crate::optimizer::RelationalProperty;
Expand Down Expand Up @@ -168,8 +170,8 @@ impl Join {

fn inner_join_cardinality(
&self,
left_prop: &RelationalProperty,
right_prop: &RelationalProperty,
left_prop: &mut RelationalProperty,
right_prop: &mut RelationalProperty,
) -> Result<f64> {
let mut join_card = left_prop.cardinality * right_prop.cardinality;
for (left_condition, right_condition) in self
Expand All @@ -195,12 +197,7 @@ impl Join {
.get(right_condition.used_columns().iter().next().unwrap());
match (left_col_stat, right_col_stat) {
(Some(left_col_stat), Some(right_col_stat)) => {
if let Datum::Bytes(_) | Datum::Bool(_) = left_col_stat.min {
let card =
evaluate_by_ndv(left_col_stat, right_col_stat, left_prop, right_prop);
if card < join_card {
join_card = card;
}
if !left_col_stat.min.type_comparable(&right_col_stat.min) {
continue;
}
let left_interval =
Expand All @@ -209,25 +206,66 @@ impl Join {
right_col_stat.min.clone(),
right_col_stat.max.clone(),
);
if left_interval.has_intersection(&right_interval)? {
let card = match (&left_col_stat.histogram, &right_col_stat.histogram) {
(Some(left_hist), Some(right_hist)) => {
// Evaluate join cardinality by histogram.
evaluate_by_histogram(left_hist, right_hist)?
}
_ => evaluate_by_ndv(
left_col_stat,
right_col_stat,
left_prop,
right_prop,
),
};
if !left_interval.has_intersection(&right_interval)? {
join_card = 0.0;
continue;
}

// Update column min and max value
let mut new_ndv = None;
let (new_min, new_max) = left_interval.intersection(&right_interval)?;

if let Datum::Bytes(_) | Datum::Bool(_) = left_col_stat.min {
let card = evaluate_by_ndv(
left_col_stat,
right_col_stat,
left_prop,
right_prop,
&mut new_ndv,
);
if card < join_card {
join_card = card;
}
} else {
join_card = 0.0;
update_statistic(
left_prop,
right_prop,
left_condition,
right_condition,
NewStatistic {
min: new_min,
max: new_max,
ndv: new_ndv,
},
);
continue;
}
let card = match (&left_col_stat.histogram, &right_col_stat.histogram) {
(Some(left_hist), Some(right_hist)) => {
// Evaluate join cardinality by histogram.
evaluate_by_histogram(left_hist, right_hist, &mut new_ndv)?
}
_ => evaluate_by_ndv(
left_col_stat,
right_col_stat,
left_prop,
right_prop,
&mut new_ndv,
),
};
if card < join_card {
join_card = card;
}
update_statistic(
left_prop,
right_prop,
left_condition,
right_condition,
NewStatistic {
min: new_min,
max: new_max,
ndv: new_ndv,
},
);
}
_ => continue,
}
Expand All @@ -243,9 +281,8 @@ impl Operator for Join {
}

fn derive_relational_prop(&self, rel_expr: &RelExpr) -> Result<RelationalProperty> {
let left_prop = rel_expr.derive_relational_prop_child(0)?;
let right_prop = rel_expr.derive_relational_prop_child(1)?;

let mut left_prop = rel_expr.derive_relational_prop_child(0)?;
let mut right_prop = rel_expr.derive_relational_prop_child(1)?;
// Derive output columns
let mut output_columns = left_prop.output_columns.clone();
if let Some(mark_index) = self.marker_index {
Expand Down Expand Up @@ -273,21 +310,22 @@ impl Operator for Join {
}
outer_columns = outer_columns.difference(&output_columns).cloned().collect();

// Evaluating join cardinality using histograms.
// If histogram is None, will evaluate using NDV.
let inner_join_cardinality =
self.inner_join_cardinality(&mut left_prop, &mut right_prop)?;
let cardinality = match self.join_type {
JoinType::Inner | JoinType::Cross => {
// Evaluating join cardinality using histograms.
// If histogram is None, will evaluate using NDV.
self.inner_join_cardinality(&left_prop, &right_prop)?
}

JoinType::Left | JoinType::Right | JoinType::Full => {
f64::max(left_prop.cardinality, right_prop.cardinality)
JoinType::Inner | JoinType::Cross => inner_join_cardinality,
JoinType::Left => f64::max(left_prop.cardinality, inner_join_cardinality),
JoinType::Right => f64::max(right_prop.cardinality, inner_join_cardinality),
JoinType::Full => {
f64::max(left_prop.cardinality, inner_join_cardinality)
+ f64::max(right_prop.cardinality, inner_join_cardinality)
- inner_join_cardinality
}

JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark | JoinType::Single => {
left_prop.cardinality
}

JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => {
right_prop.cardinality
}
Expand All @@ -297,6 +335,15 @@ impl Operator for Join {
let mut used_columns = self.used_columns()?;
used_columns.extend(left_prop.used_columns);
used_columns.extend(right_prop.used_columns);
// Derive column statistics
let column_stats = if cardinality == 0.0 {
HashMap::new()
} else {
let mut column_stats = HashMap::new();
column_stats.extend(left_prop.statistics.column_stats);
column_stats.extend(right_prop.statistics.column_stats);
column_stats
};

Ok(RelationalProperty {
output_columns,
Expand All @@ -305,7 +352,7 @@ impl Operator for Join {
cardinality,
statistics: Statistics {
precise_cardinality: None,
column_stats: Default::default(),
column_stats,
is_accurate: false,
},
})
Expand Down Expand Up @@ -359,8 +406,13 @@ impl Operator for Join {
}
}

fn evaluate_by_histogram(left_hist: &Histogram, right_hist: &Histogram) -> Result<f64> {
fn evaluate_by_histogram(
left_hist: &Histogram,
right_hist: &Histogram,
new_ndv: &mut Option<f64>,
) -> Result<f64> {
let mut interleaved_buckets = vec![];
let mut all_ndv = 0.0;
for (left_idx, left_bucket) in left_hist.buckets.iter().enumerate() {
if left_idx == 0 {
continue;
Expand Down Expand Up @@ -448,12 +500,14 @@ fn evaluate_by_histogram(left_hist: &Histogram, right_hist: &Histogram) -> Resul
}
let mut card = 0.0;
for bucket in interleaved_buckets {
all_ndv += bucket.left_ndv.min(bucket.right_ndv);
let max_ndv = f64::max(bucket.left_ndv, bucket.right_ndv);
if max_ndv == 0.0 {
continue;
}
card += bucket.left_num_rows * bucket.right_num_rows / max_ndv;
}
*new_ndv = Some(all_ndv);
Ok(card)
}

Expand All @@ -462,11 +516,46 @@ fn evaluate_by_ndv(
right_stat: &ColumnStat,
left_prop: &RelationalProperty,
right_prop: &RelationalProperty,
new_ndv: &mut Option<f64>,
) -> f64 {
// Update column ndv
*new_ndv = Some(left_stat.ndv.min(right_stat.ndv));

let max_ndv = f64::max(left_stat.ndv, right_stat.ndv);
if max_ndv == 0.0 {
0.0
} else {
left_prop.cardinality * right_prop.cardinality / max_ndv
}
}

/// Writes newly derived join-key statistics back onto the column stats
/// of both join sides.
///
/// The first column used by `left_condition` / `right_condition`
/// identifies the stat entry to update on the respective side. Fields
/// of `new_stat` that are `None` leave the corresponding value
/// untouched; `Some` values are applied to BOTH sides, since after an
/// equi-join both key columns share the same min/max/ndv.
///
/// # Panics
/// Panics if a join condition uses no columns, or if its column has no
/// entry in `statistics.column_stats`. Both are invariants upheld by
/// the caller, which only invokes this after successfully looking up
/// the same entries.
fn update_statistic(
    left_prop: &mut RelationalProperty,
    right_prop: &mut RelationalProperty,
    left_condition: &Scalar,
    right_condition: &Scalar,
    new_stat: NewStatistic,
) {
    let left_col_stat = left_prop
        .statistics
        .column_stats
        .get_mut(
            left_condition
                .used_columns()
                .iter()
                .next()
                .expect("join condition must use at least one column"),
        )
        .expect("left join-key column must have a statistics entry");
    let right_col_stat = right_prop
        .statistics
        .column_stats
        .get_mut(
            right_condition
                .used_columns()
                .iter()
                .next()
                .expect("join condition must use at least one column"),
        )
        .expect("right join-key column must have a statistics entry");
    if let Some(new_min) = new_stat.min {
        left_col_stat.min = new_min.clone();
        right_col_stat.min = new_min;
    }
    if let Some(new_max) = new_stat.max {
        left_col_stat.max = new_max.clone();
        right_col_stat.max = new_max;
    }
    if let Some(new_ndv) = new_stat.ndv {
        left_col_stat.ndv = new_ndv;
        right_col_stat.ndv = new_ndv;
    }
}
Loading

1 comment on commit e80d713

@vercel
Copy link

@vercel vercel bot commented on e80d713 Jan 17, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

databend – ./

databend.vercel.app
databend-databend.vercel.app
databend.rs
databend-git-main-databend.vercel.app

Please sign in to comment.