From 6a89e50cb7f8d132f619fe5d003b3e0f9c168c19 Mon Sep 17 00:00:00 2001 From: Jiayu Liu Date: Sun, 6 Jun 2021 15:57:19 +0800 Subject: [PATCH] refactor async function --- datafusion/src/physical_plan/windows.rs | 44 ++++++++++++++----------- 1 file changed, 25 insertions(+), 19 deletions(-) diff --git a/datafusion/src/physical_plan/windows.rs b/datafusion/src/physical_plan/windows.rs index 700629a7db765..7eb14943facf1 100644 --- a/datafusion/src/physical_plan/windows.rs +++ b/datafusion/src/physical_plan/windows.rs @@ -434,27 +434,9 @@ impl WindowAggStream { ) -> Self { let (tx, rx) = futures::channel::oneshot::channel(); let schema_clone = schema.clone(); - let input_schema = input.schema(); tokio::spawn(async move { let schema = schema_clone.clone(); - let result = common::collect(input) - .await - .map_err(DataFusionError::into_arrow_external_error) - .and_then(move |batches| { - let batch = common::combine_batches(&batches, input_schema.clone())?; - if let Some(batch) = batch { - // calculate window cols - let mut columns = compute_window_aggregates(window_expr, &batch) - .map_err(DataFusionError::into_arrow_external_error)?; - // combine with the original cols - // note the setup of window aggregates is that they newly calculated window - // expressions are always prepended to the columns - columns.extend_from_slice(batch.columns()); - RecordBatch::try_new(schema, columns) - } else { - Ok(RecordBatch::new_empty(schema)) - } - }); + let result = WindowAggStream::process(input, window_expr, schema).await; tx.send(result) }); @@ -464,6 +446,30 @@ impl WindowAggStream { schema, } } + + async fn process( + input: SendableRecordBatchStream, + window_expr: Vec>, + schema: SchemaRef, + ) -> ArrowResult { + let input_schema = input.schema(); + let batches = common::collect(input) + .await + .map_err(DataFusionError::into_arrow_external_error)?; + let batch = common::combine_batches(&batches, input_schema.clone())?; + if let Some(batch) = batch { + // calculate window cols + let mut columns = compute_window_aggregates(window_expr, &batch) + .map_err(DataFusionError::into_arrow_external_error)?; + // combine with the original cols + // note the setup of window aggregates is that they newly calculated window + // expressions are always prepended to the columns + columns.extend_from_slice(batch.columns()); + RecordBatch::try_new(schema, columns) + } else { + Ok(RecordBatch::new_empty(schema)) + } + } } impl Stream for WindowAggStream {