Skip to content

Commit

Permalink
save stream
Browse files Browse the repository at this point in the history
  • Loading branch information
Jiayu Liu committed May 21, 2021
1 parent a9121af commit 0a861a7
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 3 deletions.
2 changes: 1 addition & 1 deletion datafusion/src/physical_plan/hash_aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -825,7 +825,7 @@ fn aggregate_expressions(
}

pin_project! {
struct HashAggregateStream {
pub struct HashAggregateStream {
schema: SchemaRef,
#[pin]
output: futures::channel::oneshot::Receiver<ArrowResult<RecordBatch>>,
Expand Down
41 changes: 39 additions & 2 deletions datafusion/src/physical_plan/windows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ impl ExecutionPlan for WindowAggExec {
1 => Ok(Arc::new(WindowAggExec::try_new(
self.window_expr.clone(),
children[0].clone(),
children[0].schema(),
self.input_schema.clone(),
)?)),
_ => Err(DataFusionError::Internal(
"WindowAggExec wrong number of children".to_owned(),
Expand All @@ -178,6 +178,7 @@ impl ExecutionPlan for WindowAggExec {

async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
if 0 != partition {
dbg!("execute", partition);
return Err(DataFusionError::Internal(format!(
"WindowAggExec invalid partition {}",
partition
Expand All @@ -186,15 +187,51 @@ impl ExecutionPlan for WindowAggExec {

// window needs to operate on a single partition currently
if 1 != self.input.output_partitioning().partition_count() {
dbg!(self.input.output_partitioning().partition_count());
return Err(DataFusionError::Internal(
"WindowAggExec requires a single input partition".to_owned(),
));
}

// let input = self.input.execute(0).await?;
let input = self.input.execute(partition).await?;




Err(DataFusionError::NotImplemented(
"WindowAggExec::execute".to_owned(),
))
}
}

// struct WindowAggStream {
// scheme: SchemaRef,
// window_expr: Vec<Arc<dyn WindowExpr>>,
// input: SendableRecordBatchStream,
// }

// impl Stream for WindowAggStream {
// type Item = ArrowResult<RecordBatch>;

// fn poll_next(
// mut self: Pin<&mut Self>,
// cx: &mut Context<'_>,
// ) -> Poll<Option<Self::Item>> {
// self.input.poll_next_unpin(cx).map(|x| match x {
// Some(Ok(batch)) => Some(batch_project(&batch, &self.expr, &self.schema)),
// other => other,
// })
// }

// fn size_hint(&self) -> (usize, Option<usize>) {
// // same number of record batches
// self.input.size_hint()
// }
// }

// impl RecordBatchStream for WindowAggStream {
// /// Get the schema
// fn schema(&self) -> SchemaRef {
// self.schema.clone()
// }
// }

0 comments on commit 0a861a7

Please sign in to comment.