Skip to content

Commit

Permalink
move built in window expr and partition evaluator (#1865)
Browse files Browse the repository at this point in the history
  • Loading branch information
jimexist authored Feb 20, 2022
1 parent 72baa9d commit 0b1bef1
Show file tree
Hide file tree
Showing 15 changed files with 187 additions and 126 deletions.
3 changes: 1 addition & 2 deletions datafusion-physical-expr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@
mod aggregate_expr;
mod physical_expr;
mod sort_expr;
mod window_expr;
pub mod window;

pub use aggregate_expr::AggregateExpr;
pub use physical_expr::PhysicalExpr;
pub use sort_expr::PhysicalSortExpr;
pub use window_expr::WindowExpr;
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use super::partition_evaluator::PartitionEvaluator;
use crate::PhysicalExpr;
use arrow::datatypes::Field;
use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
use std::any::Any;
use std::sync::Arc;

/// A window expression that is a built-in window function.
///
/// Note that unlike aggregation based window functions, built-in window functions normally ignore
/// window frame spec, with the exception of first_value, last_value, and nth_value.
///
/// Implementors must be `Send + Sync` and implement [`std::fmt::Debug`].
pub trait BuiltInWindowFunctionExpr: Send + Sync + std::fmt::Debug {
/// Returns this window function expression as [`Any`](std::any::Any) so that it can be
/// downcast to a specific implementation.
fn as_any(&self) -> &dyn Any;

/// The field describing the final result of this window function.
fn field(&self) -> Result<Field>;

/// Expressions associated with this window function.
///
/// NOTE(review): the previous wording ("passed to the Accumulator", `sum`, `cov`) was carried
/// over from the aggregate expression trait; presumably these are the argument expressions the
/// window function evaluates -- confirm against the implementors.
fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>>;

/// Human readable name such as `"MIN(c2)"` or `"RANK()"`. The default
/// implementation returns placeholder text.
fn name(&self) -> &str {
"BuiltInWindowFunctionExpr: default name"
}

/// Create a built-in window evaluator for the given batch.
///
/// Returns a boxed [`PartitionEvaluator`], or an error if one cannot be created.
fn create_evaluator(
&self,
batch: &RecordBatch,
) -> Result<Box<dyn PartitionEvaluator>>;
}
25 changes: 25 additions & 0 deletions datafusion-physical-expr/src/window/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

mod built_in_window_function_expr;
mod partition_evaluator;
mod window_expr;

pub use built_in_window_function_expr::BuiltInWindowFunctionExpr;
pub use partition_evaluator::find_ranges_in_range;
pub use partition_evaluator::PartitionEvaluator;
pub use window_expr::WindowExpr;
84 changes: 84 additions & 0 deletions datafusion-physical-expr/src/window/partition_evaluator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow::array::ArrayRef;
use datafusion_common::DataFusionError;
use datafusion_common::Result;
use std::ops::Range;

/// Returns the sub-slice of `sort_partition_points` that lies inside `partition_range`.
///
/// The sort partition points are expected to come from sorting on
/// [partition columns..., order columns...], so their boundaries line up with the
/// coarser partition boundaries. Because of that ordering, two binary searches are
/// enough: one for the first range that does not start before the partition, and one
/// for the first following range that ends after the partition.
pub fn find_ranges_in_range<'a>(
    partition_range: &Range<usize>,
    sort_partition_points: &'a [Range<usize>],
) -> &'a [Range<usize>] {
    // Drop every sort range that begins before the partition does.
    let first =
        sort_partition_points.partition_point(|r| r.start < partition_range.start);
    let tail = &sort_partition_points[first..];
    // Of what is left, keep the leading ranges that still end within the partition.
    let kept = tail.partition_point(|r| r.end <= partition_range.end);
    &tail[..kept]
}

/// Evaluates a window function over row-index ranges ("partitions").
///
/// Implementors must provide [`PartitionEvaluator::evaluate_partition`]; every other
/// method has a default implementation.
pub trait PartitionEvaluator {
    /// Whether the evaluator should be evaluated with rank. Defaults to `false`.
    fn include_rank(&self) -> bool {
        false
    }

    /// Runs `evaluate_partition` on each partition in order and gathers the resulting
    /// arrays. Evaluation stops at the first partition that returns an error.
    fn evaluate(&self, partition_points: Vec<Range<usize>>) -> Result<Vec<ArrayRef>> {
        let mut arrays = Vec::with_capacity(partition_points.len());
        for partition in partition_points {
            arrays.push(self.evaluate_partition(partition)?);
        }
        Ok(arrays)
    }

    /// Like `evaluate`, but each partition is evaluated together with the sort
    /// partition ranges that fall inside it (selected via `find_ranges_in_range`).
    /// Evaluation stops at the first partition that returns an error.
    fn evaluate_with_rank(
        &self,
        partition_points: Vec<Range<usize>>,
        sort_partition_points: Vec<Range<usize>>,
    ) -> Result<Vec<ArrayRef>> {
        let mut arrays = Vec::with_capacity(partition_points.len());
        for partition in partition_points {
            let ranks = find_ranges_in_range(&partition, &sort_partition_points);
            arrays.push(self.evaluate_partition_with_rank(partition, ranks)?);
        }
        Ok(arrays)
    }

