Skip to content

Commit

Permalink
feat: prevent pushing down filters on non-primary-key or non-timestamp-key columns (a…
Browse files Browse the repository at this point in the history
…pache#611)

* Prevent pushing down filters on columns that are not part of the primary key or the timestamp key.

* add test.

* address CR.
  • Loading branch information
Rachelint authored and MichaelLeeHZ committed Feb 8, 2023
1 parent 9e42246 commit 6c861fa
Show file tree
Hide file tree
Showing 2 changed files with 192 additions and 4 deletions.
30 changes: 30 additions & 0 deletions common_types/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -808,6 +808,36 @@ impl Schema {
}
}

/// Returns the names of the columns forming the table's unique key.
///
/// Only filters that reference nothing but unique key columns can be
/// pushed down safely, so callers use this list to vet pushdown filters.
pub fn unique_keys(&self) -> Vec<&str> {
    if let Some(tsid_column) = self.tsid_column() {
        // When tsid exists, the default primary key (tsid, timestamp) is in
        // use. Since tsid is the hash result of all tag values, filters on
        // any tag column, the timestamp key, or tsid itself can be pushed
        // down.
        let mut keys = self
            .columns()
            .iter()
            .filter_map(|column| column.is_tag.then(|| column.name.as_str()))
            .collect::<Vec<_>>();
        keys.extend([tsid_column.name.as_str(), self.timestamp_name()]);

        keys
    } else {
        // When tsid does not exist, a user-defined primary key is in use,
        // so only filters on the primary key columns can be pushed down.
        self.primary_key_indexes()
            .iter()
            .map(|key_idx| self.column(*key_idx).name.as_str())
            .collect()
    }
}

