Skip to content

Commit

Permalink
refactor async function
Browse files Browse the repository at this point in the history
  • Loading branch information
Jiayu Liu committed Jun 6, 2021
1 parent f077602 commit 6a89e50
Showing 1 changed file with 25 additions and 19 deletions.
44 changes: 25 additions & 19 deletions datafusion/src/physical_plan/windows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,27 +434,9 @@ impl WindowAggStream {
) -> Self {
let (tx, rx) = futures::channel::oneshot::channel();
let schema_clone = schema.clone();
let input_schema = input.schema();
tokio::spawn(async move {
let schema = schema_clone.clone();
let result = common::collect(input)
.await
.map_err(DataFusionError::into_arrow_external_error)
.and_then(move |batches| {
let batch = common::combine_batches(&batches, input_schema.clone())?;
if let Some(batch) = batch {
// calculate window cols
let mut columns = compute_window_aggregates(window_expr, &batch)
.map_err(DataFusionError::into_arrow_external_error)?;
// combine with the original cols
// note the setup of window aggregates is that they newly calculated window
// expressions are always prepended to the columns
columns.extend_from_slice(batch.columns());
RecordBatch::try_new(schema, columns)
} else {
Ok(RecordBatch::new_empty(schema))
}
});
let result = WindowAggStream::process(input, window_expr, schema).await;
tx.send(result)
});

Expand All @@ -464,6 +446,30 @@ impl WindowAggStream {
schema,
}
}

/// Drains `input`, merges the collected batches into one, and evaluates the
/// window expressions against it.
///
/// On success the result is a single `RecordBatch` built with `schema`, whose
/// columns are the computed window columns followed by the columns of the
/// combined input batch. When combining the collected batches yields no batch,
/// an empty batch with `schema` is returned instead.
///
/// # Errors
///
/// DataFusion errors raised while collecting the input or computing the window
/// aggregates are converted with `DataFusionError::into_arrow_external_error`.
/// Arrow errors from combining the batches or from building the output batch
/// are propagated unchanged.
async fn process(
    input: SendableRecordBatchStream,
    window_expr: Vec<Arc<dyn WindowExpr>>,
    schema: SchemaRef,
) -> ArrowResult<RecordBatch> {
    // The schema must be read up front: `collect` takes `input` by value.
    let input_schema = input.schema();
    let batches = common::collect(input)
        .await
        .map_err(DataFusionError::into_arrow_external_error)?;

    // `input_schema` is not needed afterwards, so it is moved rather than cloned.
    match common::combine_batches(&batches, input_schema)? {
        Some(batch) => {
            // Calculate the window columns first ...
            let mut columns = compute_window_aggregates(window_expr, &batch)
                .map_err(DataFusionError::into_arrow_external_error)?;
            // ... then append the original columns. The newly calculated window
            // expressions are therefore always placed ahead of the input columns.
            columns.extend_from_slice(batch.columns());
            RecordBatch::try_new(schema, columns)
        }
        None => Ok(RecordBatch::new_empty(schema)),
    }
}
}

impl Stream for WindowAggStream {
Expand Down

0 comments on commit 6a89e50

Please sign in to comment.