Skip to content

Commit

Permalink
save
Browse files Browse the repository at this point in the history
  • Loading branch information
Jiayu Liu committed May 21, 2021
1 parent d2ce852 commit bf5b8a5
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 9 deletions.
2 changes: 1 addition & 1 deletion datafusion/src/physical_plan/aggregates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ pub fn return_type(fun: &AggregateFunction, arg_types: &[DataType]) -> Result<Da

/// Create a physical (function) expression.
/// This function errors when `args`' can't be coerced to a valid argument type of the function.
pub(super) fn create_aggregate_expr(
pub fn create_aggregate_expr(
fun: &AggregateFunction,
distinct: bool,
args: &[Arc<dyn PhysicalExpr>],
Expand Down
12 changes: 4 additions & 8 deletions datafusion/src/physical_plan/windows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use arrow::{
use async_trait::async_trait;
use futures::stream::Stream;
use futures::stream::StreamExt;
use futures::Future;
use pin_project_lite::pin_project;
use std::any::Any;
use std::pin::Pin;
Expand Down Expand Up @@ -271,19 +272,14 @@ impl Stream for WindowAggStream {
*this.finished = true;
// check for error in receiving channel and unwrap actual result
let result = match result {
Err(e) => Err(ArrowError::ExternalError(Box::new(e))), // error receiving
Ok(result) => result,
Err(e) => Some(Err(ArrowError::ExternalError(Box::new(e)))), // error receiving
Ok(result) => result.transpose(),
};
Poll::Ready(Some(result))
Poll::Ready(result)
}
Poll::Pending => Poll::Pending,
}
}

fn size_hint(&self) -> (usize, Option<usize>) {
// same number of record batches
self.input.size_hint()
}
}

impl RecordBatchStream for WindowAggStream {
Expand Down

0 comments on commit bf5b8a5

Please sign in to comment.