Commit 26ca0fe

Jiayu Liu committed May 26, 2021
1 parent 3b3dad1 commit 26ca0fe
Showing 1 changed file with 0 additions and 235 deletions.
datafusion/src/physical_plan/windows.rs: 0 additions & 235 deletions
@@ -666,238 +666,3 @@ mod tests {
Ok(())
}
}

type WindowAccumulatorItem = Box<dyn WindowAccumulator>;

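/// Collects the input expressions of every window expression; the outer
/// Vec has one entry per window expression.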
fn window_expressions(
window_expr: &[Arc<dyn WindowExpr>],
) -> Result<Vec<Vec<Arc<dyn PhysicalExpr>>>> {
Ok(window_expr
.iter()
.map(|expr| expr.expressions())
.collect::<Vec<_>>())
}

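/// Feeds a single batch to every accumulator: evaluates each accumulator's
/// input expressions against the batch, then scans the batch, returning the
/// per-accumulator results.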
fn window_aggregate_batch(
batch: &RecordBatch,
window_accumulators: &mut [WindowAccumulatorItem],
expressions: &[Vec<Arc<dyn PhysicalExpr>>],
) -> Result<Vec<Option<ArrayRef>>> {
    // 1.1 iterate accumulators and their respective expressions together
    // 1.2 evaluate the expressions against the batch
    // 1.3 scan the batch with each accumulator using the evaluated values

// 1.1
window_accumulators
.iter_mut()
.zip(expressions)
.map(|(window_acc, expr)| {
// 1.2
let values = &expr
.iter()
.map(|e| e.evaluate(batch))
.map(|r| r.map(|v| v.into_array(batch.num_rows())))
.collect::<Result<Vec<_>>>()?;

window_acc.scan_batch(batch.num_rows(), values)
        })
        .collect::<Result<Vec<_>>>()
}

/// evaluates the final value of every accumulator; each entry in the returned vector corresponds to one window expr
fn finalize_window_aggregation(
window_accumulators: &[WindowAccumulatorItem],
) -> Result<Vec<Option<ScalarValue>>> {
window_accumulators
.iter()
.map(|window_accumulator| window_accumulator.evaluate())
.collect::<Result<Vec<_>>>()
}

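/// Creates one accumulator per window expression.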
fn create_window_accumulators(
window_expr: &[Arc<dyn WindowExpr>],
) -> Result<Vec<WindowAccumulatorItem>> {
window_expr
.iter()
.map(|expr| expr.create_accumulator())
.collect::<Result<Vec<_>>>()
}

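/// Drains the input stream, feeding every batch to the accumulators, then
/// assembles the output batch: one column per window expression followed by
/// the pass-through input columns.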
async fn compute_window_aggregate(
schema: SchemaRef,
window_expr: Vec<Arc<dyn WindowExpr>>,
mut input: SendableRecordBatchStream,
) -> ArrowResult<RecordBatch> {
let mut window_accumulators = create_window_accumulators(&window_expr)
.map_err(DataFusionError::into_arrow_external_error)?;

let expressions = window_expressions(&window_expr)
.map_err(DataFusionError::into_arrow_external_error)?;

let expressions = Arc::new(expressions);

    // TODO: each element should reserve capacity based on a size hint
    let mut accumulator: Vec<Vec<ArrayRef>> =
        vec![Vec::new(); window_expr.len()];

let mut original_batches: Vec<RecordBatch> = vec![];

let mut total_num_rows = 0;

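    // scan every input batch through the accumulators, keeping the original
    // batches around so their columns can be re-emitted next to the window columns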
while let Some(batch) = input.next().await {
let batch = batch?;
total_num_rows += batch.num_rows();
original_batches.push(batch.clone());

let batch_aggregated =
window_aggregate_batch(&batch, &mut window_accumulators, &expressions)
.map_err(DataFusionError::into_arrow_external_error)?;
accumulator.iter_mut().zip(batch_aggregated).for_each(
|(acc_for_window, window_batch)| {
if let Some(data) = window_batch {
acc_for_window.push(data);
}
},
);
}

let aggregated_mapped = finalize_window_aggregation(&window_accumulators)
.map_err(DataFusionError::into_arrow_external_error)?;

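    // each window column is either one final scalar broadcast across all rows,
    // or the concatenation of the per-batch arrays collected above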
let mut columns: Vec<ArrayRef> = accumulator
.iter()
.zip(aggregated_mapped)
.map(|(acc, agg)| {
Ok(match (acc, agg) {
(acc, Some(scalar_value)) if acc.is_empty() => {
scalar_value.to_array_of_size(total_num_rows)
}
(acc, None) if !acc.is_empty() => {
let vec_array: Vec<&dyn Array> =
acc.iter().map(|arc| arc.as_ref()).collect();
concat(&vec_array)?
}
_ => {
return Err(DataFusionError::Execution(
"Invalid window function behavior".to_owned(),
))
}
})
})
.collect::<Result<Vec<ArrayRef>>>()
.map_err(DataFusionError::into_arrow_external_error)?;

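    // append the original input columns, concatenated across all batches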
for i in 0..(schema.fields().len() - window_expr.len()) {
let col = concat(
&original_batches
.iter()
.map(|batch| batch.column(i).as_ref())
.collect::<Vec<_>>(),
)?;
columns.push(col);
}

RecordBatch::try_new(schema.clone(), columns)
}

impl WindowAggStream {
/// Create a new WindowAggStream
pub fn new(
schema: SchemaRef,
window_expr: Vec<Arc<dyn WindowExpr>>,
input: SendableRecordBatchStream,
) -> Self {
let (tx, rx) = futures::channel::oneshot::channel();
let schema_clone = schema.clone();
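        // run the aggregation on a separate task; the oneshot channel delivers
        // the single result batch back to poll_next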
tokio::spawn(async move {
let result = compute_window_aggregate(schema_clone, window_expr, input).await;
            // ignore the send error: it only fails if the receiver was dropped
            let _ = tx.send(result);
});

Self {
output: rx,
finished: false,
schema,
}
}
}

impl Stream for WindowAggStream {
type Item = ArrowResult<RecordBatch>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if self.finished {
return Poll::Ready(None);
}

// is the output ready?
let this = self.project();
let output_poll = this.output.poll(cx);

match output_poll {
Poll::Ready(result) => {
*this.finished = true;
// check for error in receiving channel and unwrap actual result
let result = match result {
Err(e) => Some(Err(ArrowError::ExternalError(Box::new(e)))), // error receiving
Ok(result) => Some(result),
};
Poll::Ready(result)
}
Poll::Pending => Poll::Pending,
}
}
}

impl RecordBatchStream for WindowAggStream {
/// Get the schema
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
}

#[cfg(test)]
mod tests {
// use super::*;

// /// some mock data to test windows
// fn some_data() -> (Arc<Schema>, Vec<RecordBatch>) {
// // define a schema.
// let schema = Arc::new(Schema::new(vec![
// Field::new("a", DataType::UInt32, false),
// Field::new("b", DataType::Float64, false),
// ]));

// // define data.
// (
// schema.clone(),
// vec![
// RecordBatch::try_new(
// schema.clone(),
// vec![
// Arc::new(UInt32Array::from(vec![2, 3, 4, 4])),
// Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0])),
// ],
// )
// .unwrap(),
// RecordBatch::try_new(
// schema,
// vec![
// Arc::new(UInt32Array::from(vec![2, 3, 3, 4])),
// Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0])),
// ],
// )
// .unwrap(),
// ],
// )
// }

// #[tokio::test]
// async fn window_function() -> Result<()> {
// let input: Arc<dyn ExecutionPlan> = unimplemented!();
// let input_schema = input.schema();
// let window_expr = vec![];
// WindowAggExec::try_new(window_expr, input, input_schema);
// }
}
