diff --git a/Cargo.lock b/Cargo.lock
index 421f342d8c..213b8cb618 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4621,6 +4621,7 @@ dependencies = [
 name = "partition_table_engine"
 version = "1.2.6-alpha"
 dependencies = [
+ "analytic_engine",
  "async-trait",
  "common_types",
  "futures 0.3.28",
diff --git a/analytic_engine/src/table/mod.rs b/analytic_engine/src/table/mod.rs
index 72a2519b7b..0c7da7ab37 100644
--- a/analytic_engine/src/table/mod.rs
+++ b/analytic_engine/src/table/mod.rs
@@ -401,6 +401,17 @@ impl TableImpl {
     }
 }
 
+pub fn support_pushdown(schema: &Schema, need_dedup: bool, col_names: &[String]) -> bool {
+    if !need_dedup {
+        return true;
+    }
+
+    // When the table needs dedup, only unique key columns support pushdown.
+    col_names
+        .iter()
+        .all(|col_name| schema.is_unique_column(col_name.as_str()))
+}
+
 #[async_trait]
 impl Table for TableImpl {
     fn name(&self) -> &str {
@@ -431,6 +442,12 @@ impl Table for TableImpl {
         self.table_data.metrics.table_stats()
     }
 
+    fn support_pushdown(&self, read_schema: &Schema, col_names: &[String]) -> bool {
+        let need_dedup = self.table_data.table_options().need_dedup();
+
+        support_pushdown(read_schema, need_dedup, col_names)
+    }
+
     async fn write(&self, request: WriteRequest) -> Result<usize> {
         let _timer = self.table_data.metrics.start_table_total_timer();
 
diff --git a/analytic_engine/src/table_options.rs b/analytic_engine/src/table_options.rs
index 67bc2eb1af..c1ca841507 100644
--- a/analytic_engine/src/table_options.rs
+++ b/analytic_engine/src/table_options.rs
@@ -412,6 +412,11 @@ pub struct TableOptions {
 }
 
 impl TableOptions {
+    pub fn from_map(map: &HashMap<String, String>, is_create: bool) -> Result<Self> {
+        let opt = Self::default();
+        merge_table_options(map, &opt, is_create)
+    }
+
     #[inline]
     pub fn segment_duration(&self) -> Option<Duration> {
         self.segment_duration.map(|v| v.0)
@@ -712,7 +717,11 @@ fn merge_table_options(
     let mut table_opts = table_old_opts.clone();
     if is_create {
         if let Some(v) = options.get(SEGMENT_DURATION) {
-            table_opts.segment_duration = Some(parse_duration(v).context(ParseDuration)?);
+            if v.is_empty() {
+                table_opts.segment_duration = None;
+            } else {
+                table_opts.segment_duration = Some(parse_duration(v).context(ParseDuration)?);
+            }
         }
         if let Some(v) = options.get(UPDATE_MODE) {
             table_opts.update_mode = UpdateMode::parse_from(v)?;
diff --git a/common_types/src/schema.rs b/common_types/src/schema.rs
index d744da0bc6..9e219483c3 100644
--- a/common_types/src/schema.rs
+++ b/common_types/src/schema.rs
@@ -884,34 +884,26 @@ impl Schema {
         }
     }
 
-    pub fn unique_keys(&self) -> Vec<&str> {
-        // Only filters on the columns belonging to the unique key can be pushed down.
-        if self.tsid_column().is_some() {
-            // When tsid exists, that means default primary key (tsid, timestamp) is used.
-            // So, all filters of tag columns(tsid is the hash result of all tags),
-            // timestamp key and tsid can be pushed down.
-            let mut keys = self
-                .columns()
-                .iter()
-                .filter_map(|column| {
-                    if column.is_tag {
-                        Some(column.name.as_str())
-                    } else {
-                        None
-                    }
-                })
-                .collect::<Vec<_>>();
-            keys.extend([&self.tsid_column().unwrap().name, self.timestamp_name()]);
+    pub fn is_unique_column(&self, col_name: &str) -> bool {
+        // Primary key columns are obviously unique.
+        let is_primary_key = self
+            .primary_key_indexes()
+            .iter()
+            .map(|key_idx| self.column(*key_idx).name.as_str())
+            .any(|primary_key| primary_key == col_name);
 
-            keys
-        } else {
-            // When tsid does not exist, that means user defined primary key is used.
-            // So, only filters of primary key can be pushed down.
-            self.primary_key_indexes()
-                .iter()
-                .map(|key_idx| self.column(*key_idx).name.as_str())
-                .collect()
+        if is_primary_key {
+            return true;
         }
+
+        if self.tsid_column().is_none() {
+            return false;
+        }
+
+        // When tsid exists, tag columns are also unique (tsid is the hash of all tags).
+        self.columns()
+            .iter()
+            .any(|column| column.is_tag && column.name == col_name)
     }
 
     /// Panic if projection is invalid.
diff --git a/partition_table_engine/Cargo.toml b/partition_table_engine/Cargo.toml
index 845d79ea09..194da37611 100644
--- a/partition_table_engine/Cargo.toml
+++ b/partition_table_engine/Cargo.toml
@@ -25,6 +25,7 @@ workspace = true
 workspace = true
 
 [dependencies]
+analytic_engine = { workspace = true }
 async-trait = { workspace = true }
 common_types = { workspace = true }
 futures = { workspace = true }
diff --git a/partition_table_engine/src/lib.rs b/partition_table_engine/src/lib.rs
index d1ac6af69f..d511d040c6 100644
--- a/partition_table_engine/src/lib.rs
+++ b/partition_table_engine/src/lib.rs
@@ -20,6 +20,7 @@ mod partition;
 
 use std::sync::Arc;
 
+use analytic_engine::TableOptions;
 use async_trait::async_trait;
 use generic_error::BoxError;
 use snafu::{OptionExt, ResultExt};
@@ -67,7 +68,9 @@ impl TableEngine for PartitionTableEngine {
             partition_info: request.partition_info.context(UnexpectedNoCause {
                 msg: "partition info not found",
             })?,
-            options: request.options,
+            options: TableOptions::from_map(&request.options, true)
+                .box_err()
+                .context(Unexpected)?,
             engine_type: request.engine,
         };
         Ok(Arc::new(
diff --git a/partition_table_engine/src/partition.rs b/partition_table_engine/src/partition.rs
index 1519f5b269..decd0e4ae7 100644
--- a/partition_table_engine/src/partition.rs
+++ b/partition_table_engine/src/partition.rs
@@ -16,6 +16,7 @@
 
 use std::{collections::HashMap, fmt};
 
+use analytic_engine::{table::support_pushdown, TableOptions};
 use async_trait::async_trait;
 use common_types::{
     row::{Row, RowGroupBuilder},
@@ -55,7 +56,7 @@ pub struct TableData {
     pub table_id: TableId,
     pub table_schema: Schema,
     pub partition_info: PartitionInfo,
-    pub options: HashMap<String, String>,
+    pub options: TableOptions,
     pub engine_type: String,
 }
@@ -109,7 +110,7 @@ impl Table for PartitionTableImpl {
     // TODO: get options from sub partition table with remote engine
     fn options(&self) -> HashMap<String, String> {
-        self.table_data.options.clone()
+        self.table_data.options.to_raw_map()
     }
 
     fn partition_info(&self) -> Option<PartitionInfo> {
@@ -124,6 +125,13 @@ impl Table for PartitionTableImpl {
         TableStats::default()
     }
 
+    // TODO: maybe we should ask the remote sub table whether it supports pushdown
+    fn support_pushdown(&self, read_schema: &Schema, col_names: &[String]) -> bool {
+        let need_dedup = self.table_data.options.need_dedup();
+
+        support_pushdown(read_schema, need_dedup, col_names)
+    }
+
     async fn write(&self, request: WriteRequest) -> Result<usize> {
         let _timer = PARTITION_TABLE_WRITE_DURATION_HISTOGRAM
             .with_label_values(&["total"])
diff --git a/system_catalog/src/lib.rs b/system_catalog/src/lib.rs
index 395d529ea5..4160ccd7b8 100644
--- a/system_catalog/src/lib.rs
+++ b/system_catalog/src/lib.rs
@@ -125,6 +125,10 @@ impl Table for SystemTableAdapter {
         TableStats::default()
     }
 
+    fn support_pushdown(&self, _read_schema: &Schema, _col_names: &[String]) -> bool {
+        false
+    }
+
     async fn write(&self, _request: WriteRequest) -> table_engine::table::Result<usize> {
         Ok(0)
     }
diff --git a/table_engine/src/memory.rs b/table_engine/src/memory.rs
index 1ad67aa4c4..3d81c31726 100644
--- a/table_engine/src/memory.rs
+++ b/table_engine/src/memory.rs
@@ -118,6 +118,10 @@ impl Table for MemoryTable {
         TableStats::default()
     }
 
+    fn support_pushdown(&self, _read_schema: &Schema, _col_names: &[String]) -> bool {
+        false
+    }
+
     async fn write(&self, request: WriteRequest) -> Result<usize> {
         // TODO(yingwen) Maybe check schema?
         let mut row_groups = self.row_groups.write().unwrap();
diff --git a/table_engine/src/provider.rs b/table_engine/src/provider.rs
index e0f3a586dd..ffcca3467a 100644
--- a/table_engine/src/provider.rs
+++ b/table_engine/src/provider.rs
@@ -23,9 +23,7 @@ use std::{
 
 use arrow::datatypes::SchemaRef;
 use async_trait::async_trait;
-use common_types::{
-    projected_schema::ProjectedSchema, request_id::RequestId, schema::Schema, UPDATE_MODE,
-};
+use common_types::{projected_schema::ProjectedSchema, request_id::RequestId, schema::Schema};
 use datafusion::{
     config::{ConfigEntry, ConfigExtension, ExtensionOptions},
     datasource::TableProvider,
@@ -186,12 +184,13 @@ impl TableProviderAdapter {
     }
 
     fn check_and_build_predicate_from_filters(&self, filters: &[Expr]) -> PredicateRef {
-        let pushdown_states = self.pushdown_inner(&filters.iter().collect::<Vec<_>>());
         let pushdown_filters = filters
             .iter()
-            .zip(pushdown_states.iter())
-            .filter_map(|(filter, state)| {
-                if matches!(state, &TableProviderFilterPushDown::Exact) {
+            .filter_map(|filter| {
+                let filter_cols = visitor::find_columns_by_expr(filter);
+
+                let support_pushdown = self.table.support_pushdown(&self.read_schema, &filter_cols);
+                if support_pushdown {
                     Some(filter.clone())
                 } else {
                     None
@@ -205,33 +204,14 @@ impl TableProviderAdapter {
             .build()
     }
 
-    fn only_filter_unique_key_columns(filter: &Expr, unique_keys: &[&str]) -> bool {
-        let filter_cols = visitor::find_columns_by_expr(filter);
-        for filter_col in filter_cols {
-            // If a column which is not part of the unique key occurred in `filter`, the
-            // `filter` shouldn't be pushed down.
-            if !unique_keys.contains(&filter_col.as_str()) {
-                return false;
-            }
-        }
-        true
-    }
-
     fn pushdown_inner(&self, filters: &[&Expr]) -> Vec<TableProviderFilterPushDown> {
-        let unique_keys = self.read_schema.unique_keys();
-
-        // TODO: add pushdown check in table trait
-        let options = &self.table.options();
-        let is_append = matches!(options.get(UPDATE_MODE), Some(mode) if mode == "APPEND");
-        let is_system_engine = self.table.engine_type() == "system";
-
         filters
             .iter()
             .map(|filter| {
-                if is_system_engine {
-                    return TableProviderFilterPushDown::Inexact;
-                }
+                let filter_cols = visitor::find_columns_by_expr(filter);
 
-                if is_append || Self::only_filter_unique_key_columns(filter, &unique_keys) {
+                let support_pushdown = self.table.support_pushdown(&self.read_schema, &filter_cols);
+                if support_pushdown {
                     TableProviderFilterPushDown::Exact
                 } else {
                     TableProviderFilterPushDown::Inexact
@@ -396,17 +376,27 @@ impl ExecutionPlan for ScanTable {
     }
 
     fn metrics(&self) -> Option<MetricsSet> {
+        let mut metric_set = MetricsSet::new();
+
         let mut format_visitor = FormatCollectorVisitor::default();
         self.metrics_collector.visit(&mut format_visitor);
         let metrics_desc = format_visitor.into_string();
-
-        let metric_value = MetricValue::Count {
-            name: format!("\n{metrics_desc}").into(),
-            count: Count::new(),
-        };
-        let metric = Metric::new(metric_value, None);
-        let mut metric_set = MetricsSet::new();
-        metric_set.push(Arc::new(metric));
+        metric_set.push(Arc::new(Metric::new(
+            MetricValue::Count {
+                name: format!("\n{metrics_desc}").into(),
+                count: Count::new(),
+            },
+            None,
+        )));
+
+        let pushdown_filters = &self.predicate;
+        metric_set.push(Arc::new(Metric::new(
+            MetricValue::Count {
+                name: format!("\n{pushdown_filters:?}").into(),
+                count: Count::new(),
+            },
+            None,
+        )));
 
         Some(metric_set)
     }
@@ -438,137 +428,3 @@ impl fmt::Debug for ScanTable {
             .finish()
     }
 }
-
-#[cfg(test)]
-mod test {
-    use std::sync::Arc;
-
-    use common_types::{
-        column_schema,
-        datum::DatumKind,
-        schema::{Builder, Schema},
-    };
-    use datafusion::{
-        logical_expr::{col, Expr},
-        scalar::ScalarValue,
-    };
-
-    use super::*;
-    use crate::{memory::MemoryTable, table::TableId};
-
-    fn build_user_defined_primary_key_schema() -> Schema {
-        Builder::new()
-            .auto_increment_column_id(true)
-            .add_key_column(
-                column_schema::Builder::new("user_define1".to_string(), DatumKind::String)
-                    .build()
-                    .expect("should succeed build column schema"),
-            )
-            .unwrap()
-            .add_key_column(
-                column_schema::Builder::new("timestamp".to_string(), DatumKind::Timestamp)
-                    .build()
-                    .expect("should succeed build column schema"),
-            )
-            .unwrap()
-            .add_normal_column(
-                column_schema::Builder::new("field1".to_string(), DatumKind::String)
-                    .is_tag(true)
-                    .build()
-                    .expect("should succeed build column schema"),
-            )
-            .unwrap()
-            .add_normal_column(
-                column_schema::Builder::new("field2".to_string(), DatumKind::Double)
-                    .build()
-                    .expect("should succeed build column schema"),
-            )
-            .unwrap()
-            .build()
-            .unwrap()
-    }
-
-    fn build_default_primary_key_schema() -> Schema {
-        Builder::new()
-            .auto_increment_column_id(true)
-            .add_key_column(
-                column_schema::Builder::new("tsid".to_string(), DatumKind::UInt64)
-                    .build()
-                    .expect("should succeed build column schema"),
-            )
-            .unwrap()
-            .add_key_column(
-                column_schema::Builder::new("timestamp".to_string(), DatumKind::Timestamp)
-                    .build()
-                    .expect("should succeed build column schema"),
-            )
-            .unwrap()
-            .add_normal_column(
-                column_schema::Builder::new("user_define1".to_string(), DatumKind::String)
-                    .build()
-                    .expect("should succeed build column schema"),
schema"), - ) - .unwrap() - .add_normal_column( - column_schema::Builder::new("field1".to_string(), DatumKind::String) - .is_tag(true) - .build() - .expect("should succeed build column schema"), - ) - .unwrap() - .add_normal_column( - column_schema::Builder::new("field2".to_string(), DatumKind::Double) - .build() - .expect("should succeed build column schema"), - ) - .unwrap() - .build() - .unwrap() - } - - fn build_filters() -> Vec { - let filter1 = col("timestamp").lt(Expr::Literal(ScalarValue::UInt64(Some(10086)))); - let filter2 = - col("user_define1").eq(Expr::Literal(ScalarValue::Utf8(Some("10086".to_string())))); - let filter3 = col("field1").eq(Expr::Literal(ScalarValue::Utf8(Some("10087".to_string())))); - let filter4 = col("field2").eq(Expr::Literal(ScalarValue::Float64(Some(10088.0)))); - - vec![filter1, filter2, filter3, filter4] - } - - #[test] - pub fn test_push_down_in_user_defined_primary_key_case() { - let test_filters = build_filters(); - let user_defined_pk_schema = build_user_defined_primary_key_schema(); - - let table = MemoryTable::new( - "test_table".to_string(), - TableId::new(0), - user_defined_pk_schema, - "memory".to_string(), - ); - let provider = TableProviderAdapter::new(Arc::new(table)); - let predicate = provider.check_and_build_predicate_from_filters(&test_filters); - - let expected_filters = vec![test_filters[0].clone(), test_filters[1].clone()]; - assert_eq!(predicate.exprs(), &expected_filters); - } - - #[test] - pub fn test_push_down_in_default_primary_key_case() { - let test_filters = build_filters(); - let default_pk_schema = build_default_primary_key_schema(); - - let table = MemoryTable::new( - "test_table".to_string(), - TableId::new(0), - default_pk_schema, - "memory".to_string(), - ); - let provider = TableProviderAdapter::new(Arc::new(table)); - let predicate = provider.check_and_build_predicate_from_filters(&test_filters); - - let expected_filters = vec![test_filters[0].clone(), test_filters[2].clone()]; - assert_eq!(predicate.exprs(), &expected_filters); - } -} diff --git a/table_engine/src/table.rs b/table_engine/src/table.rs index ac34e4bc41..40a49f9612 100644 --- a/table_engine/src/table.rs +++ b/table_engine/src/table.rs @@ -487,6 +487,12 @@ pub trait Table: std::fmt::Debug { /// Get table's statistics. fn stats(&self) -> TableStats; + /// Whether the columns used in filter expr can be pushdown. + /// + /// `read_schema` is used here to avoid upper layer see different schema + /// during one query. + fn support_pushdown(&self, _read_schema: &Schema, _col_names: &[String]) -> bool; + /// Write to table. async fn write(&self, request: WriteRequest) -> Result;