Minor: Add additional docstrings to Window function implementations #6592

Merged
merged 6 commits into from
Jun 9, 2023
2 changes: 1 addition & 1 deletion datafusion/core/src/lib.rs
@@ -40,7 +40,7 @@
//! # Examples
//!
//! The main entry point for interacting with DataFusion is the
//! [`SessionContext`].
//! [`SessionContext`]. [`Expr`]s represent expressions such as `a + b`.
//!
//! [`SessionContext`]: execution::context::SessionContext
//!
4 changes: 3 additions & 1 deletion datafusion/expr/src/window_function.rs
@@ -73,7 +73,9 @@ impl fmt::Display for WindowFunction {
}
}

/// An aggregate function that is part of a built-in window function
/// A [window function] built in to DataFusion
///
/// [window function]: https://en.wikipedia.org/wiki/Window_function_(SQL)
#[derive(Debug, Clone, PartialEq, Eq, Hash, EnumIter)]
pub enum BuiltInWindowFunction {
/// number of the current row within its partition, counting from 1
9 changes: 3 additions & 6 deletions datafusion/physical-expr/src/window/aggregate.rs
@@ -37,12 +37,9 @@ use crate::{
expressions::PhysicalSortExpr, reverse_order_bys, AggregateExpr, PhysicalExpr,
};

/// A window expr that takes the form of an aggregate function
/// Aggregate Window Expressions that have the form
/// `OVER({ROWS | RANGE| GROUPS} BETWEEN UNBOUNDED PRECEDING AND ...)`
/// e.g cumulative window frames uses `PlainAggregateWindowExpr`. Where as Aggregate Window Expressions
/// that have the form `OVER({ROWS | RANGE| GROUPS} BETWEEN M {PRECEDING| FOLLOWING} AND ...)`
/// e.g sliding window frames uses `SlidingAggregateWindowExpr`.
/// A window expr that takes the form of an aggregate function.
///
/// See comments on [`WindowExpr`] for more details.
#[derive(Debug)]
pub struct PlainAggregateWindowExpr {
aggregate: Arc<dyn AggregateExpr>,
2 changes: 1 addition & 1 deletion datafusion/physical-expr/src/window/built_in.rs
@@ -39,7 +39,7 @@ use datafusion_common::utils::evaluate_partition_ranges;
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::WindowFrame;

/// A window expr that takes the form of a built in window function
/// A window expr that takes the form of a [`BuiltInWindowFunctionExpr`].
#[derive(Debug)]
pub struct BuiltInWindowExpr {
expr: Arc<dyn BuiltInWindowFunctionExpr>,
@@ -24,20 +24,21 @@ use datafusion_common::Result;
use std::any::Any;
use std::sync::Arc;

/// A window expression that is a built-in window function.
/// A window expression that evaluates a window function.
///
/// Note that unlike aggregation based window functions, window
/// functions normally ignore the window frame spec with the exception
/// of `first_value`, `last_value`, and `nth_value`.
///
/// Note that unlike aggregation based window functions, built-in window functions normally ignore
/// window frame spec, with the exception of first_value, last_value, and nth_value.
pub trait BuiltInWindowFunctionExpr: Send + Sync + std::fmt::Debug {
/// Returns the aggregate expression as [`Any`](std::any::Any) so that it can be
/// downcast to a specific implementation.
fn as_any(&self) -> &dyn Any;

/// the field of the final result of this aggregation.
/// The field of the final result of evaluating this window function.
fn field(&self) -> Result<Field>;

/// expressions that are passed to the Accumulator.
/// Single-column aggregations such as `sum` return a single value, others (e.g. `cov`) return many.
/// Expressions that are passed to the [`PartitionEvaluator`].
fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>>;

/// Human readable name such as `"MIN(c2)"` or `"RANK()"`. The default
@@ -46,8 +47,9 @@ pub trait BuiltInWindowFunctionExpr: Send + Sync + std::fmt::Debug {
"BuiltInWindowFunctionExpr: default name"
}

/// Evaluate window function arguments against the batch and return
/// an array ref. Typically, the resulting vector is a single element vector.
/// Evaluate window function's arguments against the batch and
/// return an array ref. Typically, the resulting vector is a
/// single element vector.
fn evaluate_args(&self, batch: &RecordBatch) -> Result<Vec<ArrayRef>> {
self.expressions()
.iter()
@@ -56,11 +58,13 @@ pub trait BuiltInWindowFunctionExpr: Send + Sync + std::fmt::Debug {
.collect()
}

/// Create built-in window evaluator with a batch
/// Create a [`PartitionEvaluator`] to evaluate data on a particular partition.
fn create_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>>;

/// Construct Reverse Expression that produces the same result
/// on a reversed window. For example `lead(10)` --> `lag(10)`
/// on a reversed window. Returns `None` if not possible.
///
/// For example, the reverse of `lead(10)` is `lag(10)`.
fn reverse_expr(&self) -> Option<Arc<dyn BuiltInWindowFunctionExpr>> {
None
}
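
To illustrate the `reverse_expr` doc comment above (`lead(10)` reversed is `lag(10)`), here is a minimal standalone sketch. It does not use the DataFusion API: `lead`, `lag`, and `lead_via_reversed_lag` are hypothetical helpers over plain slices, and NULL handling is reduced to `Option`.

```rust
/// Hypothetical helper: the value `offset` rows after each row,
/// or `None` when that row is past the end of the partition.
fn lead(values: &[i64], offset: usize) -> Vec<Option<i64>> {
    (0..values.len())
        .map(|i| values.get(i + offset).copied())
        .collect()
}

/// Hypothetical helper: the value `offset` rows before each row,
/// or `None` when that row is before the start of the partition.
fn lag(values: &[i64], offset: usize) -> Vec<Option<i64>> {
    (0..values.len())
        .map(|i| i.checked_sub(offset).map(|j| values[j]))
        .collect()
}

/// Computes `lead` by running `lag` over the reversed input and then
/// reversing the output back into the original row order.
fn lead_via_reversed_lag(values: &[i64], offset: usize) -> Vec<Option<i64>> {
    let reversed: Vec<i64> = values.iter().rev().copied().collect();
    let mut out = lag(&reversed, offset);
    out.reverse();
    out
}
```

For any input, `lead_via_reversed_lag(values, n)` should agree with `lead(values, n)`, which is the property the doc comment relies on.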
@@ -69,6 +73,8 @@ pub trait BuiltInWindowFunctionExpr: Send + Sync + std::fmt::Debug {
false
}

/// If this returns true, [`Self::create_evaluator`] must implement
Contributor Author

I was a little unclear on why `uses_window_frame` is part of `BuiltInWindowFunctionExpr` rather than `PartitionEvaluator`, but I haven't looked into it in more detail.

/// [`PartitionEvaluator::evaluate_inside_range`]
fn uses_window_frame(&self) -> bool {
false
}
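
As a rough illustration of what "uses the window frame" means, the sketch below contrasts a frame-independent function with a frame-dependent one. It is standalone Rust, not the DataFusion API: `row_number`, `first_value`, and `rows_preceding_frames` are hypothetical helpers, and frames are modelled as one `Range<usize>` per row.

```rust
use std::ops::Range;

/// Frame-independent: the result depends only on the row's position
/// in the partition, so no frame is passed in.
fn row_number(num_rows: usize) -> Vec<u64> {
    (1..=num_rows as u64).collect()
}

/// Frame-dependent: the result for row `i` is the first value inside
/// `frames[i]`, or `None` for an empty or out-of-bounds frame.
fn first_value(values: &[i64], frames: &[Range<usize>]) -> Vec<Option<i64>> {
    frames
        .iter()
        .map(|frame| values.get(frame.clone()).and_then(|s| s.first().copied()))
        .collect()
}

/// Hypothetical frame builder for
/// `ROWS BETWEEN <preceding> PRECEDING AND CURRENT ROW`.
fn rows_preceding_frames(num_rows: usize, preceding: usize) -> Vec<Range<usize>> {
    (0..num_rows)
        .map(|i| i.saturating_sub(preceding)..i + 1)
        .collect()
}
```

Changing `preceding` changes the output of `first_value` but not of `row_number`, which is the distinction a flag such as `uses_window_frame` would capture.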
1 change: 1 addition & 0 deletions datafusion/physical-expr/src/window/mod.rs
@@ -32,6 +32,7 @@ mod window_frame_state;
pub use aggregate::PlainAggregateWindowExpr;
pub use built_in::BuiltInWindowExpr;
pub use built_in_window_function_expr::BuiltInWindowFunctionExpr;
pub use partition_evaluator::PartitionEvaluator;
Contributor Author

PartitionEvaluator now appears in public docstrings (and I propose to make it part of the user defined window function API -- see #5781 (comment))

pub use sliding_aggregate::SlidingAggregateWindowExpr;
pub use window_expr::PartitionBatchState;
pub use window_expr::PartitionBatches;
70 changes: 60 additions & 10 deletions datafusion/physical-expr/src/window/partition_evaluator.rs
@@ -25,24 +25,70 @@ use datafusion_common::{DataFusionError, ScalarValue};
use std::fmt::Debug;
use std::ops::Range;

/// Partition evaluator
/// Partition evaluator for Window Functions
///
/// An implementation of this trait is created and used for each
/// partition defined by the OVER clause.
///
/// For example, evaluating `window_func(val) OVER (PARTITION BY col)`
/// on the following data:
///
/// ```text
/// col | val
/// --- + ----
/// A | 1
/// A | 1
/// C | 2
/// D | 3
/// D | 3
/// ```
///
/// Will instantiate three `PartitionEvaluator`s, one each for the
/// partitions defined by `col=A`, `col=C`, and `col=D`.
///
/// There are two types of `PartitionEvaluator`:
///
/// # Stateless `PartitionEvaluator`
///
Contributor Author

@mustafasrepo / @ozankabak if you have time to help me describe more clearly what Stateful and Stateless `PartitionEvaluator`s are, and specifically how they differ, I would be most appreciative.

Contributor

Some built-in window functions use the window frame inside the window expression (`FIRST_VALUE`, `LAST_VALUE`, `NTH_VALUE`). For most window functions, however, the contents of the window frame do not matter (`ROW_NUMBER`, `RANK`, `DENSE_RANK`, `PERCENT_RANK`, `CUME_DIST`, `LEAD`, `LAG`). For functions that use the window frame, `PartitionEvaluator::evaluate_inside_range` is called. For functions that do not, `PartitionEvaluator::evaluate` is called. (For rank calculations, `PartitionEvaluator::evaluate_with_rank` is called instead, since its API is quite different; it does not use the window frame either.)

`PartitionEvaluator::evaluate_stateful` is used only when we produce window results with bounded memory (that is, when window functions are called from `BoundedWindowAggExec`). In this case window results are calculated in a running fashion, so we need to store the previous state to calculate the correct output. For instance, for `ROW_NUMBER` the batch the evaluator receives may not be the first batch, so we cannot restart the numbering; we need to continue from the last `ROW_NUMBER` produced for the previously received batches. More generally, we need to keep some information in the state whenever we do not receive the whole table as a single batch.

Currently, we support bounded (stateful) execution for `FIRST_VALUE`, `LAST_VALUE`, `NTH_VALUE`, `ROW_NUMBER`, `RANK`, `DENSE_RANK`, `LEAD`, and `LAG`.
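
The `ROW_NUMBER` case described above can be sketched in a few lines of standalone Rust. This is only an illustration of carrying state across batches: `RunningRowNumber` and `evaluate_batch` are hypothetical names and do not reflect the real `PartitionEvaluator` signatures.

```rust
/// Hypothetical stateful evaluator: remembers how many rows of the
/// partition it has already numbered in earlier batches.
#[derive(Debug, Default)]
struct RunningRowNumber {
    rows_seen: u64,
}

impl RunningRowNumber {
    /// Numbers the rows of one batch, continuing from the last row
    /// number produced for previous batches instead of restarting at 1.
    fn evaluate_batch(&mut self, batch_len: usize) -> Vec<u64> {
        let start = self.rows_seen + 1;
        self.rows_seen += batch_len as u64;
        (start..=self.rows_seen).collect()
    }
}
```

A stateless evaluator would instead see the whole partition at once and could always start counting from 1.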

Contributor Author

Thank you @mustafasrepo -- this is super helpful. I am incorporating this information into this PR

/// In this case, [`PartitionEvaluator::evaluate`] is called for the
/// entire partition / window function.
///
/// # Stateful `PartitionEvaluator`
///
/// This is used for XXXX. In this case YYYYY
///
pub trait PartitionEvaluator: Debug + Send {
/// Whether the evaluator should be evaluated with rank
///
/// If `include_rank` is true, then [`Self::evaluate_with_rank`]
/// will be called for each partition, which includes the
/// `rank`. For example:
///
/// ```text
/// col | rank
/// --- + ----
/// A | 1
/// A | 1
/// C | 2
/// D | 3
/// D | 3
/// ```
fn include_rank(&self) -> bool {
false
}
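
The `col | rank` table in the doc comment above can be reproduced with a small standalone sketch. It assumes the rank information arrives as one index range per group of equal sort values (peer group); `peer_groups` and `dense_rank` are hypothetical helpers, not DataFusion functions, and the exact arguments of `evaluate_with_rank` are not shown in this diff.

```rust
use std::ops::Range;

/// Splits an already sorted column into ranges of equal values
/// (peer groups).
fn peer_groups(sorted: &[&str]) -> Vec<Range<usize>> {
    let mut groups = Vec::new();
    let mut start = 0;
    for i in 1..=sorted.len() {
        // Close the current group at the end of input or on a value change.
        if i == sorted.len() || sorted[i] != sorted[start] {
            groups.push(start..i);
            start = i;
        }
    }
    groups
}

/// Assigns every row the 1-based index of its peer group, which
/// matches the `rank` column in the table above.
fn dense_rank(groups: &[Range<usize>]) -> Vec<u64> {
    groups
        .iter()
        .enumerate()
        .flat_map(|(idx, group)| std::iter::repeat(idx as u64 + 1).take(group.len()))
        .collect()
}
```

For the column `A, A, C, D, D` this yields the groups `0..2`, `2..3`, `3..5` and the ranks `1, 1, 2, 3, 3`.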

/// Returns state of the Built-in Window Function
/// Returns state of the Built-in Window Function (only used for stateful evaluation)
fn state(&self) -> Result<BuiltinWindowState> {
// If we do not use state we just return Default
Ok(BuiltinWindowState::Default)
}

/// Updates the internal state for Built-in window function
// state is useful to update internal state for Built-in window function.
// idx is the index of last row for which result is calculated.
// range_columns is the result of order by column values. It is used to calculate rank boundaries
// sort_partition_points is the boundaries of each rank in the range_column. It is used to update rank.
/// Updates the internal state for Built-in window function, if desired.
///
/// `state`: is useful to update internal state for Built-in window function.
/// `idx`: is the index of last row for which result is calculated.
/// `range_columns`: is the result of order by column values. It is used to calculate rank boundaries
/// `sort_partition_points`: is the boundaries of each rank in the range_column. It is used to update rank.
fn update_state(
&mut self,
_state: &WindowAggState,
@@ -54,15 +100,17 @@ pub trait PartitionEvaluator: Debug + Send {
Ok(())
}

/// Sets the internal state for Built-in window function, if supported
fn set_state(&mut self, _state: &BuiltinWindowState) -> Result<()> {
Err(DataFusionError::NotImplemented(
"set_state is not implemented for this window function".to_string(),
))
}

/// Gets the range where Built-in window function result is calculated.
// idx is the index of last row for which result is calculated.
// n_rows is the number of rows of the input record batch (Used during bound check)
///
/// `idx`: is the index of last row for which result is calculated.
/// `n_rows`: is the number of rows of the input record batch (Used during bound check)
fn get_range(&self, _idx: usize, _n_rows: usize) -> Result<Range<usize>> {
Err(DataFusionError::NotImplemented(
"get_range is not implemented for this window function".to_string(),
@@ -83,7 +131,9 @@ pub trait PartitionEvaluator: Debug + Send {
))
}

/// evaluate the partition evaluator against the partition but with rank
/// Evaluate the partition evaluator against the partition but with rank
///
/// See [`Self::include_rank`] for more details
fn evaluate_with_rank(
&self,
_num_rows: usize,
19 changes: 9 additions & 10 deletions datafusion/physical-expr/src/window/sliding_aggregate.rs
@@ -36,12 +36,10 @@ use crate::{
expressions::PhysicalSortExpr, reverse_order_bys, AggregateExpr, PhysicalExpr,
};

/// A window expr that takes the form of an aggregate function
/// Aggregate Window Expressions that have the form
/// `OVER({ROWS | RANGE| GROUPS} BETWEEN UNBOUNDED PRECEDING AND ...)`
/// e.g cumulative window frames uses `PlainAggregateWindowExpr`. Where as Aggregate Window Expressions
/// that have the form `OVER({ROWS | RANGE| GROUPS} BETWEEN M {PRECEDING| FOLLOWING} AND ...)`
/// e.g sliding window frames uses `SlidingAggregateWindowExpr`.
/// A window expr that takes the form of an aggregate function that
/// can be incrementally computed over sliding windows.
///
/// See comments on [`WindowExpr`] for more details.
Contributor Author

Consolidated this description into WindowExpr

#[derive(Debug)]
pub struct SlidingAggregateWindowExpr {
aggregate: Arc<dyn AggregateExpr>,
@@ -72,10 +70,11 @@ impl SlidingAggregateWindowExpr {
}
}

/// peer based evaluation based on the fact that batch is pre-sorted given the sort columns
/// and then per partition point we'll evaluate the peer group (e.g. SUM or MAX gives the same
/// results for peers) and concatenate the results.

/// Incrementally updates the window function result, using the fact that
/// the batch is pre-sorted on the sort columns.
///
/// For each partition point, evaluates the peer group (e.g. `SUM` or `MAX`
/// gives the same results for peers) and concatenates the results.
impl WindowExpr for SlidingAggregateWindowExpr {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
26 changes: 23 additions & 3 deletions datafusion/physical-expr/src/window/window_expr.rs
@@ -32,8 +32,28 @@ use std::fmt::Debug;
use std::ops::Range;
use std::sync::Arc;

/// A window expression that:
/// * knows its resulting field
/// Common trait for [window function] implementations.
///
/// Aggregate Window Expressions that have the form
///
/// ```text
/// OVER({ROWS | RANGE| GROUPS} BETWEEN UNBOUNDED PRECEDING AND ...)
/// ```
///
/// (e.g. cumulative window frames) use [`PlainAggregateWindowExpr`].
///
/// Aggregate Window Expressions that have the form
///
/// ```text
/// OVER({ROWS | RANGE| GROUPS} BETWEEN M {PRECEDING| FOLLOWING} AND ...)
/// ```
///
/// (e.g. sliding window frames) use [`SlidingAggregateWindowExpr`].
///
///
/// [window function]: https://en.wikipedia.org/wiki/Window_function_(SQL)
/// [`PlainAggregateWindowExpr`]: crate::window::PlainAggregateWindowExpr
/// [`SlidingAggregateWindowExpr`]: crate::window::SlidingAggregateWindowExpr
pub trait WindowExpr: Send + Sync + Debug {
/// Returns the window expression as [`Any`](std::any::Any) so that it can be
/// downcast to a specific implementation.
@@ -123,7 +143,7 @@ pub trait WindowExpr: Send + Sync + Debug {
fn get_reverse_expr(&self) -> Option<Arc<dyn WindowExpr>>;
}
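
The split between cumulative and sliding frames described in the `WindowExpr` doc comment can be sketched with a running `SUM`. This is a standalone illustration, not the DataFusion accumulator API: `cumulative_sum` and `sliding_sum` are hypothetical helpers over plain slices.

```rust
/// Cumulative frame (`ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW`):
/// rows only ever enter the frame, so the state only grows.
fn cumulative_sum(values: &[i64]) -> Vec<i64> {
    let mut sum: i64 = 0;
    values
        .iter()
        .map(|v| {
            sum += v;
            sum
        })
        .collect()
}

/// Sliding frame (`ROWS BETWEEN <preceding> PRECEDING AND CURRENT ROW`):
/// rows also leave the frame, so their contribution must be removed.
fn sliding_sum(values: &[i64], preceding: usize) -> Vec<i64> {
    let mut sum: i64 = 0;
    let mut out = Vec::with_capacity(values.len());
    for (i, v) in values.iter().enumerate() {
        sum += v; // the current row enters the frame
        if i > preceding {
            sum -= values[i - preceding - 1]; // the oldest row leaves the frame
        }
        out.push(sum);
    }
    out
}
```

The sliding variant needs an aggregate whose state can be updated when a row is removed, which is one plausible reason to keep the two cases in separate implementations.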

/// Trait for different `AggregateWindowExpr`s (`PlainAggregateWindowExpr`, `SlidingAggregateWindowExpr`)
/// Extension trait that adds common functionality to [`AggregateWindowExpr`]s
pub trait AggregateWindowExpr: WindowExpr {
/// Get the accumulator for the window expression. Note that distinct
/// window expressions may return distinct accumulators; e.g. sliding