Skip to content

Commit

Permalink
Support aggregation in rolling window queries
Browse files Browse the repository at this point in the history
The idea is to aggregate inside each matching partition and dimension.
The `ROLLING_WINDOW` clause now has an optional `GROUP BY DIMENSION <expr>`
argument. The corresponding expression is used both as a grouping key for
non-rolling aggregates and as a "join" key to match against the rolling
window output dimension.
  • Loading branch information
ilya-biryukov committed Aug 27, 2021
1 parent 605c921 commit 5f73024
Show file tree
Hide file tree
Showing 4 changed files with 246 additions and 73 deletions.
2 changes: 1 addition & 1 deletion datafusion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ ahash = "0.7"
hashbrown = "0.11"
arrow = { git = "https://github.com/cube-js/arrow-rs.git", branch = "cube", features = ["prettyprint"] }
parquet = { git = "https://github.com/cube-js/arrow-rs.git", branch = "cube", features = ["arrow"] }
sqlparser = { git = "https://github.com/cube-js/sqlparser-rs.git", rev = "6008dfab082a3455c54b023be878d92ec9acef43" }
sqlparser = { git = "https://github.com/cube-js/sqlparser-rs.git", rev = "2fcd06f7354e8c85f170b49a08fc018749289a40" }
paste = "^1.0"
num_cpus = "1.13.0"
chrono = "0.4"
Expand Down
156 changes: 141 additions & 15 deletions datafusion/src/cube_ext/rolling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,23 +26,28 @@ use crate::logical_plan::{
};
use crate::physical_plan::coalesce_batches::concat_batches;
use crate::physical_plan::expressions::PhysicalSortExpr;
use crate::physical_plan::hash_aggregate::{append_value, create_builder};
use crate::physical_plan::group_scalar::GroupByScalar;
use crate::physical_plan::hash_aggregate::{
append_value, create_accumulators, create_builder, create_group_by_value,
};
use crate::physical_plan::planner::ExtensionPlanner;
use crate::physical_plan::sort::SortExec;
use crate::physical_plan::{
collect, AggregateExpr, ColumnarValue, Distribution, ExecutionPlan, Partitioning,
PhysicalPlanner, SendableRecordBatchStream,
PhysicalExpr, PhysicalPlanner, SendableRecordBatchStream,
};
use crate::scalar::ScalarValue;
use arrow::array::{make_array, BooleanBuilder, MutableArrayData};
use arrow::array::{make_array, ArrayRef, BooleanBuilder, MutableArrayData, UInt64Array};
use arrow::compute::filter;
use arrow::datatypes::{DataType, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
use chrono::{TimeZone, Utc};
use hashbrown::HashMap;
use itertools::Itertools;
use std::any::Any;
use std::cmp::{max, Ordering};
use std::convert::TryFrom;
use std::sync::Arc;

#[derive(Debug)]
Expand All @@ -55,6 +60,8 @@ pub struct RollingWindowAggregate {
pub every: Expr,
pub partition_by: Vec<Column>,
pub rolling_aggs: Vec<Expr>,
pub group_by_dimension: Option<Expr>,
pub aggs: Vec<Expr>,
}

impl UserDefinedLogicalNode for RollingWindowAggregate {
Expand All @@ -79,6 +86,10 @@ impl UserDefinedLogicalNode for RollingWindowAggregate {
];
e.extend(self.partition_by.iter().map(|c| Expr::Column(c.clone())));
e.extend_from_slice(self.rolling_aggs.as_slice());
e.extend_from_slice(self.aggs.as_slice());
if let Some(d) = &self.group_by_dimension {
e.push(d.clone());
}
e
}

Expand All @@ -96,7 +107,13 @@ impl UserDefinedLogicalNode for RollingWindowAggregate {
inputs: &[LogicalPlan],
) -> Arc<dyn UserDefinedLogicalNode + Send + Sync> {
assert_eq!(inputs.len(), 1);
assert!(4 + self.partition_by.len() <= exprs.len());
assert_eq!(
exprs.len(),
4 + self.partition_by.len()
+ self.rolling_aggs.len()
+ self.aggs.len()
+ self.group_by_dimension.as_ref().map(|_| 1).unwrap_or(0)
);
let input = inputs[0].clone();
let dimension = match &exprs[0] {
Expr::Column(c) => c.clone(),
Expand All @@ -105,14 +122,30 @@ impl UserDefinedLogicalNode for RollingWindowAggregate {
let from = exprs[1].clone();
let to = exprs[2].clone();
let every = exprs[3].clone();
let partition_by = exprs[4..4 + self.partition_by.len()]
let exprs = &exprs[4..];
let partition_by = exprs[..self.partition_by.len()]
.iter()
.map(|c| match c {
Expr::Column(c) => c.clone(),
o => panic!("Expected column for partition_by, got {:?}", o),
})
.collect_vec();
let rolling_aggs = exprs[4 + self.partition_by.len()..].to_vec();
let exprs = &exprs[self.partition_by.len()..];

let rolling_aggs = exprs[..self.rolling_aggs.len()].to_vec();
let exprs = &exprs[self.rolling_aggs.len()..];

let aggs = exprs[..self.aggs.len()].to_vec();
let exprs = &exprs[self.aggs.len()..];

let group_by_dimension = if self.group_by_dimension.is_some() {
debug_assert_eq!(exprs.len(), 1);
Some(exprs[0].clone())
} else {
debug_assert_eq!(exprs.len(), 0);
None
};

Arc::new(RollingWindowAggregate {
schema: self.schema.clone(),
input,
Expand All @@ -122,6 +155,8 @@ impl UserDefinedLogicalNode for RollingWindowAggregate {
every,
partition_by,
rolling_aggs,
group_by_dimension,
aggs,
})
}
}
Expand Down Expand Up @@ -211,6 +246,21 @@ impl ExtensionPlanner for Planner {
})
.collect::<Result<Vec<_>, _>>()?;

let group_by_dimension = node
.group_by_dimension
.as_ref()
.map(|d| {
planner.create_physical_expr(d, input_dfschema, &input_schema, ctx_state)
})
.transpose()?;
let aggs = node
.aggs
.iter()
.map(|a| {
planner.create_aggregate_expr(a, input_dfschema, &input_schema, ctx_state)
})
.collect::<Result<_, _>>()?;

// TODO: filter inputs by date.
// Do preliminary sorting.
let mut sort_key = Vec::with_capacity(input_schema.fields().len());
Expand All @@ -229,6 +279,7 @@ impl ExtensionPlanner for Planner {
});

let sort = Arc::new(SortExec::try_new(sort_key, input.clone())?);

let schema = node.schema.to_schema_ref();

Ok(Some(Arc::new(RollingWindowAggExec {
Expand All @@ -237,6 +288,8 @@ impl ExtensionPlanner for Planner {
group_key,
rolling_aggs,
dimension,
group_by_dimension,
aggs,
from,
to,
every,
Expand Down Expand Up @@ -297,6 +350,8 @@ pub struct RollingWindowAggExec {
pub group_key: Vec<crate::physical_plan::expressions::Column>,
pub rolling_aggs: Vec<RollingAgg>,
pub dimension: crate::physical_plan::expressions::Column,
pub group_by_dimension: Option<Arc<dyn PhysicalExpr>>,
pub aggs: Vec<Arc<dyn AggregateExpr>>,
pub from: ScalarValue,
pub to: ScalarValue,
pub every: ScalarValue,
Expand Down Expand Up @@ -335,6 +390,8 @@ impl ExecutionPlan for RollingWindowAggExec {
group_key: self.group_key.clone(),
rolling_aggs: self.rolling_aggs.clone(),
dimension: self.dimension.clone(),
group_by_dimension: self.group_by_dimension.clone(),
aggs: self.aggs.clone(),
from: self.from.clone(),
to: self.to.clone(),
every: self.every.clone(),
Expand All @@ -357,6 +414,7 @@ impl ExecutionPlan for RollingWindowAggExec {
.iter()
.map(|c| input.columns()[c.index()].clone())
.collect_vec();

let other_cols = input
.columns()
.iter()
Expand All @@ -374,15 +432,7 @@ impl ExecutionPlan for RollingWindowAggExec {
let agg_inputs = self
.rolling_aggs
.iter()
.map(|r| {
r.agg
.expressions()
.iter()
.map(|e| -> Result<_, DataFusionError> {
Ok(e.evaluate(&input)?.into_array(num_rows))
})
.collect::<Result<Vec<_>, _>>()
})
.map(|r| compute_agg_inputs(r.agg.as_ref(), &input))
.collect::<Result<Vec<_>, _>>()?;
let mut accumulators = self
.rolling_aggs
Expand All @@ -396,6 +446,19 @@ impl ExecutionPlan for RollingWindowAggExec {
dimension = arrow::compute::cast(&dimension, &dim_iter_type)?;
}

let extra_aggs_dimension = self
.group_by_dimension
.as_ref()
.map(|d| -> Result<_, DataFusionError> {
Ok(d.evaluate(&input)?.into_array(num_rows))
})
.transpose()?;
let extra_aggs_inputs = self
.aggs
.iter()
.map(|a| compute_agg_inputs(a.as_ref(), &input))
.collect::<Result<Vec<_>, _>>()?;

let mut out_dim = create_builder(&self.from);
let mut out_keys = key_cols
.iter()
Expand All @@ -404,6 +467,12 @@ impl ExecutionPlan for RollingWindowAggExec {
let mut out_aggs = Vec::with_capacity(self.rolling_aggs.len());
// This filter must be applied prior to returning the values.
let mut out_aggs_keep = BooleanBuilder::new(0);
let extra_agg_nulls = self
.aggs
.iter()
.map(|a| ScalarValue::try_from(a.field()?.data_type()))
.collect::<Result<Vec<_>, _>>()?;
let mut out_extra_aggs = extra_agg_nulls.iter().map(create_builder).collect_vec();
let mut out_other = other_cols
.iter()
.map(|c| MutableArrayData::new(vec![c.data()], true, 0))
Expand Down Expand Up @@ -491,6 +560,32 @@ impl ExecutionPlan for RollingWindowAggExec {
}
}

// Compute non-rolling aggregates for the group.
let mut dim_to_extra_aggs = HashMap::new();
if let Some(key) = &extra_aggs_dimension {
let mut key_to_rows = HashMap::new();
for i in group_start..group_end {
let key = create_group_by_value(key, i)?;
key_to_rows.entry(key).or_insert(Vec::new()).push(i as u64);
}

for (k, rows) in key_to_rows {
let mut accumulators = create_accumulators(&self.aggs)?;
let rows = UInt64Array::from(rows);
let mut values = Vec::with_capacity(accumulators.len());
for i in 0..accumulators.len() {
let accum_inputs = extra_aggs_inputs[i]
.iter()
.map(|a| arrow::compute::take(a.as_ref(), &rows, None))
.collect::<Result<Vec<_>, _>>()?;
accumulators[i].update_batch(&accum_inputs)?;
values.push(accumulators[i].evaluate()?);
}

dim_to_extra_aggs.insert(k, values);
}
}

// Add keys, dimension and non-aggregate columns to the output.
let mut d = self.from.clone();
let mut d_iter = 0;
Expand All @@ -509,6 +604,19 @@ impl ExecutionPlan for RollingWindowAggExec {
for i in 0..key_cols.len() {
out_keys[i].extend(0, group_start, group_start + 1)
}
// Add aggregates.
match dim_to_extra_aggs.get(&GroupByScalar::try_from(&d)?) {
Some(aggs) => {
for i in 0..out_extra_aggs.len() {
append_value(out_extra_aggs[i].as_mut(), &aggs[i])?
}
}
None => {
for i in 0..out_extra_aggs.len() {
append_value(out_extra_aggs[i].as_mut(), &extra_agg_nulls[i])?
}
}
}
// Find the matching row to add other columns.
while matching_row_lower_bound < group_end
&& cmp_same_types(
Expand Down Expand Up @@ -590,10 +698,16 @@ impl ExecutionPlan for RollingWindowAggExec {
for o in out_other {
r.push(make_array(o.freeze()));
}

let out_aggs_keep = out_aggs_keep.finish();
for mut a in out_aggs {
r.push(filter(a.finish().as_ref(), &out_aggs_keep)?);
}

for mut a in out_extra_aggs {
r.push(a.finish())
}

let r = RecordBatch::try_new(self.schema(), r)?;
Ok(Box::pin(StreamWithSchema::wrap(
self.schema(),
Expand Down Expand Up @@ -621,6 +735,18 @@ fn add_dim(l: &ScalarValue, r: &ScalarValue) -> ScalarValue {
}
}

/// Evaluates every input expression of the aggregate `a` against `input`.
///
/// Each evaluated value is turned into an array via `into_array`, using the
/// row count of `input`. The arrays are returned in the same order as
/// `a.expressions()`.
///
/// # Errors
///
/// Stops at, and returns, the first error produced while evaluating an
/// expression.
fn compute_agg_inputs(
    a: &dyn AggregateExpr,
    input: &RecordBatch,
) -> Result<Vec<ArrayRef>, DataFusionError> {
    let agg_exprs = a.expressions();
    let mut arrays = Vec::with_capacity(agg_exprs.len());
    for expr in agg_exprs.iter() {
        let value = expr.evaluate(input)?;
        arrays.push(value.into_array(input.num_rows()));
    }
    Ok(arrays)
}

fn meets_lower_bound(
value: &ScalarValue,
current: &ScalarValue,
Expand Down
Loading

0 comments on commit 5f73024

Please sign in to comment.