Skip to content

Commit

Permalink
refactor: add support_pushdown in table trait (#1150)
Browse files Browse the repository at this point in the history
## Rationale
Follow-up PR of #1126; removes the hard-coded condition check.

## Detailed Changes
- Add `support_pushdown` 
- Replace `unique_keys` with `is_unique_column` to avoid an unnecessary Vec
allocation.
- Remove the old memory-table pushdown tests; memory tables shouldn't
support pushdown. The pushdown check is covered by
`integration_tests/cases/common/dml/issue-341.sql`.


## Test Plan
Existing tests

---------

Co-authored-by: WEI Xikai <ShiKaiWi@users.noreply.github.com>
  • Loading branch information
jiacai2050 and ShiKaiWi authored Aug 15, 2023
1 parent 4185b66 commit 1104866
Show file tree
Hide file tree
Showing 11 changed files with 102 additions and 201 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 17 additions & 0 deletions analytic_engine/src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,17 @@ impl TableImpl {
}
}

/// Decides whether filters on `col_names` may be pushed down to the table.
///
/// Tables that don't need deduplication always allow pushdown; when dedup is
/// required, pushdown is only safe if every referenced column is a unique
/// column of the schema.
pub fn support_pushdown(schema: &Schema, need_dedup: bool, col_names: &[String]) -> bool {
    // No dedup means pushdown can never alter query results.
    if !need_dedup {
        return true;
    }

    // When the table needs dedup, only unique-key columns support pushdown.
    col_names.iter().all(|col| schema.is_unique_column(col))
}

