diff --git a/common_types/src/schema.rs b/common_types/src/schema.rs
index 945d54b658..7181953015 100644
--- a/common_types/src/schema.rs
+++ b/common_types/src/schema.rs
@@ -808,6 +808,33 @@ impl Schema {
         }
     }
 
+    /// Returns the names of the columns that form the unique key of a row.
+    ///
+    /// Only filters on the columns belonging to the unique key can be pushed down.
+    pub fn unique_keys(&self) -> Vec<&str> {
+        if let Some(tsid_column) = self.tsid_column() {
+            // When tsid exists, that means default primary key (tsid, timestamp) is used.
+            // So, all filters of tag columns (tsid is the hash result of all tags),
+            // timestamp key and tsid can be pushed down.
+            let mut keys = self
+                .columns()
+                .iter()
+                .filter(|column| column.is_tag)
+                .map(|column| column.name.as_str())
+                .collect::<Vec<_>>();
+            keys.extend([tsid_column.name.as_str(), self.timestamp_name()]);
+
+            keys
+        } else {
+            // When tsid does not exist, that means user defined primary key is used.
+            // So, only filters of primary key can be pushed down.
+            self.primary_key_indexes()
+                .iter()
+                .map(|key_idx| self.column(*key_idx).name.as_str())
+                .collect()
+        }
+    }
+
     /// Panic if projection is invalid.
     pub(crate) fn project_record_schema_with_key(
         &self,
diff --git a/table_engine/src/provider.rs b/table_engine/src/provider.rs
index c73f360caa..7945dc314c 100644
--- a/table_engine/src/provider.rs
+++ b/table_engine/src/provider.rs
@@ -26,6 +26,7 @@ use datafusion::{
     scalar::ScalarValue,
 };
 use datafusion_expr::{TableSource, TableType};
+use df_operator::visitor;
 use log::debug;
 
 use crate::{
@@ -100,7 +101,7 @@ impl TableProviderAdapter {
             self.read_parallelism
         };
 
-        let predicate = self.predicate_from_filters(filters);
+        let predicate = self.check_and_build_predicate_from_filters(filters);
         let mut scan_table = ScanTable {
             projected_schema: ProjectedSchema::new(self.read_schema.clone(), projection.clone())
                 .map_err(|e| {
@@ -122,12 +123,34 @@ impl TableProviderAdapter {
         Ok(Arc::new(scan_table))
     }
 
-    fn predicate_from_filters(&self, filters: &[Expr]) -> PredicateRef {
+    /// Builds the predicate handed to the table scan from `filters`.
+    ///
+    /// Only the filters that reference nothing but unique key columns of
+    /// `read_schema` are kept; the other filters are left out of the predicate.
+    fn check_and_build_predicate_from_filters(&self, filters: &[Expr]) -> PredicateRef {
+        let unique_keys = self.read_schema.unique_keys();
+
+        let push_down_filters = filters
+            .iter()
+            .filter(|filter| Self::only_filter_unique_key_columns(filter, &unique_keys))
+            .cloned()
+            .collect::<Vec<_>>();
+
         PredicateBuilder::default()
-            .add_pushdown_exprs(filters)
-            .extract_time_range(&self.read_schema, filters)
+            .add_pushdown_exprs(&push_down_filters)
+            .extract_time_range(&self.read_schema, &push_down_filters)
             .build()
     }
+
+    /// Returns true if every column referenced by `filter` is in `unique_keys`.
+    ///
+    /// If a column which is not part of the unique key occurred in `filter`, the
+    /// `filter` shouldn't be pushed down.
+    fn only_filter_unique_key_columns(filter: &Expr, unique_keys: &[&str]) -> bool {
+        visitor::find_columns_by_expr(filter)
+            .into_iter()
+            .all(|filter_col| unique_keys.contains(&filter_col.as_str()))
+    }
 }
 
 #[async_trait]
@@ -329,3 +352,140 @@ impl fmt::Debug for ScanTable {
             .finish()
     }
 }
+
+#[cfg(test)]
+mod test {
+    use std::sync::Arc;
+
+    use common_types::{
+        column_schema,
+        datum::DatumKind,
+        schema::{Builder, Schema},
+    };
+    use datafusion::scalar::ScalarValue;
+    use datafusion_expr::{col, Expr};
+
+    use super::TableProviderAdapter;
+    use crate::{memory::MemoryTable, table::TableId};
+
+    /// Schema keyed by (`user_define1`, `timestamp`); `field1` is a tag but not a key.
+    fn build_user_defined_primary_key_schema() -> Schema {
+        Builder::new()
+            .auto_increment_column_id(true)
+            .add_key_column(
+                column_schema::Builder::new("user_define1".to_string(), DatumKind::String)
+                    .build()
+                    .expect("should succeed build column schema"),
+            )
+            .unwrap()
+            .add_key_column(
+                column_schema::Builder::new("timestamp".to_string(), DatumKind::Timestamp)
+                    .build()
+                    .expect("should succeed build column schema"),
+            )
+            .unwrap()
+            .add_normal_column(
+                column_schema::Builder::new("field1".to_string(), DatumKind::String)
+                    .is_tag(true)
+                    .build()
+                    .expect("should succeed build column schema"),
+            )
+            .unwrap()
+            .add_normal_column(
+                column_schema::Builder::new("field2".to_string(), DatumKind::Double)
+                    .build()
+                    .expect("should succeed build column schema"),
+            )
+            .unwrap()
+            .build()
+            .unwrap()
+    }
+
+    /// Schema keyed by (`tsid`, `timestamp`); `field1` is the only tag column.
+    fn build_default_primary_key_schema() -> Schema {
+        Builder::new()
+            .auto_increment_column_id(true)
+            .add_key_column(
+                column_schema::Builder::new("tsid".to_string(), DatumKind::UInt64)
+                    .build()
+                    .expect("should succeed build column schema"),
+            )
+            .unwrap()
+            .add_key_column(
+                column_schema::Builder::new("timestamp".to_string(), DatumKind::Timestamp)
+                    .build()
+                    .expect("should succeed build column schema"),
+            )
+            .unwrap()
+            .add_normal_column(
+                column_schema::Builder::new("user_define1".to_string(), DatumKind::String)
+                    .build()
+                    .expect("should succeed build column schema"),
+            )
+            .unwrap()
+            .add_normal_column(
+                column_schema::Builder::new("field1".to_string(), DatumKind::String)
+                    .is_tag(true)
+                    .build()
+                    .expect("should succeed build column schema"),
+            )
+            .unwrap()
+            .add_normal_column(
+                column_schema::Builder::new("field2".to_string(), DatumKind::Double)
+                    .build()
+                    .expect("should succeed build column schema"),
+            )
+            .unwrap()
+            .build()
+            .unwrap()
+    }
+
+    /// One filter per column, in order: `timestamp`, `user_define1`, `field1`, `field2`.
+    fn build_filters() -> Vec<Expr> {
+        let filter1 = col("timestamp").lt(Expr::Literal(ScalarValue::UInt64(Some(10086))));
+        let filter2 =
+            col("user_define1").eq(Expr::Literal(ScalarValue::Utf8(Some("10086".to_string()))));
+        let filter3 = col("field1").eq(Expr::Literal(ScalarValue::Utf8(Some("10087".to_string()))));
+        let filter4 = col("field2").eq(Expr::Literal(ScalarValue::Float64(Some(10088.0))));
+
+        vec![filter1, filter2, filter3, filter4]
+    }
+
+    #[test]
+    pub fn test_push_down_in_user_defined_primary_key_case() {
+        let test_filters = build_filters();
+        let user_defined_pk_schema = build_user_defined_primary_key_schema();
+
+        let table = MemoryTable::new(
+            "test_table".to_string(),
+            TableId::new(0),
+            user_defined_pk_schema,
+            "memory".to_string(),
+        );
+        let provider = TableProviderAdapter::new(Arc::new(table), 1);
+        let predicate = provider.check_and_build_predicate_from_filters(&test_filters);
+
+        // No tsid: only filters on the primary key (`timestamp`, `user_define1`) are kept.
+        let expected_filters = vec![test_filters[0].clone(), test_filters[1].clone()];
+        assert_eq!(predicate.exprs(), &expected_filters);
+    }
+
+    #[test]
+    pub fn test_push_down_in_default_primary_key_case() {
+        let test_filters = build_filters();
+        let default_pk_schema = build_default_primary_key_schema();
+
+        let table = MemoryTable::new(
+            "test_table".to_string(),
+            TableId::new(0),
+            default_pk_schema,
+            "memory".to_string(),
+        );
+        let provider = TableProviderAdapter::new(Arc::new(table), 1);
+        let predicate = provider.check_and_build_predicate_from_filters(&test_filters);
+
+        // With tsid: filters on `timestamp` and on the tag column `field1` are kept.
+        let expected_filters = vec![test_filters[0].clone(), test_filters[2].clone()];
+        assert_eq!(predicate.exprs(), &expected_filters);
+    }
+}