Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add align to / interval support in range query #2842

Merged
merged 4 commits into from
Dec 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ serde_json = "1.0"
smallvec = "1"
snafu = "0.7"
# on branch v0.38.x
sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "0fbae07d0c46dc18e3381c406d8b9b8abef6b1fd", features = [
sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "6a93567ae38d42be5c8d08b13c8ff4dde26502ef", features = [
"visitor",
] }
strum = { version = "0.25", features = ["derive"] }
Expand Down
1 change: 1 addition & 0 deletions src/query/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

#![feature(let_chains)]
#![feature(int_roundings)]

pub mod dataframe;
pub mod datafusion;
Expand Down
69 changes: 57 additions & 12 deletions src/query/src/range_select/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ use datatypes::arrow::record_batch::RecordBatch;
use datatypes::arrow::row::{OwnedRow, RowConverter, SortField};
use futures::{ready, Stream};
use futures_util::StreamExt;
use snafu::ResultExt;
use snafu::{ensure, ResultExt};

use crate::error::{DataFusionSnafu, Result};
use crate::error::{DataFusionSnafu, RangeQuerySnafu, Result};

type Millisecond = <TimestampMillisecondType as ArrowPrimitiveType>::Native;

Expand Down Expand Up @@ -147,7 +147,7 @@ impl Fill {

#[derive(Eq, Clone, Debug)]
pub struct RangeFn {
/// with format like `max(a) 300s null`
/// with format like `max(a) RANGE 300s FILL NULL`
pub name: String,
pub data_type: DataType,
pub expr: Expr,
Expand Down Expand Up @@ -197,6 +197,7 @@ pub struct RangeSelect {
/// all range expressions
pub range_expr: Vec<RangeFn>,
pub align: Duration,
pub align_to: i64,
pub time_index: String,
pub by: Vec<Expr>,
pub schema: DFSchemaRef,
Expand All @@ -216,10 +217,28 @@ impl RangeSelect {
input: Arc<LogicalPlan>,
range_expr: Vec<RangeFn>,
align: Duration,
align_to: i64,
time_index: Expr,
by: Vec<Expr>,
projection_expr: &[Expr],
) -> Result<Self> {
ensure!(
align.as_millis() != 0,
RangeQuerySnafu {
msg: "Can't use 0 as align in Range Query"
}
);
for expr in &range_expr {
ensure!(
expr.range.as_millis() != 0,
RangeQuerySnafu {
msg: format!(
"Invalid Range expr `{}`, Can't use 0 as range in Range Query",
expr.name
)
}
);
}
let mut fields = range_expr
.iter()
.map(
Expand Down Expand Up @@ -289,6 +308,7 @@ impl RangeSelect {
input,
range_expr,
align,
align_to,
time_index: time_index_name,
schema,
by_schema,
Expand Down Expand Up @@ -322,13 +342,19 @@ impl UserDefinedLogicalNodeCore for RangeSelect {
fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(
f,
"RangeSelect: range_exprs=[{}], align={}s time_index={}",
"RangeSelect: range_exprs=[{}], align={}ms, align_to={}ms, align_by=[{}], time_index={}",
self.range_expr
.iter()
.map(ToString::to_string)
.collect::<Vec<_>>()
.join(", "),
self.align.as_secs(),
self.align.as_millis(),
self.align_to,
self.by
.iter()
.map(ToString::to_string)
.collect::<Vec<_>>()
.join(", "),
self.time_index
)
}
Expand All @@ -338,6 +364,7 @@ impl UserDefinedLogicalNodeCore for RangeSelect {

Self {
align: self.align,
align_to: self.align_to,
range_expr: self.range_expr.clone(),
input: Arc::new(inputs[0].clone()),
time_index: self.time_index.clone(),
Expand Down Expand Up @@ -463,6 +490,7 @@ impl RangeSelect {
input: exec_input,
range_exec,
align: self.align.as_millis() as Millisecond,
align_to: self.align_to,
by: self.create_physical_expr_list(
&self.by,
input_dfschema,
Expand Down Expand Up @@ -493,6 +521,7 @@ pub struct RangeSelectExec {
input: Arc<dyn ExecutionPlan>,
range_exec: Vec<RangeFnExec>,
align: Millisecond,
align_to: i64,
time_index: String,
by: Vec<Arc<dyn PhysicalExpr>>,
schema: SchemaRef,
Expand All @@ -510,16 +539,24 @@ impl DisplayAs for RangeSelectExec {
let range_expr_strs: Vec<String> = self
.range_exec
.iter()
.map(|e| format!("RangeFnExec{{ {}, range: {:?}}}", e.expr.name(), e.range))
.map(|e| {
format!(
"{} RANGE {}s FILL {}",
e.expr.name(),
e.range / 1000,
e.fill
)
})
.collect();
let by: Vec<String> = self.by.iter().map(|e| e.to_string()).collect();
write!(
f,
"range_expr=[{}], align={}, time_index={}, by=[{}]",
"range_expr=[{}], align={}ms, align_to={}ms, align_by=[{}], time_index={}",
range_expr_strs.join(", "),
self.align,
self.align_to,
by.join(", "),
self.time_index,
by.join(", ")
)?;
}
}
Expand Down Expand Up @@ -563,6 +600,7 @@ impl ExecutionPlan for RangeSelectExec {
time_index: self.time_index.clone(),
by: self.by.clone(),
align: self.align,
align_to: self.align_to,
schema: self.schema.clone(),
by_schema: self.by_schema.clone(),
metric: self.metric.clone(),
Expand Down Expand Up @@ -599,6 +637,7 @@ impl ExecutionPlan for RangeSelectExec {
random_state: RandomState::new(),
time_index,
align: self.align,
align_to: self.align_to,
by: self.by.clone(),
series_map: HashMap::new(),
exec_state: ExecutionState::ReadingInput,
Expand Down Expand Up @@ -629,6 +668,7 @@ struct RangeSelectStream {
time_index: usize,
/// the unit of `align` is millisecond
align: Millisecond,
align_to: i64,
by: Vec<Arc<dyn PhysicalExpr>>,
exec_state: ExecutionState,
/// Converter for the by values
Expand Down Expand Up @@ -657,11 +697,13 @@ struct SeriesState {
align_ts_accumulator: HashMap<Millisecond, Vec<Box<dyn Accumulator>>>,
}

/// According to `align`, produces a calendar-based aligned time.
/// Use `align_to` as time origin.
/// According to `align` as time interval, produces aligned time.
/// Combining the parameters related to the range query,
/// determine for each `Accumulator` `(hash, align_ts)` define,
/// which rows of data will be applied to it.
fn align_to_calendar(
fn produce_align_time(
align_to: i64,
range: Millisecond,
align: Millisecond,
ts_column: &TimestampMillisecondArray,
Expand All @@ -672,7 +714,8 @@ fn align_to_calendar(
// make modify_map for range_fn[i]
for (row, hash) in by_columns_hash.iter().enumerate() {
let ts = ts_column.value(row);
let mut align_ts = ((ts + align - 1) / align) * align;
let ith_slot = (ts - align_to).div_ceil(align);
let mut align_ts = ith_slot * align + align_to;
while align_ts - range < ts && ts <= align_ts {
modify_map
.entry((*hash, align_ts))
Expand Down Expand Up @@ -733,7 +776,8 @@ impl RangeSelectStream {
for i in 0..self.range_exec.len() {
let args = self.evaluate_many(&batch, &self.range_exec[i].args)?;
// use self.modify_map record (hash, align_ts) => [row_nums]
align_to_calendar(
produce_align_time(
self.align_to,
self.range_exec[i].range,
self.align,
ts_column_ref,
Expand Down Expand Up @@ -1065,6 +1109,7 @@ mod test {
},
],
align,
align_to: 0,
by: vec![Arc::new(Column::new("host", 2))],
time_index: TIME_INDEX_COLUMN.to_string(),
schema: schema.clone(),
Expand Down
Loading
Loading