/// Panic if projection is invalid.
pub(crate) fn project_record_schema_with_key(
&self,
Expand Down
166 changes: 162 additions & 4 deletions table_engine/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use datafusion::{
scalar::ScalarValue,
};
use datafusion_expr::{TableSource, TableType};
use df_operator::visitor;
use log::debug;

use crate::{
Expand Down Expand Up @@ -100,7 +101,7 @@ impl TableProviderAdapter {
self.read_parallelism
};

let predicate = self.predicate_from_filters(filters);
let predicate = self.check_and_build_predicate_from_filters(filters);
let mut scan_table = ScanTable {
projected_schema: ProjectedSchema::new(self.read_schema.clone(), projection.clone())
.map_err(|e| {
Expand All @@ -122,12 +123,37 @@ impl TableProviderAdapter {
Ok(Arc::new(scan_table))
}

fn predicate_from_filters(&self, filters: &[Expr]) -> PredicateRef {
/// Builds the scan predicate from `filters`, keeping only the filters that
/// reference unique key columns exclusively.
///
/// Filters touching any other column are dropped from the pushdown set
/// (the query engine still applies them above the scan, so results are
/// unaffected — only how much data is pruned at the source).
fn check_and_build_predicate_from_filters(&self, filters: &[Expr]) -> PredicateRef {
    let unique_keys = self.read_schema.unique_keys();

    // Retain only the filters that are safe to push down.
    let push_down_filters = filters
        .iter()
        .filter(|filter| Self::only_filter_unique_key_columns(filter, &unique_keys))
        .cloned()
        .collect::<Vec<_>>();

    PredicateBuilder::default()
        .add_pushdown_exprs(&push_down_filters)
        .extract_time_range(&self.read_schema, &push_down_filters)
        .build()
}

/// Returns true if every column referenced by `filter` belongs to the
/// unique key, i.e. the filter is safe to push down.
fn only_filter_unique_key_columns(filter: &Expr, unique_keys: &[&str]) -> bool {
    // A single column outside the unique key disqualifies the whole filter.
    visitor::find_columns_by_expr(filter)
        .iter()
        .all(|filter_col| unique_keys.contains(&filter_col.as_str()))
}
}

#[async_trait]
Expand Down Expand Up @@ -329,3 +355,135 @@ impl fmt::Debug for ScanTable {
.finish()
}
}

#[cfg(test)]
mod test {
    use std::sync::Arc;

    use common_types::{
        column_schema,
        datum::DatumKind,
        schema::{Builder, Schema},
    };
    use datafusion::scalar::ScalarValue;
    use datafusion_expr::{col, Expr};

    use super::TableProviderAdapter;
    use crate::{memory::MemoryTable, table::TableId};

    // Builds a single column schema; panics on invalid input since these
    // are fixed test fixtures.
    fn make_column(name: &str, kind: DatumKind, is_tag: bool) -> column_schema::ColumnSchema {
        column_schema::Builder::new(name.to_string(), kind)
            .is_tag(is_tag)
            .build()
            .expect("should succeed build column schema")
    }

    // Schema with a user-defined primary key: (user_define1, timestamp).
    fn build_user_defined_primary_key_schema() -> Schema {
        Builder::new()
            .auto_increment_column_id(true)
            .add_key_column(make_column("user_define1", DatumKind::String, false))
            .unwrap()
            .add_key_column(make_column("timestamp", DatumKind::Timestamp, false))
            .unwrap()
            .add_normal_column(make_column("field1", DatumKind::String, true))
            .unwrap()
            .add_normal_column(make_column("field2", DatumKind::Double, false))
            .unwrap()
            .build()
            .unwrap()
    }

    // Schema with the default primary key: (tsid, timestamp), plus a normal
    // column, a tag column and a field column.
    fn build_default_primary_key_schema() -> Schema {
        Builder::new()
            .auto_increment_column_id(true)
            .add_key_column(make_column("tsid", DatumKind::UInt64, false))
            .unwrap()
            .add_key_column(make_column("timestamp", DatumKind::Timestamp, false))
            .unwrap()
            .add_normal_column(make_column("user_define1", DatumKind::String, false))
            .unwrap()
            .add_normal_column(make_column("field1", DatumKind::String, true))
            .unwrap()
            .add_normal_column(make_column("field2", DatumKind::Double, false))
            .unwrap()
            .build()
            .unwrap()
    }

    // One filter per column role: timestamp key, user-defined column,
    // tag column, and plain field column.
    fn build_filters() -> Vec<Expr> {
        let ts_filter = col("timestamp").lt(Expr::Literal(ScalarValue::UInt64(Some(10086))));
        let user_define_filter =
            col("user_define1").eq(Expr::Literal(ScalarValue::Utf8(Some("10086".to_string()))));
        let tag_filter =
            col("field1").eq(Expr::Literal(ScalarValue::Utf8(Some("10087".to_string()))));
        let field_filter = col("field2").eq(Expr::Literal(ScalarValue::Float64(Some(10088.0))));

        vec![ts_filter, user_define_filter, tag_filter, field_filter]
    }

    #[test]
    pub fn test_push_down_in_user_defined_primary_key_case() {
        // With primary key (user_define1, timestamp), only filters on those
        // two columns survive the pushdown check.
        let filters = build_filters();
        let table = MemoryTable::new(
            "test_table".to_string(),
            TableId::new(0),
            build_user_defined_primary_key_schema(),
            "memory".to_string(),
        );
        let provider = TableProviderAdapter::new(Arc::new(table), 1);

        let predicate = provider.check_and_build_predicate_from_filters(&filters);
        let expected = vec![filters[0].clone(), filters[1].clone()];
        assert_eq!(predicate.exprs(), &expected);
    }

    #[test]
    pub fn test_push_down_in_default_primary_key_case() {
        // With the default primary key (tsid, timestamp), the timestamp
        // filter and the tag filter survive; the user-defined column and
        // the plain field are rejected.
        let filters = build_filters();
        let table = MemoryTable::new(
            "test_table".to_string(),
            TableId::new(0),
            build_default_primary_key_schema(),
            "memory".to_string(),
        );
        let provider = TableProviderAdapter::new(Arc::new(table), 1);

        let predicate = provider.check_and_build_predicate_from_filters(&filters);
        let expected = vec![filters[0].clone(), filters[2].clone()];
        assert_eq!(predicate.exprs(), &expected);
    }
}

0 comments on commit 6c861fa

Please sign in to comment.