Minor: Add additional docstrings to Window function implementations #6592

Merged
merged 6 commits into from
Jun 9, 2023
Changes from 5 commits
2 changes: 1 addition & 1 deletion datafusion/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
//! # Examples
//!
//! The main entry point for interacting with DataFusion is the
//! [`SessionContext`].
//! [`SessionContext`]. [`Expr`]s represent expressions such as `a + b`.
//!
//! [`SessionContext`]: execution::context::SessionContext
//!
Expand Down
4 changes: 3 additions & 1 deletion datafusion/expr/src/window_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@ impl fmt::Display for WindowFunction {
}
}

/// An aggregate function that is part of a built-in window function
/// A [window function] built in to DataFusion
///
/// [window function]: https://en.wikipedia.org/wiki/Window_function_(SQL)
#[derive(Debug, Clone, PartialEq, Eq, Hash, EnumIter)]
pub enum BuiltInWindowFunction {
/// number of the current row within its partition, counting from 1
Expand Down
9 changes: 3 additions & 6 deletions datafusion/physical-expr/src/window/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,9 @@ use crate::{
expressions::PhysicalSortExpr, reverse_order_bys, AggregateExpr, PhysicalExpr,
};

/// A window expr that takes the form of an aggregate function
/// Aggregate Window Expressions that have the form
/// `OVER({ROWS | RANGE| GROUPS} BETWEEN UNBOUNDED PRECEDING AND ...)`
/// e.g cumulative window frames uses `PlainAggregateWindowExpr`. Where as Aggregate Window Expressions
/// that have the form `OVER({ROWS | RANGE| GROUPS} BETWEEN M {PRECEDING| FOLLOWING} AND ...)`
/// e.g sliding window frames uses `SlidingAggregateWindowExpr`.
/// A window expr that takes the form of an aggregate function.
///
/// See comments on [`WindowExpr`] for more details.
#[derive(Debug)]
pub struct PlainAggregateWindowExpr {
aggregate: Arc<dyn AggregateExpr>,
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-expr/src/window/built_in.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use datafusion_common::utils::evaluate_partition_ranges;
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::WindowFrame;

/// A window expr that takes the form of a built in window function
/// A window expr that takes the form of a [`BuiltInWindowFunctionExpr`].
#[derive(Debug)]
pub struct BuiltInWindowExpr {
expr: Arc<dyn BuiltInWindowFunctionExpr>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,24 @@ use datafusion_common::Result;
use std::any::Any;
use std::sync::Arc;

/// A window expression that is a built-in window function.
/// Evaluates a window function by instantiating a
/// [`PartitionEvaluator`] for calculating the function's output in
/// that partition.
///
/// Note that unlike aggregation based window functions, built-in window functions normally ignore
/// window frame spec, with the exception of first_value, last_value, and nth_value.
/// Note that unlike aggregation based window functions, some window
/// functions such as `rank` ignore the values in the window frame,
/// but others such as `first_value`, `last_value`, and
/// `nth_value` need the values.
#[allow(rustdoc::private_intra_doc_links)]
pub trait BuiltInWindowFunctionExpr: Send + Sync + std::fmt::Debug {
/// Returns the aggregate expression as [`Any`](std::any::Any) so that it can be
/// downcast to a specific implementation.
fn as_any(&self) -> &dyn Any;

/// the field of the final result of this aggregation.
/// The field of the final result of evaluating this window function.
fn field(&self) -> Result<Field>;

/// expressions that are passed to the Accumulator.
/// Single-column aggregations such as `sum` return a single value, others (e.g. `cov`) return many.
/// Expressions that are passed to the [`PartitionEvaluator`].
fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>>;

/// Human readable name such as `"MIN(c2)"` or `"RANK()"`. The default
Expand All @@ -46,8 +50,10 @@ pub trait BuiltInWindowFunctionExpr: Send + Sync + std::fmt::Debug {
"BuiltInWindowFunctionExpr: default name"
}

/// Evaluate window function arguments against the batch and return
/// an array ref. Typically, the resulting vector is a single element vector.
/// Evaluate the window function's arguments against the input window
/// batch and return the resulting [`ArrayRef`]s, one per argument.
///
/// Typically, the resulting vector has a single element.
fn evaluate_args(&self, batch: &RecordBatch) -> Result<Vec<ArrayRef>> {
self.expressions()
.iter()
Expand All @@ -56,19 +62,36 @@ pub trait BuiltInWindowFunctionExpr: Send + Sync + std::fmt::Debug {
.collect()
}

/// Create built-in window evaluator with a batch
/// Create a [`PartitionEvaluator`] for evaluating the function on
/// a particular partition.
fn create_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>>;

/// Construct Reverse Expression that produces the same result
/// on a reversed window. For example `lead(10)` --> `lag(10)`
/// Construct a new [`BuiltInWindowFunctionExpr`] that produces
/// the same result as this function on a window with reverse
/// order. The return value of this function is used by the
/// DataFusion optimizer to avoid resorting the data when
Contributor

Maybe we can use re-sorting instead of resorting.

Contributor Author

Done in d56a5b2

/// possible.
///
/// Returns `None` (the default) if no reverse is known (or possible).
///
/// For example, the reverse of `lead(10)` is `lag(10)`.
fn reverse_expr(&self) -> Option<Arc<dyn BuiltInWindowFunctionExpr>> {
None
}

/// Can the window function be incrementally computed using
/// bounded memory?
///
/// If this function returns true, [`Self::create_evaluator`] must
/// implement [`PartitionEvaluator::evaluate_stateful`]
fn supports_bounded_execution(&self) -> bool {
false
}

/// Does the window function use the values from its window frame?
///
/// If this function returns true, [`Self::create_evaluator`] must
/// implement [`PartitionEvaluator::evaluate_inside_range`]
fn uses_window_frame(&self) -> bool {
false
}
Expand Down
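
The `reverse_expr` doc comment above says the reverse of `lead(10)` is `lag(10)`. A minimal sketch of why that holds, assuming a fully materialized partition of `i32` values: evaluating `lag` on the reversed input and flipping the output back gives the same result as `lead` on the original order. The helper names `lead`, `lag`, and `lead_via_reversed_lag` are hypothetical and are not DataFusion APIs.

```rust
/// Hypothetical helper: `LEAD(col, n)` over one partition held in memory.
/// Row `i` sees the value `n` rows after it, or `None` past the end.
fn lead(values: &[i32], n: usize) -> Vec<Option<i32>> {
    (0..values.len())
        .map(|i| values.get(i + n).copied())
        .collect()
}

/// Hypothetical helper: `LAG(col, n)` over one partition held in memory.
/// Row `i` sees the value `n` rows before it, or `None` before the start.
fn lag(values: &[i32], n: usize) -> Vec<Option<i32>> {
    (0..values.len())
        .map(|i| i.checked_sub(n).map(|j| values[j]))
        .collect()
}

/// Evaluate `lag` on the reversed input, then flip the output back
/// into the original row order.
fn lead_via_reversed_lag(values: &[i32], n: usize) -> Vec<Option<i32>> {
    let reversed: Vec<i32> = values.iter().rev().copied().collect();
    let mut out = lag(&reversed, n);
    out.reverse();
    out
}
```

For any input, `lead(values, n)` and `lead_via_reversed_lag(values, n)` agree, which is the property that lets a planner swap one for the other when the data is already sorted in the opposite direction.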
145 changes: 129 additions & 16 deletions datafusion/physical-expr/src/window/partition_evaluator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

//! partition evaluation module
//! Partition evaluation module

use crate::window::window_expr::BuiltinWindowState;
use crate::window::WindowAggState;
Expand All @@ -25,24 +25,97 @@ use datafusion_common::{DataFusionError, ScalarValue};
use std::fmt::Debug;
use std::ops::Range;

/// Partition evaluator
/// Partition evaluator for Window Functions
///
/// # Background
///
/// An implementation of this trait is created and used for each
/// partition defined by an `OVER` clause and is instantiated by
/// [`BuiltInWindowFunctionExpr::create_evaluator`]
///
/// For example, evaluating `window_func(val) OVER (PARTITION BY col)`
/// on the following data:
///
/// ```text
/// col | val
/// --- + ----
/// A | 10
/// A | 10
/// C | 20
/// D | 30
/// D | 30
/// ```
///
/// Will instantiate three `PartitionEvaluator`s, one each for the
/// partitions defined by `col=A`, `col=C`, and `col=D`.
///
/// ```text
/// col | val
/// --- + ----
/// A | 10 <--- partition 1
/// A | 10
///
/// col | val
/// --- + ----
/// C | 20 <--- partition 2
///
/// col | val
/// --- + ----
/// D | 30 <--- partition 3
/// D | 30
/// ```
///
/// Different methods on this trait will be called depending on the
/// capabilities described by [`BuiltInWindowFunctionExpr`]:
///
/// # Stateless `PartitionEvaluator`
///
Contributor Author

@mustafasrepo / @ozankabak if you have time to help me describe more clearly what Stateful and Stateless PartitionEvaluators are, and specifically what is different between them, I would be most appreciative

Contributor

Some built-in window functions use window frame information inside the window expression (those are FIRST_VALUE, LAST_VALUE, NTH_VALUE). However, for most window functions what is in the window frame is not important (those are ROW_NUMBER, RANK, DENSE_RANK, PERCENT_RANK, CUME_DIST, LEAD, LAG). For the ones that use the window frame, PartitionEvaluator::evaluate_inside_range is called. For the ones that do not use the window frame, PartitionEvaluator::evaluate is called. (For rank calculations, PartitionEvaluator::evaluate_with_rank is called since its API is quite different. However, it doesn't use the window frame either.)

PartitionEvaluator::evaluate_stateful is used only when we produce window results with bounded memory (when window functions are called from BoundedWindowAggExec). In this case window results are calculated in a running fashion, hence we need to store the previous state to be able to calculate the correct output. For instance, for the ROW_NUMBER function, the current batch the evaluator receives may not be the first batch. Hence we cannot start row_number from 0; we need to start from the last ROW_NUMBER produced for the previous batches received. Similarly, we need to store some information in the state whenever we do not receive the whole table as a single batch.

Currently, we have support for bounded (stateful) execution for FIRST_VALUE, LAST_VALUE, NTH_VALUE, ROW_NUMBER, RANK, DENSE_RANK, LEAD, LAG.

Contributor Author

Thank you @mustafasrepo -- this is super helpful. I am incorporating this information into this PR
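
The stateful case described in this thread can be sketched in isolation. This is a simplified illustration, not DataFusion code: `RowNumberState` and `RowNumberEvaluator` are hypothetical stand-ins (the real trait saves and restores a `BuiltinWindowState` and has different signatures). It only shows why `ROW_NUMBER` must carry the last emitted number from one batch to the next.

```rust
/// Hypothetical stand-in for the saved state; not DataFusion's
/// `BuiltinWindowState`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
struct RowNumberState {
    /// Last row number emitted so far for this partition (0 = none yet).
    last_row_number: u64,
}

/// Hypothetical evaluator that numbers rows one batch at a time.
#[derive(Debug, Default)]
struct RowNumberEvaluator {
    state: RowNumberState,
}

impl RowNumberEvaluator {
    /// Export the state so it can be stored between batches.
    fn state(&self) -> RowNumberState {
        self.state
    }

    /// Restore a previously exported state.
    fn set_state(&mut self, state: RowNumberState) {
        self.state = state;
    }

    /// Number the rows of one batch, continuing after the saved row number.
    fn evaluate_batch(&mut self, num_rows: usize) -> Vec<u64> {
        let start = self.state.last_row_number;
        let out: Vec<u64> = (1..=num_rows as u64).map(|i| start + i).collect();
        self.state.last_row_number = start + num_rows as u64;
        out
    }
}
```

A first batch of three rows yields 1, 2, 3. A fresh evaluator that restores the saved state and then receives two more rows yields 4, 5 rather than restarting at 1.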

/// In this case, [`Self::evaluate`], [`Self::evaluate_with_rank`] or
/// [`Self::evaluate_inside_range`] is called with values for the
/// entire partition.
///
/// # Stateful `PartitionEvaluator`
///
/// In this case, [`Self::evaluate_stateful`] is called to calculate
/// the results of the window function incrementally for each new
/// batch, saving and restoring any state needed to do so as
/// [`BuiltinWindowState`].
///
/// For example, when computing `ROW_NUMBER` incrementally,
/// [`Self::evaluate_stateful`] will be called multiple times with
/// different batches. For all batches after the first, the output
/// `row_number` must start from last `row_number` produced for the
/// previous batch. The previous row number is saved and restored as
/// the state.
///
/// [`BuiltInWindowFunctionExpr`]: crate::window::BuiltInWindowFunctionExpr
/// [`BuiltInWindowFunctionExpr::create_evaluator`]: crate::window::BuiltInWindowFunctionExpr::create_evaluator
pub trait PartitionEvaluator: Debug + Send {
/// Whether the evaluator should be evaluated with rank
/// Can this evaluator be evaluated with (only) rank?
///
/// If `include_rank` is true, then [`Self::evaluate_with_rank`]
/// will be called for each partition, which includes the
/// `rank`.
fn include_rank(&self) -> bool {
false
}

/// Returns state of the Built-in Window Function
/// Returns the internal state of the window function
///
/// Only used for stateful evaluation
fn state(&self) -> Result<BuiltinWindowState> {
// If we do not use state we just return Default
Ok(BuiltinWindowState::Default)
}

/// Updates the internal state for Built-in window function
// state is useful to update internal state for Built-in window function.
// idx is the index of last row for which result is calculated.
// range_columns is the result of order by column values. It is used to calculate rank boundaries
// sort_partition_points is the boundaries of each rank in the range_column. It is used to update rank.
/// Updates the internal state for window function
///
/// Only used for stateful evaluation
///
/// `state`: is useful to update internal state for window function.
/// `idx`: is the index of last row for which result is calculated.
/// `range_columns`: is the result of order by column values. It is used to calculate rank boundaries
/// `sort_partition_points`: is the boundaries of each rank in the range_column. It is used to update rank.
fn update_state(
&mut self,
_state: &WindowAggState,
Expand All @@ -54,36 +127,72 @@ pub trait PartitionEvaluator: Debug + Send {
Ok(())
}

/// Sets the internal state for window function
///
/// Only used for stateful evaluation
fn set_state(&mut self, _state: &BuiltinWindowState) -> Result<()> {
Err(DataFusionError::NotImplemented(
"set_state is not implemented for this window function".to_string(),
))
}

/// Gets the range where Built-in window function result is calculated.
// idx is the index of last row for which result is calculated.
// n_rows is the number of rows of the input record batch (Used during bound check)
/// Gets the range where the window function result is calculated.
///
/// `idx`: is the index of last row for which result is calculated.
/// `n_rows`: is the number of rows of the input record batch (Used during bounds check)
fn get_range(&self, _idx: usize, _n_rows: usize) -> Result<Range<usize>> {
Err(DataFusionError::NotImplemented(
"get_range is not implemented for this window function".to_string(),
))
}

/// Evaluate the partition evaluator against the partition
/// Called for window functions that *do not use* values from
/// the window frame, such as `ROW_NUMBER`, `RANK`, `DENSE_RANK`,
/// `PERCENT_RANK`, `CUME_DIST`, `LEAD`, `LAG`.
fn evaluate(&self, _values: &[ArrayRef], _num_rows: usize) -> Result<ArrayRef> {
Err(DataFusionError::NotImplemented(
"evaluate is not implemented by default".into(),
))
}

/// Evaluate window function result inside given range
/// Evaluate window function result inside given range.
///
/// Only used for stateful evaluation
fn evaluate_stateful(&mut self, _values: &[ArrayRef]) -> Result<ScalarValue> {
Err(DataFusionError::NotImplemented(
"evaluate_stateful is not implemented by default".into(),
))
}

/// evaluate the partition evaluator against the partition but with rank
/// [`PartitionEvaluator::evaluate_with_rank`] is called for window
/// functions that only need the rank of a row within its window
/// frame.
///
/// Evaluate the partition evaluator against the partition using
/// the row ranks. For example, `RANK(col)` produces
///
/// ```text
/// col | rank
/// --- + ----
/// A | 1
/// A | 1
/// C | 3
/// D | 4
/// D | 4
/// ```
///
/// For this case, `num_rows` would be `5` and the
/// `ranks_in_partition` would be called with
///
/// ```text
/// [
/// (0,1),
/// (2,2),
/// (3,4),
/// ]
/// ```
///
/// See [`Self::include_rank`] for more details
fn evaluate_with_rank(
&self,
_num_rows: usize,
Expand All @@ -94,7 +203,11 @@ pub trait PartitionEvaluator: Debug + Send {
))
}

/// evaluate window function result inside given range
/// Called for window functions that use values from window frame,
/// such as `FIRST_VALUE`, `LAST_VALUE`, `NTH_VALUE` and produce a
/// single value for every row in the partition.
///
/// Returns a [`ScalarValue`] that is the value of the window function for the entire partition
fn evaluate_inside_range(
&self,
_values: &[ArrayRef],
Expand Down
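
As a rough illustration of how rank values can be derived from the peer-group boundaries passed to `evaluate_with_rank`, here is a minimal sketch under standard SQL `RANK` semantics, where tied rows share a rank and the next group's rank skips ahead by the number of tied rows. It assumes the groups arrive as half-open `Range<usize>` values (so the pairs `(0,1)`, `(2,2)`, `(3,4)` in the doc comment correspond to `0..2`, `2..3`, `3..5`); the function name `rank_from_peer_ranges` is hypothetical.

```rust
use std::ops::Range;

/// Hypothetical helper: compute SQL `RANK()` for every row of a partition
/// from the ranges of peer rows (rows with equal ORDER BY values).
fn rank_from_peer_ranges(num_rows: usize, ranks_in_partition: &[Range<usize>]) -> Vec<u64> {
    let mut ranks = Vec::with_capacity(num_rows);
    for peers in ranks_in_partition {
        // Every row in a peer group gets 1 + the number of rows before the group.
        let rank = peers.start as u64 + 1;
        ranks.extend(std::iter::repeat(rank).take(peers.len()));
    }
    ranks
}
```

With five rows grouped as `0..2`, `2..3`, `3..5`, this yields ranks 1, 1, 3, 4, 4.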
19 changes: 9 additions & 10 deletions datafusion/physical-expr/src/window/sliding_aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,10 @@ use crate::{
expressions::PhysicalSortExpr, reverse_order_bys, AggregateExpr, PhysicalExpr,
};

/// A window expr that takes the form of an aggregate function
/// Aggregate Window Expressions that have the form
/// `OVER({ROWS | RANGE| GROUPS} BETWEEN UNBOUNDED PRECEDING AND ...)`
/// e.g cumulative window frames uses `PlainAggregateWindowExpr`. Where as Aggregate Window Expressions
/// that have the form `OVER({ROWS | RANGE| GROUPS} BETWEEN M {PRECEDING| FOLLOWING} AND ...)`
/// e.g sliding window frames uses `SlidingAggregateWindowExpr`.
/// A window expr that takes the form of an aggregate function that
/// can be incrementally computed over sliding windows.
///
/// See comments on [`WindowExpr`] for more details.
Contributor Author

Consolidated this description into WindowExpr

#[derive(Debug)]
pub struct SlidingAggregateWindowExpr {
aggregate: Arc<dyn AggregateExpr>,
Expand Down Expand Up @@ -72,10 +70,11 @@ impl SlidingAggregateWindowExpr {
}
}

/// peer based evaluation based on the fact that batch is pre-sorted given the sort columns
/// and then per partition point we'll evaluate the peer group (e.g. SUM or MAX gives the same
/// results for peers) and concatenate the results.

/// Incrementally updates the window function, relying on the batch being
/// pre-sorted on the sort columns.
///
/// For each partition point, evaluates the peer group (e.g. `SUM` or `MAX`
/// gives the same results for peers) and concatenates the results.
impl WindowExpr for SlidingAggregateWindowExpr {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
Expand Down
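
To make "incrementally computed over sliding windows" concrete, here is a minimal sketch of a sliding `SUM` for a frame of the form `ROWS BETWEEN (window_size - 1) PRECEDING AND CURRENT ROW`. It is an illustration only and not how `SlidingAggregateWindowExpr` is implemented: the function `sliding_sum` is hypothetical, and it assumes an aggregate such as `SUM` that can both add a new row and retract an old one.

```rust
/// Hypothetical helper: sum over the current row and the
/// `window_size - 1` rows before it, updated incrementally.
fn sliding_sum(values: &[i64], window_size: usize) -> Vec<i64> {
    assert!(window_size > 0, "the frame must contain at least the current row");
    let mut out = Vec::with_capacity(values.len());
    let mut running = 0i64;
    for (i, v) in values.iter().enumerate() {
        // The current row enters the frame.
        running += v;
        // The row that fell out of the frame is retracted.
        if i >= window_size {
            running -= values[i - window_size];
        }
        out.push(running);
    }
    out
}
```

Each output row costs one addition and at most one subtraction, instead of re-summing the whole frame for every row.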