Skip to content

Commit

Permalink
feat: do not remove time filters in ScanRegion (#5180)
Browse files Browse the repository at this point in the history
* feat: do not remove time filters

* chore: remove `time_range` from parquet reader

* chore: print more message in the check script

* chore: fix unused error
  • Loading branch information
evenyag committed Dec 20, 2024
1 parent 614a25d commit 041a276
Show file tree
Hide file tree
Showing 6 changed files with 17 additions and 118 deletions.
6 changes: 4 additions & 2 deletions scripts/check-snafu.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,10 @@ def main():
if not check_snafu_in_files(branch_name, other_rust_files)
]

for name in unused_snafu:
print(name)
if unused_snafu:
print("Unused error variants:")
for name in unused_snafu:
print(name)

if unused_snafu:
raise SystemExit(1)
Expand Down
8 changes: 0 additions & 8 deletions src/mito2/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -756,13 +756,6 @@ pub enum Error {
location: Location,
},

#[snafu(display("Failed to build time range filters for value: {:?}", timestamp))]
BuildTimeRangeFilter {
timestamp: Timestamp,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Failed to open region"))]
OpenRegion {
#[snafu(implicit)]
Expand Down Expand Up @@ -1023,7 +1016,6 @@ impl ErrorExt for Error {
ChecksumMismatch { .. } => StatusCode::Unexpected,
RegionStopped { .. } => StatusCode::RegionNotReady,
TimeRangePredicateOverflow { .. } => StatusCode::InvalidArguments,
BuildTimeRangeFilter { .. } => StatusCode::Unexpected,
UnsupportedOperation { .. } => StatusCode::Unsupported,
RemoteCompaction { .. } => StatusCode::Unexpected,

Expand Down
11 changes: 3 additions & 8 deletions src/mito2/src/read/scan_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,20 +355,16 @@ impl ScanRegion {
Ok(input)
}

/// Build time range predicate from filters, also remove time filters from request.
fn build_time_range_predicate(&mut self) -> TimestampRange {
/// Build time range predicate from filters.
fn build_time_range_predicate(&self) -> TimestampRange {
let time_index = self.version.metadata.time_index_column();
let unit = time_index
.column_schema
.data_type
.as_timestamp()
.expect("Time index must have timestamp-compatible type")
.unit();
build_time_range_predicate(
&time_index.column_schema.name,
unit,
&mut self.request.filters,
)
build_time_range_predicate(&time_index.column_schema.name, unit, &self.request.filters)
}

/// Remove field filters if the merge mode is [MergeMode::LastNonNull].
Expand Down Expand Up @@ -695,7 +691,6 @@ impl ScanInput {
.access_layer
.read_sst(file.clone())
.predicate(self.predicate.clone())
.time_range(self.time_range)
.projection(Some(self.mapper.column_ids().to_vec()))
.cache(self.cache_manager.clone())
.inverted_index_applier(self.inverted_index_applier.clone())
Expand Down
90 changes: 2 additions & 88 deletions src/mito2/src/sst/parquet/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,7 @@ use api::v1::SemanticType;
use async_trait::async_trait;
use common_recordbatch::filter::SimpleFilterEvaluator;
use common_telemetry::{debug, warn};
use common_time::range::TimestampRange;
use common_time::timestamp::TimeUnit;
use common_time::Timestamp;
use datafusion_common::ScalarValue;
use datafusion_expr::{Expr, Operator};
use datafusion_expr::Expr;
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::data_type::ConcreteDataType;
use itertools::Itertools;
Expand All @@ -42,7 +38,6 @@ use store_api::storage::ColumnId;
use table::predicate::Predicate;

use crate::cache::CacheManagerRef;
use crate::error;
use crate::error::{
ArrowReaderSnafu, InvalidMetadataSnafu, InvalidParquetSnafu, ReadParquetSnafu, Result,
};
Expand Down Expand Up @@ -74,8 +69,6 @@ pub struct ParquetReaderBuilder {
object_store: ObjectStore,
/// Predicate to push down.
predicate: Option<Predicate>,
/// Time range to filter.
time_range: Option<TimestampRange>,
/// Metadata of columns to read.
///
/// `None` reads all columns. Due to schema change, the projection
Expand Down Expand Up @@ -104,7 +97,6 @@ impl ParquetReaderBuilder {
file_handle,
object_store,
predicate: None,
time_range: None,
projection: None,
cache_manager: None,
inverted_index_applier: None,
Expand All @@ -120,13 +112,6 @@ impl ParquetReaderBuilder {
self
}

/// Attaches the time range to the builder.
#[must_use]
pub fn time_range(mut self, time_range: Option<TimestampRange>) -> ParquetReaderBuilder {
self.time_range = time_range;
self
}

/// Attaches the projection to the builder.
///
/// The reader only applies the projection to fields.
Expand Down Expand Up @@ -238,7 +223,7 @@ impl ParquetReaderBuilder {
cache_manager: self.cache_manager.clone(),
};

let mut filters = if let Some(predicate) = &self.predicate {
let filters = if let Some(predicate) = &self.predicate {
predicate
.exprs()
.iter()
Expand All @@ -254,10 +239,6 @@ impl ParquetReaderBuilder {
vec![]
};

if let Some(time_range) = &self.time_range {
filters.extend(time_range_to_predicate(*time_range, &region_meta)?);
}

let codec = McmpRowCodec::new(
read_format
.metadata()
Expand Down Expand Up @@ -678,59 +659,6 @@ impl ParquetReaderBuilder {
}
}

/// Transforms time range into [SimpleFilterEvaluator].
fn time_range_to_predicate(
time_range: TimestampRange,
metadata: &RegionMetadataRef,
) -> Result<Vec<SimpleFilterContext>> {
let ts_col = metadata.time_index_column();
let ts_col_id = ts_col.column_id;

let ts_to_filter = |op: Operator, timestamp: &Timestamp| {
let value = match timestamp.unit() {
TimeUnit::Second => ScalarValue::TimestampSecond(Some(timestamp.value()), None),
TimeUnit::Millisecond => {
ScalarValue::TimestampMillisecond(Some(timestamp.value()), None)
}
TimeUnit::Microsecond => {
ScalarValue::TimestampMicrosecond(Some(timestamp.value()), None)
}
TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(Some(timestamp.value()), None),
};
let evaluator = SimpleFilterEvaluator::new(ts_col.column_schema.name.clone(), value, op)
.context(error::BuildTimeRangeFilterSnafu {
timestamp: *timestamp,
})?;
Ok(SimpleFilterContext::new(
evaluator,
ts_col_id,
SemanticType::Timestamp,
ts_col.column_schema.data_type.clone(),
))
};

let predicates = match (time_range.start(), time_range.end()) {
(Some(start), Some(end)) => {
vec![
ts_to_filter(Operator::GtEq, start)?,
ts_to_filter(Operator::Lt, end)?,
]
}

(Some(start), None) => {
vec![ts_to_filter(Operator::GtEq, start)?]
}

(None, Some(end)) => {
vec![ts_to_filter(Operator::Lt, end)?]
}
(None, None) => {
vec![]
}
};
Ok(predicates)
}

/// Metrics of filtering rows groups and rows.
#[derive(Debug, Default, Clone, Copy)]
pub(crate) struct ReaderFilterMetrics {
Expand Down Expand Up @@ -939,20 +867,6 @@ pub(crate) struct SimpleFilterContext {
}

impl SimpleFilterContext {
fn new(
filter: SimpleFilterEvaluator,
column_id: ColumnId,
semantic_type: SemanticType,
data_type: ConcreteDataType,
) -> Self {
Self {
filter,
column_id,
semantic_type,
data_type,
}
}

/// Creates a context for the `expr`.
///
/// Returns None if the column to filter doesn't exist in the SST metadata or the
Expand Down
4 changes: 2 additions & 2 deletions src/query/src/tests/time_range_filter_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,9 @@ struct TimeRangeTester {
impl TimeRangeTester {
async fn check(&self, sql: &str, expect: TimestampRange) {
let _ = exec_selection(self.engine.clone(), sql).await;
let mut filters = self.take_filters();
let filters = self.take_filters();

let range = build_time_range_predicate("ts", TimeUnit::Millisecond, &mut filters);
let range = build_time_range_predicate("ts", TimeUnit::Millisecond, &filters);
assert_eq!(expect, range);
}

Expand Down
16 changes: 6 additions & 10 deletions src/table/src/predicate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,21 +135,17 @@ impl Predicate {
// since it requires query engine to convert sql to filters.
/// `build_time_range_predicate` extracts time range from logical exprs to facilitate fast
/// time range pruning.
pub fn build_time_range_predicate<'a>(
ts_col_name: &'a str,
pub fn build_time_range_predicate(
ts_col_name: &str,
ts_col_unit: TimeUnit,
filters: &'a mut Vec<Expr>,
filters: &[Expr],
) -> TimestampRange {
let mut res = TimestampRange::min_to_max();
let mut filters_remain = vec![];
for expr in std::mem::take(filters) {
if let Some(range) = extract_time_range_from_expr(ts_col_name, ts_col_unit, &expr) {
for expr in filters {
if let Some(range) = extract_time_range_from_expr(ts_col_name, ts_col_unit, expr) {
res = res.and(&range);
} else {
filters_remain.push(expr);
}
}
*filters = filters_remain;
res
}

Expand Down Expand Up @@ -392,7 +388,7 @@ mod tests {
fn check_build_predicate(expr: Expr, expect: TimestampRange) {
assert_eq!(
expect,
build_time_range_predicate("ts", TimeUnit::Millisecond, &mut vec![expr])
build_time_range_predicate("ts", TimeUnit::Millisecond, &[expr])
);
}

Expand Down

0 comments on commit 041a276

Please sign in to comment.