diff --git a/datafusion/src/physical_plan/windows.rs b/datafusion/src/physical_plan/windows.rs
index a0ea083840509..8199c69c579ef 100644
--- a/datafusion/src/physical_plan/windows.rs
+++ b/datafusion/src/physical_plan/windows.rs
@@ -666,238 +666,3 @@ mod tests {
         Ok(())
     }
 }
-
-type WindowAccumulatorItem = Box<dyn WindowAccumulator>;
-
-fn window_expressions(
-    window_expr: &[Arc<dyn WindowExpr>],
-) -> Result<Vec<Vec<Arc<dyn PhysicalExpr>>>> {
-    Ok(window_expr
-        .iter()
-        .map(|expr| expr.expressions())
-        .collect::<Vec<_>>())
-}
-
-fn window_aggregate_batch(
-    batch: &RecordBatch,
-    window_accumulators: &mut [WindowAccumulatorItem],
-    expressions: &[Vec<Arc<dyn PhysicalExpr>>],
-) -> Result<Vec<Option<ArrayRef>>> {
-    // 1.1 iterate accumulators and respective expressions together
-    // 1.2 evaluate expressions
-    // 1.3 update / merge window accumulators with the expressions' values
-
-    // 1.1
-    window_accumulators
-        .iter_mut()
-        .zip(expressions)
-        .map(|(window_acc, expr)| {
-            // 1.2
-            let values = &expr
-                .iter()
-                .map(|e| e.evaluate(batch))
-                .map(|r| r.map(|v| v.into_array(batch.num_rows())))
-                .collect::<Result<Vec<_>>>()?;
-
-            window_acc.scan_batch(batch.num_rows(), values)
-        })
-        .into_iter()
-        .collect::<Result<Vec<_>>>()
-}
-
-/// returns a vector of ArrayRefs, where each entry corresponds to one window expr
-fn finalize_window_aggregation(
-    window_accumulators: &[WindowAccumulatorItem],
-) -> Result<Vec<Option<ScalarValue>>> {
-    window_accumulators
-        .iter()
-        .map(|window_accumulator| window_accumulator.evaluate())
-        .collect::<Result<Vec<_>>>()
-}
-
-fn create_window_accumulators(
-    window_expr: &[Arc<dyn WindowExpr>],
-) -> Result<Vec<WindowAccumulatorItem>> {
-    window_expr
-        .iter()
-        .map(|expr| expr.create_accumulator())
-        .collect::<Result<Vec<_>>>()
-}
-
-async fn compute_window_aggregate(
-    schema: SchemaRef,
-    window_expr: Vec<Arc<dyn WindowExpr>>,
-    mut input: SendableRecordBatchStream,
-) -> ArrowResult<RecordBatch> {
-    let mut window_accumulators = create_window_accumulators(&window_expr)
-        .map_err(DataFusionError::into_arrow_external_error)?;
-
-    let expressions = window_expressions(&window_expr)
-        .map_err(DataFusionError::into_arrow_external_error)?;
-
-    let expressions = Arc::new(expressions);
-
-    // TODO each element shall have some size hint
-    let mut accumulator: Vec<Vec<ArrayRef>> =
-        iter::repeat(vec![]).take(window_expr.len()).collect();
-
-    let mut original_batches: Vec<RecordBatch> = vec![];
-
-    let mut total_num_rows = 0;
-
-    while let Some(batch) = input.next().await {
-        let batch = batch?;
-        total_num_rows += batch.num_rows();
-        original_batches.push(batch.clone());
-
-        let batch_aggregated =
-            window_aggregate_batch(&batch, &mut window_accumulators, &expressions)
-                .map_err(DataFusionError::into_arrow_external_error)?;
-        accumulator.iter_mut().zip(batch_aggregated).for_each(
-            |(acc_for_window, window_batch)| {
-                if let Some(data) = window_batch {
-                    acc_for_window.push(data);
-                }
-            },
-        );
-    }
-
-    let aggregated_mapped = finalize_window_aggregation(&window_accumulators)
-        .map_err(DataFusionError::into_arrow_external_error)?;
-
-    let mut columns: Vec<ArrayRef> = accumulator
-        .iter()
-        .zip(aggregated_mapped)
-        .map(|(acc, agg)| {
-            Ok(match (acc, agg) {
-                (acc, Some(scalar_value)) if acc.is_empty() => {
-                    scalar_value.to_array_of_size(total_num_rows)
-                }
-                (acc, None) if !acc.is_empty() => {
-                    let vec_array: Vec<&dyn Array> =
-                        acc.iter().map(|arc| arc.as_ref()).collect();
-                    concat(&vec_array)?
-                }
-                _ => {
-                    return Err(DataFusionError::Execution(
-                        "Invalid window function behavior".to_owned(),
-                    ))
-                }
-            })
-        })
-        .collect::<Result<Vec<ArrayRef>>>()
-        .map_err(DataFusionError::into_arrow_external_error)?;
-
-    for i in 0..(schema.fields().len() - window_expr.len()) {
-        let col = concat(
-            &original_batches
-                .iter()
-                .map(|batch| batch.column(i).as_ref())
-                .collect::<Vec<_>>(),
-        )?;
-        columns.push(col);
-    }
-
-    RecordBatch::try_new(schema.clone(), columns)
-}
-
-impl WindowAggStream {
-    /// Create a new WindowAggStream
-    pub fn new(
-        schema: SchemaRef,
-        window_expr: Vec<Arc<dyn WindowExpr>>,
-        input: SendableRecordBatchStream,
-    ) -> Self {
-        let (tx, rx) = futures::channel::oneshot::channel();
-        let schema_clone = schema.clone();
-        tokio::spawn(async move {
-            let result = compute_window_aggregate(schema_clone, window_expr, input).await;
-            tx.send(result)
-        });
-
-        Self {
-            output: rx,
-            finished: false,
-            schema,
-        }
-    }
-}
-
-impl Stream for WindowAggStream {
-    type Item = ArrowResult<RecordBatch>;
-
-    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
-        if self.finished {
-            return Poll::Ready(None);
-        }
-
-        // is the output ready?
-        let this = self.project();
-        let output_poll = this.output.poll(cx);
-
-        match output_poll {
-            Poll::Ready(result) => {
-                *this.finished = true;
-                // check for error in receiving channel and unwrap actual result
-                let result = match result {
-                    Err(e) => Some(Err(ArrowError::ExternalError(Box::new(e)))), // error receiving
-                    Ok(result) => Some(result),
-                };
-                Poll::Ready(result)
-            }
-            Poll::Pending => Poll::Pending,
-        }
-    }
-}
-
-impl RecordBatchStream for WindowAggStream {
-    /// Get the schema
-    fn schema(&self) -> SchemaRef {
-        self.schema.clone()
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    // use super::*;
-
-    // /// some mock data to test windows
-    // fn some_data() -> (Arc<Schema>, Vec<RecordBatch>) {
-    //     // define a schema.
-    //     let schema = Arc::new(Schema::new(vec![
-    //         Field::new("a", DataType::UInt32, false),
-    //         Field::new("b", DataType::Float64, false),
-    //     ]));
-
-    //     // define data.
-    //     (
-    //         schema.clone(),
-    //         vec![
-    //             RecordBatch::try_new(
-    //                 schema.clone(),
-    //                 vec![
-    //                     Arc::new(UInt32Array::from(vec![2, 3, 4, 4])),
-    //                     Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0])),
-    //                 ],
-    //             )
-    //             .unwrap(),
-    //             RecordBatch::try_new(
-    //                 schema,
-    //                 vec![
-    //                     Arc::new(UInt32Array::from(vec![2, 3, 3, 4])),
-    //                     Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0])),
-    //                 ],
-    //             )
-    //             .unwrap(),
-    //         ],
-    //     )
-    // }
-
-    // #[tokio::test]
-    // async fn window_function() -> Result<()> {
-    //     let input: Arc<dyn ExecutionPlan> = unimplemented!();
-    //     let input_schema = input.schema();
-    //     let window_expr = vec![];
-    //     WindowAggExec::try_new(window_expr, input, input_schema);
-    // }
-}