Skip to content

Commit

Permalink
Merge pull request #9738 from sundy-li/topn
Browse files Browse the repository at this point in the history
feat(query): add topn runtime filter in native storage format
  • Loading branch information
BohuTANG authored Jan 27, 2023
2 parents 02190e5 + 1c591d5 commit 3ea04f2
Show file tree
Hide file tree
Showing 29 changed files with 713 additions and 176 deletions.
47 changes: 47 additions & 0 deletions src/query/catalog/src/plan/pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@

use std::fmt::Debug;

use common_expression::types::DataType;
use common_expression::RemoteExpr;
use common_expression::TableField;
use common_expression::TableSchema;

use crate::plan::Projection;
Expand Down Expand Up @@ -48,7 +50,52 @@ pub struct PushDownInfo {
pub order_by: Vec<(RemoteExpr<String>, bool, bool)>,
}

/// TopK is a wrapper for topk push down items.
/// We only take the first column in order_by as the topk column.
#[derive(Debug, Clone)]
pub struct TopK {
pub limit: usize,
pub order_by: TableField,
pub asc: bool,
pub column_id: u32,
}

impl PushDownInfo {
pub fn top_k(&self, schema: &TableSchema, support: fn(&DataType) -> bool) -> Option<TopK> {
if !self.order_by.is_empty() && self.limit.is_some() {
let order = &self.order_by[0];
let limit = self.limit.unwrap();

const MAX_TOPK_LIMIT: usize = 1000;
if limit > MAX_TOPK_LIMIT {
return None;
}

if let RemoteExpr::<String>::ColumnRef { id, .. } = &order.0 {
let field = schema.field_with_name(id).unwrap();
let data_type: DataType = field.data_type().into();
if !support(&data_type) {
return None;
}

let leaf_fields = schema.leaf_fields();
let column_id = leaf_fields.iter().position(|p| p == field).unwrap();

let top_k = TopK {
limit: self.limit.unwrap(),
order_by: field.clone(),
asc: order.1,
column_id: column_id as u32,
};
Some(top_k)
} else {
None
}
} else {
None
}
}

pub fn prewhere_of_push_downs(push_downs: &Option<PushDownInfo>) -> Option<PrewhereInfo> {
if let Some(PushDownInfo { prewhere, .. }) = push_downs {
prewhere.clone()
Expand Down
104 changes: 16 additions & 88 deletions src/query/expression/src/kernels/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,124 +20,39 @@ use common_arrow::arrow::buffer::Buffer;
use common_exception::ErrorCode;
use common_exception::Result;

use crate::filter_helper::FilterHelpers;
use crate::types::array::ArrayColumnBuilder;
use crate::types::nullable::NullableColumn;
use crate::types::number::NumberColumn;
use crate::types::number::NumberScalar;
use crate::types::string::StringColumnBuilder;
use crate::types::AnyType;
use crate::types::ArgType;
use crate::types::ArrayType;
use crate::types::BooleanType;
use crate::types::StringType;
use crate::types::ValueType;
use crate::types::VariantType;
use crate::with_number_mapped_type;
use crate::with_number_type;
use crate::BlockEntry;
use crate::Column;
use crate::ColumnBuilder;
use crate::DataBlock;
use crate::Scalar;
use crate::TypeDeserializer;
use crate::Value;

impl DataBlock {
// check if the predicate has any valid row
pub fn filter_exists(predicate: &Value<AnyType>) -> Result<bool> {
let predicate = Self::cast_to_nonull_boolean(predicate).ok_or_else(|| {
ErrorCode::BadDataValueType(format!(
"Filter predict column does not support type '{:?}'",
predicate
))
})?;
match predicate {
Value::Scalar(s) => Ok(s),
Value::Column(bitmap) => Ok(bitmap.len() != bitmap.unset_bits()),
}
}

pub fn filter(self, predicate: &Value<AnyType>) -> Result<DataBlock> {
if self.num_rows() == 0 {
return Ok(self);
}

let predicate = Self::cast_to_nonull_boolean(predicate).ok_or_else(|| {
let predicate = FilterHelpers::cast_to_nonull_boolean(predicate).ok_or_else(|| {
ErrorCode::BadDataValueType(format!(
"Filter predict column does not support type '{:?}'",
predicate
))
})?;

match predicate {
Value::Scalar(s) => {
if s {
Ok(self)
} else {
Ok(self.slice(0..0))
}
}
Value::Column(bitmap) => Self::filter_with_bitmap(self, &bitmap),
}
}

// Must be numeric, boolean, or string value type
pub fn cast_to_nonull_boolean(predicate: &Value<AnyType>) -> Option<Value<BooleanType>> {
match predicate {
Value::Scalar(s) => Self::cast_scalar_to_boolean(s).map(Value::Scalar),
Value::Column(c) => Self::cast_column_to_boolean(c).map(Value::Column),
}
}

fn cast_scalar_to_boolean(s: &Scalar) -> Option<bool> {
match s {
Scalar::Number(num) => with_number_mapped_type!(|SRC_TYPE| match num {
NumberScalar::SRC_TYPE(value) => Some(value != &SRC_TYPE::default()),
}),
Scalar::Boolean(value) => Some(*value),
Scalar::String(value) => Some(!value.is_empty()),
Scalar::Timestamp(value) => Some(*value != 0),
Scalar::Date(value) => Some(*value != 0),
Scalar::Null => Some(false),
_ => None,
}
}

fn cast_column_to_boolean(c: &Column) -> Option<Bitmap> {
match c {
Column::Number(num) => with_number_mapped_type!(|SRC_TYPE| match num {
NumberColumn::SRC_TYPE(value) => Some(BooleanType::column_from_iter(
value.iter().map(|v| v != &SRC_TYPE::default()),
&[],
)),
}),
Column::Boolean(value) => Some(value.clone()),
Column::String(value) => Some(BooleanType::column_from_iter(
value.iter().map(|s| !s.is_empty()),
&[],
)),
Column::Timestamp(value) => Some(BooleanType::column_from_iter(
value.iter().map(|v| *v != 0),
&[],
)),
Column::Date(value) => Some(BooleanType::column_from_iter(
value.iter().map(|v| *v != 0),
&[],
)),
Column::Null { len } => Some(MutableBitmap::from_len_zeroed(*len).into()),
Column::Nullable(c) => {
let inner = Self::cast_column_to_boolean(&c.column)?;
Some((&inner) & (&c.validity))
}
_ => None,
}
}

pub fn try_as_const_bool(value: &Value<BooleanType>) -> Result<Option<bool>> {
match value {
Value::Scalar(v) => Ok(Some(*v)),
_ => Ok(None),
}
self.filter_boolean_value(predicate)
}

pub fn filter_with_bitmap(block: DataBlock, bitmap: &Bitmap) -> Result<DataBlock> {
Expand Down Expand Up @@ -169,6 +84,19 @@ impl DataBlock {
}
}
}

pub fn filter_boolean_value(self, filter: Value<BooleanType>) -> Result<DataBlock> {
match filter {
Value::Scalar(s) => {
if s {
Ok(self)
} else {
Ok(self.slice(0..0))
}
}
Value::Column(bitmap) => Self::filter_with_bitmap(self, &bitmap),
}
}
}

impl Column {
Expand Down
2 changes: 2 additions & 0 deletions src/query/expression/src/kernels/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ mod scatter;
mod sort;
mod take;
mod take_chunks;
mod topk;

pub use group_by::*;
pub use group_by_hash::*;
pub use sort::*;
pub use take_chunks::*;
pub use topk::*;
Loading

1 comment on commit 3ea04f2

@vercel
Copy link

@vercel vercel bot commented on 3ea04f2 Jan 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

databend – ./

databend.vercel.app
databend-databend.vercel.app
databend.rs
databend-git-main-databend.vercel.app

Please sign in to comment.