    /// Evaluates the partition evaluator against a single partition.
    fn evaluate_partition(&self, _partition: Range<usize>) -> Result<ArrayRef>;

    /// Evaluates the partition evaluator against a single partition, with rank
    /// information. The default implementation returns a `NotImplemented` error.
    fn evaluate_partition_with_rank(
        &self,
        _partition: Range<usize>,
        _ranks_in_partition: &[Range<usize>],
    ) -> Result<ArrayRef> {
        let reason = "evaluate_partition_with_rank is not implemented by default";
        Err(DataFusionError::NotImplemented(reason.into()))
    }
}
File renamed without changes.
5 changes: 3 additions & 2 deletions datafusion/src/physical_plan/expressions/cume_dist.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@
//! at runtime during query execution

use crate::error::Result;
use crate::physical_plan::window_functions::PartitionEvaluator;
use crate::physical_plan::{window_functions::BuiltInWindowFunctionExpr, PhysicalExpr};
use crate::physical_plan::PhysicalExpr;
use arrow::array::ArrayRef;
use arrow::array::Float64Array;
use arrow::datatypes::{DataType, Field};
use arrow::record_batch::RecordBatch;
use datafusion_physical_expr::window::BuiltInWindowFunctionExpr;
use datafusion_physical_expr::window::PartitionEvaluator;
use std::any::Any;
use std::iter;
use std::ops::Range;
Expand Down
5 changes: 3 additions & 2 deletions datafusion/src/physical_plan/expressions/lead_lag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@
//! at runtime during query execution

use crate::error::{DataFusionError, Result};
use crate::physical_plan::window_functions::PartitionEvaluator;
use crate::physical_plan::{window_functions::BuiltInWindowFunctionExpr, PhysicalExpr};
use crate::physical_plan::PhysicalExpr;
use crate::scalar::ScalarValue;
use arrow::array::ArrayRef;
use arrow::compute::cast;
use arrow::datatypes::{DataType, Field};
use arrow::record_batch::RecordBatch;
use datafusion_physical_expr::window::BuiltInWindowFunctionExpr;
use datafusion_physical_expr::window::PartitionEvaluator;
use std::any::Any;
use std::ops::Neg;
use std::ops::Range;
Expand Down
5 changes: 3 additions & 2 deletions datafusion/src/physical_plan/expressions/nth_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@
//! that can be evaluated at runtime during query execution

use crate::error::{DataFusionError, Result};
use crate::physical_plan::window_functions::PartitionEvaluator;
use crate::physical_plan::{window_functions::BuiltInWindowFunctionExpr, PhysicalExpr};
use crate::physical_plan::PhysicalExpr;
use crate::scalar::ScalarValue;
use arrow::array::{new_null_array, ArrayRef};
use arrow::compute::kernels::window::shift;
use arrow::datatypes::{DataType, Field};
use arrow::record_batch::RecordBatch;
use datafusion_physical_expr::window::BuiltInWindowFunctionExpr;
use datafusion_physical_expr::window::PartitionEvaluator;
use std::any::Any;
use std::iter;
use std::ops::Range;
Expand Down
5 changes: 3 additions & 2 deletions datafusion/src/physical_plan/expressions/rank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@
//! at runtime during query execution

use crate::error::Result;
use crate::physical_plan::window_functions::PartitionEvaluator;
use crate::physical_plan::{window_functions::BuiltInWindowFunctionExpr, PhysicalExpr};
use crate::physical_plan::PhysicalExpr;
use arrow::array::ArrayRef;
use arrow::array::{Float64Array, UInt64Array};
use arrow::datatypes::{DataType, Field};
use arrow::record_batch::RecordBatch;
use datafusion_physical_expr::window::BuiltInWindowFunctionExpr;
use datafusion_physical_expr::window::PartitionEvaluator;
use std::any::Any;
use std::iter;
use std::ops::Range;
Expand Down
5 changes: 3 additions & 2 deletions datafusion/src/physical_plan/expressions/row_number.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@
//! Defines physical expression for `row_number` that can be evaluated at runtime during query execution

use crate::error::Result;
use crate::physical_plan::window_functions::PartitionEvaluator;
use crate::physical_plan::{window_functions::BuiltInWindowFunctionExpr, PhysicalExpr};
use crate::physical_plan::PhysicalExpr;
use arrow::array::{ArrayRef, UInt64Array};
use arrow::datatypes::{DataType, Field};
use arrow::record_batch::RecordBatch;
use datafusion_physical_expr::window::BuiltInWindowFunctionExpr;
use datafusion_physical_expr::window::PartitionEvaluator;
use std::any::Any;
use std::ops::Range;
use std::sync::Arc;
Expand Down
3 changes: 2 additions & 1 deletion datafusion/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,8 @@ pub enum Distribution {
HashPartitioned(Vec<Arc<dyn PhysicalExpr>>),
}

