Commit

Minor: Add additional docstrings to Window function implementations (#…
…6592)

* Add additional docstrings to Window function implementations

* Update docs

* updates

* fix doc link

* Change resorting --> re-sorting
alamb authored Jun 9, 2023
1 parent 95c4681 commit 5032060
Showing 9 changed files with 207 additions and 50 deletions.
2 changes: 1 addition & 1 deletion datafusion/core/src/lib.rs
@@ -40,7 +40,7 @@
//! # Examples
//!
//! The main entry point for interacting with DataFusion is the
//! [`SessionContext`].
//! [`SessionContext`]. [`Expr`]s represent expressions such as `a + b`.
//!
//! [`SessionContext`]: execution::context::SessionContext
//!
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_plan/sorts/sort.rs
@@ -150,7 +150,7 @@ impl ExternalSorter {
//
// The factor of 2 aims to avoid a degenerate case where the
// memory required for `fetch` is just under the memory available,
// causing repeated resorting of data
// causing repeated re-sorting of data
if self.reservation.size() > before / 2
|| self.reservation.try_grow(size).is_err()
{
4 changes: 3 additions & 1 deletion datafusion/expr/src/window_function.rs
@@ -73,7 +73,9 @@ impl fmt::Display for WindowFunction {
}
}

/// An aggregate function that is part of a built-in window function
/// A [window function] built in to DataFusion
///
/// [window function]: https://en.wikipedia.org/wiki/Window_function_(SQL)
#[derive(Debug, Clone, PartialEq, Eq, Hash, EnumIter)]
pub enum BuiltInWindowFunction {
/// number of the current row within its partition, counting from 1
9 changes: 3 additions & 6 deletions datafusion/physical-expr/src/window/aggregate.rs
@@ -37,12 +37,9 @@ use crate::{
expressions::PhysicalSortExpr, reverse_order_bys, AggregateExpr, PhysicalExpr,
};

/// A window expr that takes the form of an aggregate function
/// Aggregate Window Expressions that have the form
/// `OVER({ROWS | RANGE| GROUPS} BETWEEN UNBOUNDED PRECEDING AND ...)`
/// e.g cumulative window frames uses `PlainAggregateWindowExpr`. Where as Aggregate Window Expressions
/// that have the form `OVER({ROWS | RANGE| GROUPS} BETWEEN M {PRECEDING| FOLLOWING} AND ...)`
/// e.g sliding window frames uses `SlidingAggregateWindowExpr`.
/// A window expr that takes the form of an aggregate function.
///
/// See comments on [`WindowExpr`] for more details.
#[derive(Debug)]
pub struct PlainAggregateWindowExpr {
aggregate: Arc<dyn AggregateExpr>,
2 changes: 1 addition & 1 deletion datafusion/physical-expr/src/window/built_in.rs
@@ -39,7 +39,7 @@ use datafusion_common::utils::evaluate_partition_ranges;
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::WindowFrame;

/// A window expr that takes the form of a built in window function
/// A window expr that takes the form of a [`BuiltInWindowFunctionExpr`].
#[derive(Debug)]
pub struct BuiltInWindowExpr {
expr: Arc<dyn BuiltInWindowFunctionExpr>,
Expand Down
@@ -24,20 +24,24 @@ use datafusion_common::Result;
use std::any::Any;
use std::sync::Arc;

/// A window expression that is a built-in window function.
/// Evaluates a window function by instantiating a
/// [`PartitionEvaluator`] for calculating the function's output in
/// that partition.
///
/// Note that unlike aggregation based window functions, built-in window functions normally ignore
/// window frame spec, with the exception of first_value, last_value, and nth_value.
/// Note that unlike aggregation-based window functions, some window
/// functions such as `rank` ignore the values in the window frame,
/// while others such as `first_value`, `last_value`, and
/// `nth_value` need those values.
#[allow(rustdoc::private_intra_doc_links)]
pub trait BuiltInWindowFunctionExpr: Send + Sync + std::fmt::Debug {
/// Returns the aggregate expression as [`Any`](std::any::Any) so that it can be
/// downcast to a specific implementation.
fn as_any(&self) -> &dyn Any;

/// the field of the final result of this aggregation.
/// The field of the final result of evaluating this window function.
fn field(&self) -> Result<Field>;

/// expressions that are passed to the Accumulator.
/// Single-column aggregations such as `sum` return a single value, others (e.g. `cov`) return many.
/// Expressions that are passed to the [`PartitionEvaluator`].
fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>>;

/// Human readable name such as `"MIN(c2)"` or `"RANK()"`. The default
@@ -46,8 +50,10 @@ pub trait BuiltInWindowFunctionExpr: Send + Sync + std::fmt::Debug {
"BuiltInWindowFunctionExpr: default name"
}

/// Evaluate window function arguments against the batch and return
/// an array ref. Typically, the resulting vector is a single element vector.
/// Evaluate the window function's arguments against the input window
/// batch and return one [`ArrayRef`] per argument expression.
///
/// Typically, the resulting vector has a single element.
fn evaluate_args(&self, batch: &RecordBatch) -> Result<Vec<ArrayRef>> {
self.expressions()
.iter()
@@ -56,19 +62,36 @@ pub trait BuiltInWindowFunctionExpr: Send + Sync + std::fmt::Debug {
.collect()
}

/// Create built-in window evaluator with a batch
/// Create a [`PartitionEvaluator`] for evaluating the function on
/// a particular partition.
fn create_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>>;

/// Construct Reverse Expression that produces the same result
/// on a reversed window. For example `lead(10)` --> `lag(10)`
/// Construct a new [`BuiltInWindowFunctionExpr`] that produces
/// the same result as this function on a window with reverse
/// order. The return value of this function is used by the
/// DataFusion optimizer to avoid re-sorting the data when
/// possible.
///
/// Returns `None` (the default) if no reverse is known (or possible).
///
/// For example, the reverse of `lead(10)` is `lag(10)`.
fn reverse_expr(&self) -> Option<Arc<dyn BuiltInWindowFunctionExpr>> {
None
}

/// Can the window function be incrementally computed using
/// bounded memory?
///
/// If this function returns true, the evaluator created by
/// [`Self::create_evaluator`] must implement
/// [`PartitionEvaluator::evaluate_stateful`].
fn supports_bounded_execution(&self) -> bool {
false
}

/// Does the window function use the values from its window frame?
///
/// If this function returns true, the evaluator created by
/// [`Self::create_evaluator`] must implement
/// [`PartitionEvaluator::evaluate_inside_range`].
fn uses_window_frame(&self) -> bool {
false
}
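The new `reverse_expr` docstring says the reverse of `lead(10)` is `lag(10)`. The following is a minimal sketch of why that holds, using plain slices rather than DataFusion types. The helper names `lead`, `lag`, and `lead_via_reversed_lag` are hypothetical and exist only for this illustration; they are not part of the `BuiltInWindowFunctionExpr` trait.

```rust
/// `lead(offset)`: for each row, the value `offset` rows after it,
/// or `None` when that row falls outside the partition.
fn lead(values: &[i64], offset: usize) -> Vec<Option<i64>> {
    (0..values.len())
        .map(|i| values.get(i + offset).copied())
        .collect()
}

/// `lag(offset)`: for each row, the value `offset` rows before it,
/// or `None` when that row falls outside the partition.
fn lag(values: &[i64], offset: usize) -> Vec<Option<i64>> {
    (0..values.len())
        .map(|i| i.checked_sub(offset).and_then(|j| values.get(j)).copied())
        .collect()
}

/// Compute `lead` for the forward order by running `lag` over the
/// reversed input and flipping the output back to the forward order.
fn lead_via_reversed_lag(values: &[i64], offset: usize) -> Vec<Option<i64>> {
    let reversed: Vec<i64> = values.iter().rev().copied().collect();
    let mut out = lag(&reversed, offset);
    out.reverse();
    out
}
```

If the input is already sorted in the opposite direction, evaluating the reversed function over it gives the same per-row answers, which is the property the docstring says the optimizer relies on to avoid re-sorting.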
145 changes: 129 additions & 16 deletions datafusion/physical-expr/src/window/partition_evaluator.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

//! partition evaluation module
//! Partition evaluation module
use crate::window::window_expr::BuiltinWindowState;
use crate::window::WindowAggState;
@@ -25,24 +25,97 @@ use datafusion_common::{DataFusionError, ScalarValue};
use std::fmt::Debug;
use std::ops::Range;

/// Partition evaluator
/// Partition evaluator for Window Functions
///
/// # Background
///
/// An implementation of this trait is created and used for each
/// partition defined by an `OVER` clause and is instantiated by
/// [`BuiltInWindowFunctionExpr::create_evaluator`]
///
/// For example, evaluating `window_func(val) OVER (PARTITION BY col)`
/// on the following data:
///
/// ```text
/// col | val
/// --- + ----
/// A | 10
/// A | 10
/// C | 20
/// D | 30
/// D | 30
/// ```
///
/// Will instantiate three `PartitionEvaluator`s, one each for the
/// partitions defined by `col=A`, `col=C`, and `col=D`.
///
/// ```text
/// col | val
/// --- + ----
/// A | 10 <--- partition 1
/// A | 10
///
/// col | val
/// --- + ----
/// C | 20 <--- partition 2
///
/// col | val
/// --- + ----
/// D | 30 <--- partition 3
/// D | 30
/// ```
///
/// Different methods on this trait will be called depending on the
/// capabilities described by [`BuiltInWindowFunctionExpr`]:
///
/// # Stateless `PartitionEvaluator`
///
/// In this case, [`Self::evaluate`], [`Self::evaluate_with_rank`] or
/// [`Self::evaluate_inside_range`] is called with values for the
/// entire partition.
///
/// # Stateful `PartitionEvaluator`
///
/// In this case, [`Self::evaluate_stateful`] is called to calculate
/// the results of the window function incrementally for each new
/// batch, saving and restoring any state needed to do so as
/// [`BuiltinWindowState`].
///
/// For example, when computing `ROW_NUMBER` incrementally,
/// [`Self::evaluate_stateful`] will be called multiple times with
/// different batches. For all batches after the first, the output
/// `row_number` must continue from the last `row_number` produced for the
/// previous batch. The previous row number is saved and restored as
/// the state.
///
/// [`BuiltInWindowFunctionExpr`]: crate::window::BuiltInWindowFunctionExpr
/// [`BuiltInWindowFunctionExpr::create_evaluator`]: crate::window::BuiltInWindowFunctionExpr::create_evaluator
pub trait PartitionEvaluator: Debug + Send {
/// Whether the evaluator should be evaluated with rank
/// Can this evaluator be evaluated with (only) rank?
///
/// If `include_rank` is true, then [`Self::evaluate_with_rank`]
/// will be called for each partition, which includes the
/// `rank`.
fn include_rank(&self) -> bool {
false
}

/// Returns state of the Built-in Window Function
/// Returns the internal state of the window function
///
/// Only used for stateful evaluation
fn state(&self) -> Result<BuiltinWindowState> {
// If we do not use state we just return Default
Ok(BuiltinWindowState::Default)
}

/// Updates the internal state for Built-in window function
// state is useful to update internal state for Built-in window function.
// idx is the index of last row for which result is calculated.
// range_columns is the result of order by column values. It is used to calculate rank boundaries
// sort_partition_points is the boundaries of each rank in the range_column. It is used to update rank.
/// Updates the internal state for window function
///
/// Only used for stateful evaluation
///
/// * `state`: used to update the internal state of the window function.
/// * `idx`: the index of the last row for which a result has been calculated.
/// * `range_columns`: the values of the `ORDER BY` columns, used to calculate rank boundaries.
/// * `sort_partition_points`: the boundaries of each rank in `range_columns`, used to update the rank.
fn update_state(
&mut self,
_state: &WindowAggState,
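The "Background" section of the `PartitionEvaluator` docstring describes one evaluator per partition. As a rough sketch of that idea (not DataFusion's implementation), the code below splits rows that are already sorted by the partition column into contiguous row ranges, then numbers the rows in each range independently. The function names `partition_ranges` and `row_number_per_partition` are assumptions made for this example.

```rust
use std::ops::Range;

/// Split rows that are already sorted by the partition column into
/// contiguous half-open row ranges, one per distinct value.
fn partition_ranges(col: &[&str]) -> Vec<Range<usize>> {
    let mut ranges = Vec::new();
    let mut start = 0;
    for i in 1..=col.len() {
        // Close the current range at the end of input or when the value changes.
        if i == col.len() || col[i] != col[start] {
            ranges.push(start..i);
            start = i;
        }
    }
    ranges
}

/// Stand-in for a per-partition evaluator: number the rows of each
/// partition starting from 1, restarting at every partition boundary.
fn row_number_per_partition(col: &[&str]) -> Vec<u64> {
    partition_ranges(col)
        .into_iter()
        .flat_map(|r| 1..=r.len() as u64)
        .collect()
}
```

For the `col` values `A, A, C, D, D` from the docstring, this yields three ranges, matching the three partitions shown there.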
@@ -54,36 +127,72 @@ pub trait PartitionEvaluator: Debug + Send {
Ok(())
}

/// Sets the internal state for window function
///
/// Only used for stateful evaluation
fn set_state(&mut self, _state: &BuiltinWindowState) -> Result<()> {
Err(DataFusionError::NotImplemented(
"set_state is not implemented for this window function".to_string(),
))
}

/// Gets the range where Built-in window function result is calculated.
// idx is the index of last row for which result is calculated.
// n_rows is the number of rows of the input record batch (Used during bound check)
/// Gets the range where the window function result is calculated.
///
/// * `idx`: the index of the last row for which a result has been calculated.
/// * `n_rows`: the number of rows in the input record batch (used for bounds checks).
fn get_range(&self, _idx: usize, _n_rows: usize) -> Result<Range<usize>> {
Err(DataFusionError::NotImplemented(
"get_range is not implemented for this window function".to_string(),
))
}

/// Evaluate the partition evaluator against the partition
/// Called for window functions that *do not use* values from
/// the window frame, such as `ROW_NUMBER`, `RANK`, `DENSE_RANK`,
/// `PERCENT_RANK`, `CUME_DIST`, `LEAD`, and `LAG`.
fn evaluate(&self, _values: &[ArrayRef], _num_rows: usize) -> Result<ArrayRef> {
Err(DataFusionError::NotImplemented(
"evaluate is not implemented by default".into(),
))
}

/// Evaluate window function result inside given range
/// Evaluate window function result inside given range.
///
/// Only used for stateful evaluation
fn evaluate_stateful(&mut self, _values: &[ArrayRef]) -> Result<ScalarValue> {
Err(DataFusionError::NotImplemented(
"evaluate_stateful is not implemented by default".into(),
))
}

/// evaluate the partition evaluator against the partition but with rank
/// [`PartitionEvaluator::evaluate_with_rank`] is called for window
/// functions that only need the rank of a row within its window
/// frame.
///
/// Evaluate the partition evaluator against the partition using
/// the row ranks. For example, `RANK(col)` produces
///
/// ```text
/// col | rank
/// --- + ----
/// A | 1
/// A | 1
/// C | 3
/// D | 4
/// D | 4
/// ```
///
/// For this case, `num_rows` would be `5` and
/// `ranks_in_partition` would be
///
/// ```text
/// [
/// (0,1),
/// (2,2),
/// (3,4),
/// ]
/// ```
///
/// See [`Self::include_rank`] for more details
fn evaluate_with_rank(
&self,
_num_rows: usize,
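The `evaluate_with_rank` docstring describes computing `RANK` from groups of peer rows. A minimal sketch of that calculation follows. It assumes the peer groups arrive as half-open `Range<usize>` values (`0..2`, `2..3`, `3..5`), which correspond to the inclusive pairs `(0,1)`, `(2,2)`, `(3,4)` listed in the docstring. The parameter type is an assumption, since the signature is truncated in this diff, and the free function `rank` is hypothetical.

```rust
use std::ops::Range;

/// Compute SQL `RANK` for a partition from its peer groups.
/// Every row in a peer group gets the same rank: one plus the number
/// of rows that precede the group.
fn rank(num_rows: usize, ranks_in_partition: &[Range<usize>]) -> Vec<u64> {
    let mut out = Vec::with_capacity(num_rows);
    for group in ranks_in_partition {
        let group_rank = group.start as u64 + 1;
        out.extend(std::iter::repeat(group_rank).take(group.len()));
    }
    out
}
```

Because ties share a rank and the next group skips ahead by the tie count, the output for the docstring's data has a gap between 1 and 3.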
@@ -94,7 +203,11 @@ pub trait PartitionEvaluator: Debug + Send {
))
}

/// evaluate window function result inside given range
/// Called for window functions that use values from the window frame,
/// such as `FIRST_VALUE`, `LAST_VALUE`, and `NTH_VALUE`, and produce a
/// single value for every row in the partition.
///
/// Returns a [`ScalarValue`] that is the value of the window function for the entire partition.
fn evaluate_inside_range(
&self,
_values: &[ArrayRef],
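Functions such as `FIRST_VALUE` and `LAST_VALUE` depend on the rows inside the window frame, which is why they are evaluated against a range. The sketch below illustrates this with a plain `i64` slice and a half-open `Range<usize>` standing in for the frame. Both are assumptions for the example; the real method takes `ArrayRef` inputs and returns a `ScalarValue`.

```rust
use std::ops::Range;

/// First value inside the frame, or `None` if the frame is empty
/// or lies outside the partition.
fn first_value(values: &[i64], frame: &Range<usize>) -> Option<i64> {
    values.get(frame.clone())?.first().copied()
}

/// Last value inside the frame, or `None` if the frame is empty
/// or lies outside the partition.
fn last_value(values: &[i64], frame: &Range<usize>) -> Option<i64> {
    values.get(frame.clone())?.last().copied()
}
```

A function such as `RANK` would ignore `frame` entirely, which is the distinction the `uses_window_frame` flag captures.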
19 changes: 9 additions & 10 deletions datafusion/physical-expr/src/window/sliding_aggregate.rs
@@ -36,12 +36,10 @@ use crate::{
expressions::PhysicalSortExpr, reverse_order_bys, AggregateExpr, PhysicalExpr,
};

/// A window expr that takes the form of an aggregate function
/// Aggregate Window Expressions that have the form
/// `OVER({ROWS | RANGE| GROUPS} BETWEEN UNBOUNDED PRECEDING AND ...)`
/// e.g cumulative window frames uses `PlainAggregateWindowExpr`. Where as Aggregate Window Expressions
/// that have the form `OVER({ROWS | RANGE| GROUPS} BETWEEN M {PRECEDING| FOLLOWING} AND ...)`
/// e.g sliding window frames uses `SlidingAggregateWindowExpr`.
/// A window expr that takes the form of an aggregate function that
/// can be incrementally computed over sliding windows.
///
/// See comments on [`WindowExpr`] for more details.
#[derive(Debug)]
pub struct SlidingAggregateWindowExpr {
aggregate: Arc<dyn AggregateExpr>,
@@ -72,10 +70,11 @@ impl SlidingAggregateWindowExpr {
}
}

/// peer based evaluation based on the fact that batch is pre-sorted given the sort columns
/// and then per partition point we'll evaluate the peer group (e.g. SUM or MAX gives the same
/// results for peers) and concatenate the results.
/// Incrementally update the window function, using the fact that the batch is
/// pre-sorted by the sort columns and is processed per partition point.
///
/// Evaluates each peer group (e.g. `SUM` or `MAX` gives the same result
/// for peers) and concatenates the results.
impl WindowExpr for SlidingAggregateWindowExpr {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
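The phrase "incrementally computed over sliding windows" can be illustrated with a running sum that adds each row entering the frame and subtracts the row leaving it, rather than re-summing the whole frame per row. This is a sketch of the general technique only, for a frame of the form `ROWS BETWEEN n PRECEDING AND CURRENT ROW`. The function `sliding_sum` is hypothetical and does not reflect how `SlidingAggregateWindowExpr` is implemented.

```rust
/// Sliding `SUM` over `ROWS BETWEEN preceding PRECEDING AND CURRENT ROW`,
/// computed incrementally in O(1) work per row.
fn sliding_sum(values: &[i64], preceding: usize) -> Vec<i64> {
    let mut out = Vec::with_capacity(values.len());
    let mut sum = 0i64;
    for (i, v) in values.iter().enumerate() {
        // The current row enters the frame.
        sum += v;
        // The row that is now more than `preceding` rows back leaves the frame.
        if i > preceding {
            sum -= values[i - preceding - 1];
        }
        out.push(sum);
    }
    out
}
```

This only works for aggregates whose contribution can be undone, such as `SUM` or `COUNT`; an aggregate like `MAX` needs extra bookkeeping to handle a departing maximum.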