#[async_trait]
impl Table for TableImpl {
fn name(&self) -> &str {
Expand Down Expand Up @@ -431,6 +442,12 @@ impl Table for TableImpl {
self.table_data.metrics.table_stats()
}

/// Delegates the pushdown decision to the free `support_pushdown` helper,
/// using this table's dedup setting from its options.
fn support_pushdown(&self, read_schema: &Schema, col_names: &[String]) -> bool {
    let dedup_required = self.table_data.table_options().need_dedup();
    support_pushdown(read_schema, dedup_required, col_names)
}

async fn write(&self, request: WriteRequest) -> Result<usize> {
let _timer = self.table_data.metrics.start_table_total_timer();

Expand Down
11 changes: 10 additions & 1 deletion analytic_engine/src/table_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,11 @@ pub struct TableOptions {
}

impl TableOptions {
/// Builds `TableOptions` by merging the given key/value map onto the
/// default options. `is_create` selects create-time merge semantics.
pub fn from_map(map: &HashMap<String, String>, is_create: bool) -> Result<Self> {
    let defaults = Self::default();
    merge_table_options(map, &defaults, is_create)
}

#[inline]
pub fn segment_duration(&self) -> Option<Duration> {
self.segment_duration.map(|v| v.0)
Expand Down Expand Up @@ -712,7 +717,11 @@ fn merge_table_options(
let mut table_opts = table_old_opts.clone();
if is_create {
if let Some(v) = options.get(SEGMENT_DURATION) {
table_opts.segment_duration = Some(parse_duration(v).context(ParseDuration)?);
if v.is_empty() {
table_opts.segment_duration = None;
} else {
table_opts.segment_duration = Some(parse_duration(v).context(ParseDuration)?);
}
}
if let Some(v) = options.get(UPDATE_MODE) {
table_opts.update_mode = UpdateMode::parse_from(v)?;
Expand Down
44 changes: 18 additions & 26 deletions common_types/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -884,34 +884,26 @@ impl Schema {
}
}

pub fn unique_keys(&self) -> Vec<&str> {
// Only filters on the columns belonging to the unique key can be pushed down.
if self.tsid_column().is_some() {
// When tsid exists, that means default primary key (tsid, timestamp) is used.
// So, all filters of tag columns(tsid is the hash result of all tags),
// timestamp key and tsid can be pushed down.
let mut keys = self
.columns()
.iter()
.filter_map(|column| {
if column.is_tag {
Some(column.name.as_str())
} else {
None
}
})
.collect::<Vec<_>>();
keys.extend([&self.tsid_column().unwrap().name, self.timestamp_name()]);
pub fn is_unique_column(&self, col_name: &str) -> bool {
// primary key is obvious unique.
let is_primary_key = self
.primary_key_indexes()
.iter()
.map(|key_idx| self.column(*key_idx).name.as_str())
.any(|primary_key| primary_key == col_name);

keys
} else {
// When tsid does not exist, that means user defined primary key is used.
// So, only filters of primary key can be pushed down.
self.primary_key_indexes()
.iter()
.map(|key_idx| self.column(*key_idx).name.as_str())
.collect()
if is_primary_key {
return true;
}

if self.tsid_column().is_none() {
return false;
}

// When tsid exists, it means tag column is also unique.
self.columns()
.iter()
.any(|column| column.is_tag && column.name == col_name)
}

/// Panic if projection is invalid.
Expand Down
1 change: 1 addition & 0 deletions partition_table_engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ workspace = true
workspace = true

[dependencies]
analytic_engine = { workspace = true }
async-trait = { workspace = true }
common_types = { workspace = true }
futures = { workspace = true }
Expand Down
5 changes: 4 additions & 1 deletion partition_table_engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ mod partition;

use std::sync::Arc;

use analytic_engine::TableOptions;
use async_trait::async_trait;
use generic_error::BoxError;
use snafu::{OptionExt, ResultExt};
Expand Down Expand Up @@ -67,7 +68,9 @@ impl TableEngine for PartitionTableEngine {
partition_info: request.partition_info.context(UnexpectedNoCause {
msg: "partition info not found",
})?,
options: request.options,
options: TableOptions::from_map(&request.options, true)
.box_err()
.context(Unexpected)?,
engine_type: request.engine,
};
Ok(Arc::new(
Expand Down
12 changes: 10 additions & 2 deletions partition_table_engine/src/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

use std::{collections::HashMap, fmt};

use analytic_engine::{table::support_pushdown, TableOptions};
use async_trait::async_trait;
use common_types::{
row::{Row, RowGroupBuilder},
Expand Down Expand Up @@ -55,7 +56,7 @@ pub struct TableData {
pub table_id: TableId,
pub table_schema: Schema,
pub partition_info: PartitionInfo,
pub options: HashMap<String, String>,
pub options: TableOptions,
pub engine_type: String,
}

Expand Down Expand Up @@ -109,7 +110,7 @@ impl Table for PartitionTableImpl {

// TODO: get options from sub partition table with remote engine
fn options(&self) -> HashMap<String, String> {
    // Convert the structured TableOptions back into the raw string map
    // callers of this trait method expect.
    self.table_data.options.to_raw_map()
}

fn partition_info(&self) -> Option<PartitionInfo> {
Expand All @@ -124,6 +125,13 @@ impl Table for PartitionTableImpl {
TableStats::default()
}

// TODO: maybe we should ask remote sub table whether support pushdown
fn support_pushdown(&self, read_schema: &Schema, col_names: &[String]) -> bool {
    let dedup = self.table_data.options.need_dedup();
    support_pushdown(read_schema, dedup, col_names)
}

async fn write(&self, request: WriteRequest) -> Result<usize> {
let _timer = PARTITION_TABLE_WRITE_DURATION_HISTOGRAM
.with_label_values(&["total"])
Expand Down
4 changes: 4 additions & 0 deletions system_catalog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ impl Table for SystemTableAdapter {
TableStats::default()
}

// System catalog tables never support filter pushdown.
fn support_pushdown(&self, _read_schema: &Schema, _col_names: &[String]) -> bool {
false
}

async fn write(&self, _request: WriteRequest) -> table_engine::table::Result<usize> {
Ok(0)
}
Expand Down
4 changes: 4 additions & 0 deletions table_engine/src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,10 @@ impl Table for MemoryTable {
TableStats::default()
}

// Memory tables never support filter pushdown; filtering is performed by
// the layer above the table scan (see the pushdown check in
// integration_tests/cases/common/dml/issue-341.sql).
fn support_pushdown(&self, _read_schema: &Schema, _col_names: &[String]) -> bool {
false
}

async fn write(&self, request: WriteRequest) -> Result<usize> {
// TODO(yingwen) Maybe check schema?
let mut row_groups = self.row_groups.write().unwrap();
Expand Down
Loading

0 comments on commit 1104866

Please sign in to comment.