pub use datafusion_physical_expr::{AggregateExpr, PhysicalExpr, WindowExpr};
pub use datafusion_physical_expr::window::WindowExpr;
pub use datafusion_physical_expr::{AggregateExpr, PhysicalExpr};

/// Applies an optional projection to a [`SchemaRef`], returning the
/// projected schema
Expand Down
88 changes: 2 additions & 86 deletions datafusion/src/physical_plan/window_functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,11 @@
//!
//! see also <https://www.postgresql.org/docs/current/functions-window.html>

use crate::error::{DataFusionError, Result};
use crate::error::Result;
use crate::physical_plan::functions::{TypeSignature, Volatility};
use crate::physical_plan::{
aggregates, functions::Signature, type_coercion::data_types,
windows::find_ranges_in_range, PhysicalExpr,
};
use arrow::array::ArrayRef;
use crate::physical_plan::{aggregates, functions::Signature, type_coercion::data_types};
use arrow::datatypes::DataType;
use arrow::datatypes::Field;
use arrow::record_batch::RecordBatch;
pub use datafusion_expr::{BuiltInWindowFunction, WindowFunction};
use std::any::Any;
use std::ops::Range;
use std::sync::Arc;

/// Returns the datatype of the window function
pub fn return_type(
Expand Down Expand Up @@ -112,81 +103,6 @@ pub(super) fn signature_for_built_in(fun: &BuiltInWindowFunction) -> Signature {
}
}

/// Partition evaluator
pub(crate) trait PartitionEvaluator {
/// Whether the evaluator should be evaluated with rank
fn include_rank(&self) -> bool {
false
}

/// evaluate the partition evaluator against the partitions
fn evaluate(&self, partition_points: Vec<Range<usize>>) -> Result<Vec<ArrayRef>> {
partition_points
.into_iter()
.map(|partition| self.evaluate_partition(partition))
.collect()
}

/// evaluate the partition evaluator against the partitions with rank information
fn evaluate_with_rank(
&self,
partition_points: Vec<Range<usize>>,
sort_partition_points: Vec<Range<usize>>,
) -> Result<Vec<ArrayRef>> {
partition_points
.into_iter()
.map(|partition| {
let ranks_in_partition =
find_ranges_in_range(&partition, &sort_partition_points);
self.evaluate_partition_with_rank(partition, ranks_in_partition)
})
.collect()
}

/// evaluate the partition evaluator against the partition
fn evaluate_partition(&self, _partition: Range<usize>) -> Result<ArrayRef>;

/// evaluate the partition evaluator against the partition but with rank
fn evaluate_partition_with_rank(
&self,
_partition: Range<usize>,
_ranks_in_partition: &[Range<usize>],
) -> Result<ArrayRef> {
Err(DataFusionError::NotImplemented(
"evaluate_partition_with_rank is not implemented by default".into(),
))
}
}

/// A window expression that is a built-in window function.
///
/// Note that unlike aggregation based window functions, built-in window functions normally ignore
/// window frame spec, with the exception of first_value, last_value, and nth_value.
pub(crate) trait BuiltInWindowFunctionExpr: Send + Sync + std::fmt::Debug {
/// Returns the aggregate expression as [`Any`](std::any::Any) so that it can be
/// downcast to a specific implementation.
fn as_any(&self) -> &dyn Any;

/// the field of the final result of this aggregation.
fn field(&self) -> Result<Field>;

/// expressions that are passed to the Accumulator.
/// Single-column aggregations such as `sum` return a single value, others (e.g. `cov`) return many.
fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>>;

/// Human readable name such as `"MIN(c2)"` or `"RANK()"`. The default
/// implementation returns placeholder text.
fn name(&self) -> &str {
"BuiltInWindowFunctionExpr: default name"
}

/// Create built-in window evaluator with a batch
fn create_evaluator(
&self,
batch: &RecordBatch,
) -> Result<Box<dyn PartitionEvaluator>>;
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
2 changes: 1 addition & 1 deletion datafusion/src/physical_plan/windows/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@
//! Physical exec for aggregate window function expressions.

use crate::error::{DataFusionError, Result};
use crate::physical_plan::windows::find_ranges_in_range;
use crate::physical_plan::{
expressions::PhysicalSortExpr, Accumulator, AggregateExpr, PhysicalExpr, WindowExpr,
};
use arrow::compute::concat;
use arrow::record_batch::RecordBatch;
use arrow::{array::ArrayRef, datatypes::Field};
use datafusion_expr::{WindowFrame, WindowFrameUnits};
use datafusion_physical_expr::window::find_ranges_in_range;
use std::any::Any;
use std::iter::IntoIterator;
use std::ops::Range;
Expand Down
Loading

0 comments on commit 0b1bef1

Please sign in to comment.