From 0a861a76bde0bb43e5561f1cf1ef14fd64e0c08b Mon Sep 17 00:00:00 2001 From: Jiayu Liu Date: Thu, 20 May 2021 22:28:34 +0800 Subject: [PATCH] save stream --- .../src/physical_plan/hash_aggregate.rs | 2 +- datafusion/src/physical_plan/windows.rs | 41 ++++++++++++++++++- 2 files changed, 40 insertions(+), 3 deletions(-) diff --git a/datafusion/src/physical_plan/hash_aggregate.rs b/datafusion/src/physical_plan/hash_aggregate.rs index 0a822dc898afb..8441dcdddc211 100644 --- a/datafusion/src/physical_plan/hash_aggregate.rs +++ b/datafusion/src/physical_plan/hash_aggregate.rs @@ -825,7 +825,7 @@ fn aggregate_expressions( } pin_project! { - struct HashAggregateStream { + pub struct HashAggregateStream { schema: SchemaRef, #[pin] output: futures::channel::oneshot::Receiver>, diff --git a/datafusion/src/physical_plan/windows.rs b/datafusion/src/physical_plan/windows.rs index cac24fabc96b2..7bdd9cb3b0aee 100644 --- a/datafusion/src/physical_plan/windows.rs +++ b/datafusion/src/physical_plan/windows.rs @@ -168,7 +168,7 @@ impl ExecutionPlan for WindowAggExec { 1 => Ok(Arc::new(WindowAggExec::try_new( self.window_expr.clone(), children[0].clone(), - children[0].schema(), + self.input_schema.clone(), )?)), _ => Err(DataFusionError::Internal( "WindowAggExec wrong number of children".to_owned(), @@ -178,6 +178,7 @@ impl ExecutionPlan for WindowAggExec { async fn execute(&self, partition: usize) -> Result { if 0 != partition { + dbg!("execute", partition); return Err(DataFusionError::Internal(format!( "WindowAggExec invalid partition {}", partition @@ -186,15 +187,51 @@ impl ExecutionPlan for WindowAggExec { // window needs to operate on a single partition currently if 1 != self.input.output_partitioning().partition_count() { + dbg!(self.input.output_partitioning().partition_count()); return Err(DataFusionError::Internal( "WindowAggExec requires a single input partition".to_owned(), )); } - // let input = self.input.execute(0).await?; + let input = self.input.execute(partition).await?; + + + Err(DataFusionError::NotImplemented( "WindowAggExec::execute".to_owned(), )) } } + +// struct WindowAggStream { +// scheme: SchemaRef, +// window_expr: Vec>, +// input: SendableRecordBatchStream, +// } + +// impl Stream for WindowAggStream { +// type Item = ArrowResult; + +// fn poll_next( +// mut self: Pin<&mut Self>, +// cx: &mut Context<'_>, +// ) -> Poll> { +// self.input.poll_next_unpin(cx).map(|x| match x { +// Some(Ok(batch)) => Some(batch_project(&batch, &self.expr, &self.schema)), +// other => other, +// }) +// } + +// fn size_hint(&self) -> (usize, Option) { +// // same number of record batches +// self.input.size_hint() +// } +// } + +// impl RecordBatchStream for WindowAggStream { +// /// Get the schema +// fn schema(&self) -> SchemaRef { +// self.schema.clone() +// } +// }