Skip to content

Commit

Permalink
Add tests for filter push-down in `check_and_build_predicate_from_filters` (user-defined vs. default primary key schemas).
Browse files Browse the repository at this point in the history
  • Loading branch information
Rachelint committed Feb 6, 2023
1 parent 1b2023d commit d008bfd
Showing 1 changed file with 135 additions and 4 deletions.
139 changes: 135 additions & 4 deletions table_engine/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use datafusion::{
scalar::ScalarValue,
};
use datafusion_expr::{TableSource, TableType};
use df_operator::visitor::find_columns_by_expr;
use df_operator::visitor;
use log::debug;

use crate::{
Expand Down Expand Up @@ -125,21 +125,22 @@ impl TableProviderAdapter {
}

fn check_and_build_predicate_from_filters(&self, filters: &[Expr]) -> PredicateRef {
// 1.Only filter of primary key or timestamp key can be pushed down.
// 1.Only filters of primary key or timestamp key can be pushed down.
// Build key set.
let mut key_set = HashSet::new();
// Timestamp key should be always picked.
key_set.insert(self.read_schema.timestamp_name());
if self.read_schema.tsid_column().is_some() {
// When tsid exists, that means default primary key (tsid, timestamp) is used.
// So, all tag columns(tsid is the hash result of all tags) can be pushed down.
// So, all filters of tag columns(tsid is the hash result of all tags) can be pushed down.
for column in self.read_schema.columns() {
if column.is_tag {
key_set.insert(&column.name);
}
}
} else {
// When tsid does not exist, that means user defined primary key is used.
// So, only filters of primary key can be pushed down.
for primary_idx in self.read_schema.primary_key_indexes() {
let primary_column = self.read_schema.column(*primary_idx);
key_set.insert(&primary_column.name);
Expand All @@ -165,7 +166,7 @@ impl TableProviderAdapter {
}

fn should_push_down(&self, filter: &Expr, key_set: &HashSet<&str>) -> bool {
let columns = find_columns_by_expr(filter);
let columns = visitor::find_columns_by_expr(filter);
let mut should_push_down = true;
for column in columns {
// Once found a column not primary key or timestamp key in `filter`,
Expand Down Expand Up @@ -379,3 +380,133 @@ impl fmt::Debug for ScanTable {
.finish()
}
}

#[cfg(test)]
mod test {
    use std::sync::Arc;

    use common_types::{
        column_schema,
        datum::DatumKind,
        schema::{Builder, Schema},
    };
    use datafusion::scalar::ScalarValue;
    use datafusion_expr::{col, Expr};

    use super::TableProviderAdapter;
    use crate::{memory::MemoryTable, table::TableId};

    /// Column spec used by [`build_schema`]: (name, datum kind, is_key, is_tag).
    type ColumnSpec<'a> = (&'a str, DatumKind, bool, bool);

    /// Builds a schema from the given column specs, keeping the column-builder
    /// boilerplate shared between the two schema constructors in one place.
    fn build_schema(columns: &[ColumnSpec<'_>]) -> Schema {
        let mut builder = Builder::new().auto_increment_column_id(true);
        for (name, kind, is_key, is_tag) in columns {
            let column = column_schema::Builder::new(name.to_string(), kind.clone())
                .is_tag(*is_tag)
                .build()
                .expect("should succeed build column schema");
            builder = if *is_key {
                builder.add_key_column(column).unwrap()
            } else {
                builder.add_normal_column(column).unwrap()
            };
        }
        builder.build().unwrap()
    }

    /// Schema with a user-defined primary key: (key1, timestamp).
    /// `field1` is a tag column, `field2` a plain field column.
    fn build_user_defined_primary_key_schema() -> Schema {
        build_schema(&[
            ("key1", DatumKind::String, true, false),
            ("timestamp", DatumKind::Timestamp, true, false),
            ("field1", DatumKind::String, false, true),
            ("field2", DatumKind::Double, false, false),
        ])
    }

    /// Schema with the default primary key: (timestamp, tsid).
    /// `field1` is a tag column, `field2` a plain field column.
    fn build_default_primary_key_schema() -> Schema {
        build_schema(&[
            ("timestamp", DatumKind::Timestamp, true, false),
            ("tsid", DatumKind::UInt64, true, false),
            ("field1", DatumKind::String, false, true),
            ("field2", DatumKind::Double, false, false),
        ])
    }

    /// Creates a provider backed by an in-memory table with the given schema,
    /// sharing the setup boilerplate between the tests.
    fn build_provider(schema: Schema) -> TableProviderAdapter {
        let table = MemoryTable::new(
            "test_table".to_string(),
            TableId::new(0),
            schema,
            "memory".to_string(),
        );
        TableProviderAdapter::new(Arc::new(table), 1)
    }

    /// One filter per column: timestamp (<), key1 (=), field1 (=), field2 (=).
    fn build_filters() -> Vec<Expr> {
        let filter1 = col("timestamp").lt(Expr::Literal(ScalarValue::UInt64(Some(10086))));
        let filter2 = col("key1").eq(Expr::Literal(ScalarValue::Utf8(Some("10086".to_string()))));
        let filter3 = col("field1").eq(Expr::Literal(ScalarValue::Utf8(Some("10087".to_string()))));
        let filter4 = col("field2").eq(Expr::Literal(ScalarValue::Float64(Some(10088.0))));

        vec![filter1, filter2, filter3, filter4]
    }

    #[test]
    fn test_push_down_in_user_defined_primary_key_case() {
        let test_filters = build_filters();
        let provider = build_provider(build_user_defined_primary_key_schema());

        let predicate = provider.check_and_build_predicate_from_filters(&test_filters);

        // Only filters on primary key columns (`key1`, `timestamp`) are
        // expected to be pushed down; the tag and field filters are dropped.
        let expected_filters = vec![test_filters[0].clone(), test_filters[1].clone()];
        assert_eq!(predicate.exprs(), &expected_filters);
    }

    #[test]
    fn test_push_down_in_default_primary_key_case() {
        let all_filters = build_filters();
        // Drop the `key1` filter: that column does not exist in the default
        // primary key schema.
        let test_filters = vec![
            all_filters[0].clone(),
            all_filters[2].clone(),
            all_filters[3].clone(),
        ];
        let provider = build_provider(build_default_primary_key_schema());

        let predicate = provider.check_and_build_predicate_from_filters(&test_filters);

        // With the default primary key (tsid, timestamp), filters on the
        // timestamp column and on tag columns (`field1`) can be pushed down,
        // while the one on the plain field column (`field2`) cannot.
        let expected_filters = vec![test_filters[0].clone(), test_filters[1].clone()];
        assert_eq!(predicate.exprs(), &expected_filters);
    }
}

0 comments on commit d008bfd

Please sign in to comment.