diff --git a/datafusion/src/physical_plan/aggregates.rs b/datafusion/src/physical_plan/aggregates.rs index 2a7aa5cced5c7..3607f29debba1 100644 --- a/datafusion/src/physical_plan/aggregates.rs +++ b/datafusion/src/physical_plan/aggregates.rs @@ -105,7 +105,7 @@ pub fn return_type(fun: &AggregateFunction, arg_types: &[DataType]) -> Result], diff --git a/datafusion/src/physical_plan/windows.rs b/datafusion/src/physical_plan/windows.rs index edb442a7c5959..ec8c3c6182008 100644 --- a/datafusion/src/physical_plan/windows.rs +++ b/datafusion/src/physical_plan/windows.rs @@ -32,6 +32,7 @@ use arrow::{ use async_trait::async_trait; use futures::stream::Stream; use futures::stream::StreamExt; +use futures::Future; use pin_project_lite::pin_project; use std::any::Any; use std::pin::Pin; @@ -271,19 +272,14 @@ impl Stream for WindowAggStream { *this.finished = true; // check for error in receiving channel and unwrap actual result let result = match result { - Err(e) => Err(ArrowError::ExternalError(Box::new(e))), // error receiving - Ok(result) => result, + Err(e) => Some(Err(ArrowError::ExternalError(Box::new(e)))), // error receiving + Ok(result) => result.transpose(), }; - Poll::Ready(Some(result)) + Poll::Ready(result) } Poll::Pending => Poll::Pending, } } - - fn size_hint(&self) -> (usize, Option) { - // same number of record batches - self.input.size_hint() - } } impl RecordBatchStream for WindowAggStream {