Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support aggregation in rolling window queries #4

Merged
merged 1 commit into from
Aug 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datafusion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ ahash = "0.7"
hashbrown = "0.11"
arrow = { git = "https://github.com/cube-js/arrow-rs.git", branch = "cube", features = ["prettyprint"] }
parquet = { git = "https://github.com/cube-js/arrow-rs.git", branch = "cube", features = ["arrow"] }
sqlparser = { git = "https://github.com/cube-js/sqlparser-rs.git", rev = "6008dfab082a3455c54b023be878d92ec9acef43" }
sqlparser = { git = "https://github.com/cube-js/sqlparser-rs.git", rev = "2fcd06f7354e8c85f170b49a08fc018749289a40" }
paste = "^1.0"
num_cpus = "1.13.0"
chrono = "0.4"
Expand Down
156 changes: 141 additions & 15 deletions datafusion/src/cube_ext/rolling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,23 +26,28 @@ use crate::logical_plan::{
};
use crate::physical_plan::coalesce_batches::concat_batches;
use crate::physical_plan::expressions::PhysicalSortExpr;
use crate::physical_plan::hash_aggregate::{append_value, create_builder};
use crate::physical_plan::group_scalar::GroupByScalar;
use crate::physical_plan::hash_aggregate::{
append_value, create_accumulators, create_builder, create_group_by_value,
};
use crate::physical_plan::planner::ExtensionPlanner;
use crate::physical_plan::sort::SortExec;
use crate::physical_plan::{
collect, AggregateExpr, ColumnarValue, Distribution, ExecutionPlan, Partitioning,
PhysicalPlanner, SendableRecordBatchStream,
PhysicalExpr, PhysicalPlanner, SendableRecordBatchStream,
};
use crate::scalar::ScalarValue;
use arrow::array::{make_array, BooleanBuilder, MutableArrayData};
use arrow::array::{make_array, ArrayRef, BooleanBuilder, MutableArrayData, UInt64Array};
use arrow::compute::filter;
use arrow::datatypes::{DataType, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
use chrono::{TimeZone, Utc};
use hashbrown::HashMap;
use itertools::Itertools;
use std::any::Any;
use std::cmp::{max, Ordering};
use std::convert::TryFrom;
use std::sync::Arc;

#[derive(Debug)]
Expand All @@ -55,6 +60,8 @@ pub struct RollingWindowAggregate {
pub every: Expr,
pub partition_by: Vec<Column>,
pub rolling_aggs: Vec<Expr>,
pub group_by_dimension: Option<Expr>,
pub aggs: Vec<Expr>,
}

impl UserDefinedLogicalNode for RollingWindowAggregate {
Expand All @@ -79,6 +86,10 @@ impl UserDefinedLogicalNode for RollingWindowAggregate {
];
e.extend(self.partition_by.iter().map(|c| Expr::Column(c.clone())));
e.extend_from_slice(self.rolling_aggs.as_slice());
e.extend_from_slice(self.aggs.as_slice());
if let Some(d) = &self.group_by_dimension {
e.push(d.clone());
}
e
}

Expand All @@ -96,7 +107,13 @@ impl UserDefinedLogicalNode for RollingWindowAggregate {
inputs: &[LogicalPlan],
) -> Arc<dyn UserDefinedLogicalNode + Send + Sync> {
assert_eq!(inputs.len(), 1);
assert!(4 + self.partition_by.len() <= exprs.len());
assert_eq!(
exprs.len(),
4 + self.partition_by.len()
+ self.rolling_aggs.len()
+ self.aggs.len()
+ self.group_by_dimension.as_ref().map(|_| 1).unwrap_or(0)
);
let input = inputs[0].clone();
let dimension = match &exprs[0] {
Expr::Column(c) => c.clone(),
Expand All @@ -105,14 +122,30 @@ impl UserDefinedLogicalNode for RollingWindowAggregate {
let from = exprs[1].clone();
let to = exprs[2].clone();
let every = exprs[3].clone();
let partition_by = exprs[4..4 + self.partition_by.len()]
let exprs = &exprs[4..];
let partition_by = exprs[..self.partition_by.len()]
.iter()
.map(|c| match c {
Expr::Column(c) => c.clone(),
o => panic!("Expected column for partition_by, got {:?}", o),
})
.collect_vec();
let rolling_aggs = exprs[4 + self.partition_by.len()..].to_vec();
let exprs = &exprs[self.partition_by.len()..];

let rolling_aggs = exprs[..self.rolling_aggs.len()].to_vec();
let exprs = &exprs[self.rolling_aggs.len()..];

let aggs = exprs[..self.aggs.len()].to_vec();
let exprs = &exprs[self.aggs.len()..];

let group_by_dimension = if self.group_by_dimension.is_some() {
debug_assert_eq!(exprs.len(), 1);
Some(exprs[0].clone())
} else {
debug_assert_eq!(exprs.len(), 0);
None
};

Arc::new(RollingWindowAggregate {
schema: self.schema.clone(),
input,
Expand All @@ -122,6 +155,8 @@ impl UserDefinedLogicalNode for RollingWindowAggregate {
every,
partition_by,
rolling_aggs,
group_by_dimension,
aggs,
})
}
}
Expand Down Expand Up @@ -211,6 +246,21 @@ impl ExtensionPlanner for Planner {
})
.collect::<Result<Vec<_>, _>>()?;

let group_by_dimension = node
.group_by_dimension
.as_ref()
.map(|d| {
planner.create_physical_expr(d, input_dfschema, &input_schema, ctx_state)
})
.transpose()?;
let aggs = node
.aggs
.iter()
.map(|a| {
planner.create_aggregate_expr(a, input_dfschema, &input_schema, ctx_state)
})
.collect::<Result<_, _>>()?;

// TODO: filter inputs by date.
// Do preliminary sorting.
let mut sort_key = Vec::with_capacity(input_schema.fields().len());
Expand All @@ -229,6 +279,7 @@ impl ExtensionPlanner for Planner {
});

let sort = Arc::new(SortExec::try_new(sort_key, input.clone())?);

let schema = node.schema.to_schema_ref();

Ok(Some(Arc::new(RollingWindowAggExec {
Expand All @@ -237,6 +288,8 @@ impl ExtensionPlanner for Planner {
group_key,
rolling_aggs,
dimension,
group_by_dimension,
aggs,
from,
to,
every,
Expand Down Expand Up @@ -297,6 +350,8 @@ pub struct RollingWindowAggExec {
pub group_key: Vec<crate::physical_plan::expressions::Column>,
pub rolling_aggs: Vec<RollingAgg>,
pub dimension: crate::physical_plan::expressions::Column,
pub group_by_dimension: Option<Arc<dyn PhysicalExpr>>,
pub aggs: Vec<Arc<dyn AggregateExpr>>,
pub from: ScalarValue,
pub to: ScalarValue,
pub every: ScalarValue,
Expand Down Expand Up @@ -335,6 +390,8 @@ impl ExecutionPlan for RollingWindowAggExec {
group_key: self.group_key.clone(),
rolling_aggs: self.rolling_aggs.clone(),
dimension: self.dimension.clone(),
group_by_dimension: self.group_by_dimension.clone(),
aggs: self.aggs.clone(),
from: self.from.clone(),
to: self.to.clone(),
every: self.every.clone(),
Expand All @@ -357,6 +414,7 @@ impl ExecutionPlan for RollingWindowAggExec {
.iter()
.map(|c| input.columns()[c.index()].clone())
.collect_vec();

let other_cols = input
.columns()
.iter()
Expand All @@ -374,15 +432,7 @@ impl ExecutionPlan for RollingWindowAggExec {
let agg_inputs = self
.rolling_aggs
.iter()
.map(|r| {
r.agg
.expressions()
.iter()
.map(|e| -> Result<_, DataFusionError> {
Ok(e.evaluate(&input)?.into_array(num_rows))
})
.collect::<Result<Vec<_>, _>>()
})
.map(|r| compute_agg_inputs(r.agg.as_ref(), &input))
.collect::<Result<Vec<_>, _>>()?;
let mut accumulators = self
.rolling_aggs
Expand All @@ -396,6 +446,19 @@ impl ExecutionPlan for RollingWindowAggExec {
dimension = arrow::compute::cast(&dimension, &dim_iter_type)?;
}

let extra_aggs_dimension = self
.group_by_dimension
.as_ref()
.map(|d| -> Result<_, DataFusionError> {
Ok(d.evaluate(&input)?.into_array(num_rows))
})
.transpose()?;
let extra_aggs_inputs = self
.aggs
.iter()
.map(|a| compute_agg_inputs(a.as_ref(), &input))
.collect::<Result<Vec<_>, _>>()?;

let mut out_dim = create_builder(&self.from);
let mut out_keys = key_cols
.iter()
Expand All @@ -404,6 +467,12 @@ impl ExecutionPlan for RollingWindowAggExec {
let mut out_aggs = Vec::with_capacity(self.rolling_aggs.len());
// This filter must be applied prior to returning the values.
let mut out_aggs_keep = BooleanBuilder::new(0);
let extra_agg_nulls = self
.aggs
.iter()
.map(|a| ScalarValue::try_from(a.field()?.data_type()))
.collect::<Result<Vec<_>, _>>()?;
let mut out_extra_aggs = extra_agg_nulls.iter().map(create_builder).collect_vec();
let mut out_other = other_cols
.iter()
.map(|c| MutableArrayData::new(vec![c.data()], true, 0))
Expand Down Expand Up @@ -491,6 +560,32 @@ impl ExecutionPlan for RollingWindowAggExec {
}
}

// Compute non-rolling aggregates for the group.
let mut dim_to_extra_aggs = HashMap::new();
if let Some(key) = &extra_aggs_dimension {
let mut key_to_rows = HashMap::new();
for i in group_start..group_end {
let key = create_group_by_value(key, i)?;
key_to_rows.entry(key).or_insert(Vec::new()).push(i as u64);
}

for (k, rows) in key_to_rows {
let mut accumulators = create_accumulators(&self.aggs)?;
let rows = UInt64Array::from(rows);
let mut values = Vec::with_capacity(accumulators.len());
for i in 0..accumulators.len() {
let accum_inputs = extra_aggs_inputs[i]
.iter()
.map(|a| arrow::compute::take(a.as_ref(), &rows, None))
.collect::<Result<Vec<_>, _>>()?;
accumulators[i].update_batch(&accum_inputs)?;
values.push(accumulators[i].evaluate()?);
}

dim_to_extra_aggs.insert(k, values);
}
}

// Add keys, dimension and non-aggregate columns to the output.
let mut d = self.from.clone();
let mut d_iter = 0;
Expand All @@ -509,6 +604,19 @@ impl ExecutionPlan for RollingWindowAggExec {
for i in 0..key_cols.len() {
out_keys[i].extend(0, group_start, group_start + 1)
}
// Add aggregates.
match dim_to_extra_aggs.get(&GroupByScalar::try_from(&d)?) {
Some(aggs) => {
for i in 0..out_extra_aggs.len() {
append_value(out_extra_aggs[i].as_mut(), &aggs[i])?
}
}
None => {
for i in 0..out_extra_aggs.len() {
append_value(out_extra_aggs[i].as_mut(), &extra_agg_nulls[i])?
}
}
}
// Find the matching row to add other columns.
while matching_row_lower_bound < group_end
&& cmp_same_types(
Expand Down Expand Up @@ -590,10 +698,16 @@ impl ExecutionPlan for RollingWindowAggExec {
for o in out_other {
r.push(make_array(o.freeze()));
}

let out_aggs_keep = out_aggs_keep.finish();
for mut a in out_aggs {
r.push(filter(a.finish().as_ref(), &out_aggs_keep)?);
}

for mut a in out_extra_aggs {
r.push(a.finish())
}

let r = RecordBatch::try_new(self.schema(), r)?;
Ok(Box::pin(StreamWithSchema::wrap(
self.schema(),
Expand Down Expand Up @@ -621,6 +735,18 @@ fn add_dim(l: &ScalarValue, r: &ScalarValue) -> ScalarValue {
}
}

fn compute_agg_inputs(
a: &dyn AggregateExpr,
input: &RecordBatch,
) -> Result<Vec<ArrayRef>, DataFusionError> {
a.expressions()
.iter()
.map(|e| -> Result<_, DataFusionError> {
Ok(e.evaluate(input)?.into_array(input.num_rows()))
})
.collect()
}

fn meets_lower_bound(
value: &ScalarValue,
current: &ScalarValue,
Expand Down
Loading