Simplify and encapsulate window function state management #6621

Merged
merged 5 commits into from
Jun 13, 2023
46 changes: 3 additions & 43 deletions datafusion/physical-expr/src/window/built_in.rs
@@ -24,14 +24,12 @@ use std::sync::Arc;
use super::window_frame_state::WindowFrameContext;
use super::BuiltInWindowFunctionExpr;
use super::WindowExpr;
use crate::window::window_expr::{
BuiltinWindowState, NthValueKind, NthValueState, WindowFn,
};
use crate::window::window_expr::WindowFn;
use crate::window::{
PartitionBatches, PartitionWindowAggStates, WindowAggState, WindowState,
};
use crate::{expressions::PhysicalSortExpr, reverse_order_bys, PhysicalExpr};
use arrow::array::{new_empty_array, Array, ArrayRef};
use arrow::array::{new_empty_array, ArrayRef};
use arrow::compute::SortOptions;
use arrow::datatypes::Field;
use arrow::record_batch::RecordBatch;
@@ -211,13 +209,7 @@ impl WindowExpr for BuiltInWindowExpr {

state.update(&out_col, partition_batch_state)?;
if self.window_frame.start_bound.is_unbounded() {
let mut evaluator_state = evaluator.state()?;
if let BuiltinWindowState::NthValue(nth_value_state) =
&mut evaluator_state
{
memoize_nth_value(state, nth_value_state)?;
evaluator.set_state(&evaluator_state)?;
}
evaluator.memoize(state)?;
Review comment (Contributor Author):
I think this is actually more readable as well as cleaning up the trait

}
}
Ok(())
@@ -244,35 +236,3 @@ impl WindowExpr for BuiltInWindowExpr {
|| !self.window_frame.end_bound.is_unbounded())
}
}

// When the window frame has a fixed beginning (e.g UNBOUNDED PRECEDING), for
// FIRST_VALUE, LAST_VALUE and NTH_VALUE functions: we can memoize result.
// Once result is calculated it will always stay same. Hence, we do not
// need to keep past data as we process the entire dataset. This feature
// enables us to prune rows from table.
fn memoize_nth_value(
state: &mut WindowAggState,
nth_value_state: &mut NthValueState,
) -> Result<()> {
let out = &state.out_col;
let size = out.len();
let (is_prunable, new_prunable) = match nth_value_state.kind {
NthValueKind::First => {
let n_range = state.window_frame_range.end - state.window_frame_range.start;
(n_range > 0 && size > 0, true)
}
NthValueKind::Last => (true, false),
NthValueKind::Nth(n) => {
let n_range = state.window_frame_range.end - state.window_frame_range.start;
(n_range >= (n as usize) && size >= (n as usize), true)
}
};
if is_prunable {
if nth_value_state.finalized_result.is_none() && new_prunable {
let result = ScalarValue::try_from_array(out, size - 1)?;
nth_value_state.finalized_result = Some(result);
}
state.window_frame_range.start = state.window_frame_range.end.saturating_sub(1);
}
Ok(())
}
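
The pruning idea described in the comment above can be sketched in isolation. This is not DataFusion code: `Kind`, `Sketch`, the `i64` output column, and the bare `Range<usize>` frame are hypothetical simplifications chosen for illustration, and the sketch assumes the frame start is UNBOUNDED PRECEDING. It mirrors the shape of `memoize_nth_value`: once FIRST_VALUE or NTH_VALUE has a result it is cached, and in every prunable case the frame start is advanced so only the newest row stays buffered.

```rust
use std::ops::Range;

/// Hypothetical stand-in for `NthValueKind` (illustration only).
#[derive(Debug, Clone, Copy)]
pub enum Kind {
    First,
    Last,
    Nth(usize),
}

/// Hypothetical evaluator state: the function kind plus a cached result.
#[derive(Debug)]
pub struct Sketch {
    pub kind: Kind,
    pub finalized: Option<i64>,
}

impl Sketch {
    /// `out` holds the results produced so far; `frame` is the row range
    /// that must stay buffered. Assumes an unbounded frame start.
    pub fn memoize(&mut self, out: &[i64], frame: &mut Range<usize>) {
        let size = out.len();
        let n_range = frame.end.saturating_sub(frame.start);
        // (can old rows be dropped?, is the result fixed from now on?)
        let (is_prunable, is_fixed) = match self.kind {
            Kind::First => (n_range > 0 && size > 0, true),
            // LAST_VALUE changes with every new row, so nothing is cached.
            Kind::Last => (true, false),
            Kind::Nth(n) => (n_range >= n && size >= n, true),
        };
        if is_prunable {
            if self.finalized.is_none() && is_fixed {
                // `last()` sidesteps the underflow `size - 1` would hit on empty input.
                self.finalized = out.last().copied();
            }
            // Keep only the most recent row of the frame.
            frame.start = frame.end.saturating_sub(1);
        }
    }
}
```

With a FIRST_VALUE-style sketch, an output column of `[7, 7, 7]` and a frame of `0..3`, the call caches `7` and shrinks the frame to `2..3`; a LAST_VALUE-style sketch shrinks the frame the same way but caches nothing.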
7 changes: 1 addition & 6 deletions datafusion/physical-expr/src/window/lead_lag.rs
@@ -19,7 +19,7 @@
//! at runtime during query execution

use crate::window::partition_evaluator::PartitionEvaluator;
use crate::window::window_expr::{BuiltinWindowState, LeadLagState};
use crate::window::window_expr::LeadLagState;
use crate::window::{BuiltInWindowFunctionExpr, WindowAggState};
use crate::PhysicalExpr;
use arrow::array::ArrayRef;
@@ -182,11 +182,6 @@ fn shift_with_default_value(
}

impl PartitionEvaluator for WindowShiftEvaluator {
fn state(&self) -> Result<BuiltinWindowState> {
// If we do not use state we just return Default
Ok(BuiltinWindowState::LeadLag(self.state.clone()))
}

fn update_state(
&mut self,
_state: &WindowAggState,
33 changes: 24 additions & 9 deletions datafusion/physical-expr/src/window/nth_value.rs
@@ -19,7 +19,7 @@
//! that can evaluated at runtime during query execution

use crate::window::partition_evaluator::PartitionEvaluator;
use crate::window::window_expr::{BuiltinWindowState, NthValueKind, NthValueState};
use crate::window::window_expr::{NthValueKind, NthValueState};
use crate::window::{BuiltInWindowFunctionExpr, WindowAggState};
use crate::PhysicalExpr;
use arrow::array::{Array, ArrayRef};
@@ -152,11 +152,6 @@ pub(crate) struct NthValueEvaluator {
}

impl PartitionEvaluator for NthValueEvaluator {
fn state(&self) -> Result<BuiltinWindowState> {
// If we do not use state we just return Default
Ok(BuiltinWindowState::NthValue(self.state.clone()))
}

fn update_state(
&mut self,
state: &WindowAggState,
@@ -169,9 +164,29 @@ impl PartitionEvaluator for NthValueEvaluator {
Ok(())
}

fn set_state(&mut self, state: &BuiltinWindowState) -> Result<()> {
if let BuiltinWindowState::NthValue(nth_value_state) = state {
self.state = nth_value_state.clone()
fn memoize(&mut self, state: &mut WindowAggState) -> Result<()> {
let out = &state.out_col;
let size = out.len();
let (is_prunable, new_prunable) = match self.state.kind {
Review comment (Contributor):
I know this was not introduced here (I introduced it myself), but while reading the code I thought we could use an is_last flag instead of new_prunable. I think the intent would be clearer that way.

NthValueKind::First => {
let n_range =
state.window_frame_range.end - state.window_frame_range.start;
(n_range > 0 && size > 0, true)
Review comment (Contributor, @mustafasrepo, Jun 12, 2023):
In case we use the is_last flag, the second flag should be false.

}
NthValueKind::Last => (true, false),
Review comment (Contributor):
In case we use the is_last flag, the second flag should be true.

NthValueKind::Nth(n) => {
let n_range =
state.window_frame_range.end - state.window_frame_range.start;
(n_range >= (n as usize) && size >= (n as usize), true)
Review comment (Contributor):
In case we use the is_last flag, the second flag should be false.

}
};
if is_prunable {
if self.state.finalized_result.is_none() && new_prunable {
Review comment (Contributor):
In case we use the is_last flag, the condition should be if self.state.finalized_result.is_none() && !is_last {

Review comment (Contributor Author):
I agree the intent is clearer using the term is_last. Thank you for that suggestion. Done in 4542799

let result = ScalarValue::try_from_array(out, size - 1)?;
self.state.finalized_result = Some(result);
}
state.window_frame_range.start =
state.window_frame_range.end.saturating_sub(1);
}
Ok(())
}
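
The `is_last` rename proposed in the review thread above can be sketched as follows. The follow-up commit itself is not shown on this page, so this is only an assumption about its shape, built from the reviewer's notes: the second tuple element flips meaning (`true` only for LAST_VALUE) and the caching condition becomes `!is_last`. `KindSketch`, `prune_flags`, and `should_cache` are hypothetical names, and `u32` for `n` is assumed from the `n as usize` casts in the diff.

```rust
/// Hypothetical stand-in for `NthValueKind`; `u32` for `n` is an assumption.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum KindSketch {
    First,
    Last,
    Nth(u32),
}

/// Returns `(is_prunable, is_last)` for a frame spanning `n_range` rows
/// with `size` output rows produced so far.
pub fn prune_flags(kind: KindSketch, n_range: usize, size: usize) -> (bool, bool) {
    match kind {
        KindSketch::First => (n_range > 0 && size > 0, false),
        // Only LAST_VALUE sets `is_last`; its result is never cached.
        KindSketch::Last => (true, true),
        KindSketch::Nth(n) => {
            let n = n as usize;
            (n_range >= n && size >= n, false)
        }
    }
}

/// The result is cached only once, and never for LAST_VALUE.
pub fn should_cache(already_finalized: bool, is_prunable: bool, is_last: bool) -> bool {
    is_prunable && !already_finalized && !is_last
}
```

Compared with `new_prunable`, the flag now names the one case that behaves differently, so the `match` arms read as "this is LAST_VALUE" rather than "this result may be cached".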
24 changes: 8 additions & 16 deletions datafusion/physical-expr/src/window/partition_evaluator.rs
@@ -17,7 +17,6 @@

//! Partition evaluation module

use crate::window::window_expr::BuiltinWindowState;
use crate::window::WindowAggState;
use arrow::array::ArrayRef;
use datafusion_common::Result;
@@ -100,14 +99,6 @@ pub trait PartitionEvaluator: Debug + Send {
false
}

/// Returns the internal state of the window function
Review comment (Contributor Author):
The point of the PR is to get rid of the functions on this trait that serialized / set state (which has a single use)

///
/// Only used for stateful evaluation
fn state(&self) -> Result<BuiltinWindowState> {
// If we do not use state we just return Default
Ok(BuiltinWindowState::Default)
}

/// Updates the internal state for window function
///
/// Only used for stateful evaluation
@@ -127,13 +118,14 @@ pub trait PartitionEvaluator: Debug + Send {
Ok(())
}

/// Sets the internal state for window function
///
/// Only used for stateful evaluation
fn set_state(&mut self, _state: &BuiltinWindowState) -> Result<()> {
Err(DataFusionError::NotImplemented(
"set_state is not implemented for this window function".to_string(),
))
/// When the window frame has a fixed beginning (e.g UNBOUNDED
Review comment (Contributor Author):
While writing this explanation it occurred to me that the term memoize, while technically accurate, is not the entire story here. I wonder if there is a better name like adjust_window_state 🤔

Review comment (Contributor):
I think adjust_window_state feels like a more general functionality. However, this function is a special case for nth_value when the window frame boundary is UNBOUNDED PRECEDING. Hence I think memoize is better for this case.

Review comment (Contributor Author):
I will leave it named memoize

/// PRECEDING), we can memoize the result of some functions such as
/// FIRST_VALUE, LAST_VALUE and NTH_VALUE. Once the result is calculated,
/// it always stays the same. Hence, we do not need to keep past data
/// as we process the entire dataset. This feature enables us to
/// prune rows from the table. The default implementation does nothing.
fn memoize(&mut self, _state: &mut WindowAggState) -> Result<()> {
Ok(())
}

/// Gets the range where the window function result is calculated.
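
The trait change in this hunk swaps a get-state / mutate / set-state round trip for a single method with a no-op default. A minimal sketch of that pattern, under stated assumptions: `EvaluatorSketch`, `AggStateSketch`, `RankSketch`, and `LastValueSketch` are hypothetical stand-ins (not the DataFusion types), and `Result<(), String>` replaces `datafusion_common::Result` so that the block is self-contained.

```rust
use std::fmt::Debug;

/// Hypothetical stand-in for `WindowAggState`: just the frame bounds.
#[derive(Debug, Default)]
pub struct AggStateSketch {
    pub frame_start: usize,
    pub frame_end: usize,
}

/// Hypothetical stand-in for `PartitionEvaluator`.
pub trait EvaluatorSketch: Debug {
    /// Default: nothing to memoize, so the state is left untouched.
    fn memoize(&mut self, _state: &mut AggStateSketch) -> Result<(), String> {
        Ok(())
    }
}

/// An evaluator that relies on the default no-op.
#[derive(Debug, Default)]
pub struct RankSketch;

impl EvaluatorSketch for RankSketch {}

/// An evaluator that overrides `memoize` to prune all but the newest row.
#[derive(Debug, Default)]
pub struct LastValueSketch;

impl EvaluatorSketch for LastValueSketch {
    fn memoize(&mut self, state: &mut AggStateSketch) -> Result<(), String> {
        state.frame_start = state.frame_end.saturating_sub(1);
        Ok(())
    }
}

/// The caller needs no enum match: it simply asks every evaluator to memoize.
pub fn memoize_all(
    evaluators: &mut [Box<dyn EvaluatorSketch>],
    state: &mut AggStateSketch,
) -> Result<(), String> {
    for evaluator in evaluators.iter_mut() {
        evaluator.memoize(state)?;
    }
    Ok(())
}
```

The design point is that evaluator-specific state never leaves the evaluator, so the caller does not need a shared enum such as `BuiltinWindowState` to inspect it.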
6 changes: 1 addition & 5 deletions datafusion/physical-expr/src/window/rank.rs
@@ -19,7 +19,7 @@
//! at runtime during query execution

use crate::window::partition_evaluator::PartitionEvaluator;
use crate::window::window_expr::{BuiltinWindowState, RankState};
use crate::window::window_expr::RankState;
use crate::window::{BuiltInWindowFunctionExpr, WindowAggState};
use crate::PhysicalExpr;
use arrow::array::ArrayRef;
@@ -125,10 +125,6 @@ impl PartitionEvaluator for RankEvaluator {
Ok(Range { start, end })
}

fn state(&self) -> Result<BuiltinWindowState> {
Ok(BuiltinWindowState::Rank(self.state.clone()))
}

fn update_state(
&mut self,
state: &WindowAggState,
7 changes: 1 addition & 6 deletions datafusion/physical-expr/src/window/row_number.rs
@@ -18,7 +18,7 @@
//! Defines physical expression for `row_number` that can evaluated at runtime during query execution

use crate::window::partition_evaluator::PartitionEvaluator;
use crate::window::window_expr::{BuiltinWindowState, NumRowsState};
use crate::window::window_expr::NumRowsState;
use crate::window::BuiltInWindowFunctionExpr;
use crate::PhysicalExpr;
use arrow::array::{ArrayRef, UInt64Array};
@@ -76,11 +76,6 @@ pub(crate) struct NumRowsEvaluator {
}

impl PartitionEvaluator for NumRowsEvaluator {
fn state(&self) -> Result<BuiltinWindowState> {
// If we do not use state we just return Default
Ok(BuiltinWindowState::NumRows(self.state.clone()))
}

fn get_range(&self, idx: usize, _n_rows: usize) -> Result<Range<usize>> {
let start = idx;
let end = idx + 1;
11 changes: 1 addition & 10 deletions datafusion/physical-expr/src/window/window_expr.rs
@@ -327,16 +327,7 @@ pub struct LeadLagState {
pub idx: usize,
}

#[derive(Debug, Clone, Default)]
Review comment (Contributor Author):
It turns out this structure is not used anywhere else

pub enum BuiltinWindowState {
Rank(RankState),
NumRows(NumRowsState),
NthValue(NthValueState),
LeadLag(LeadLagState),
#[default]
Default,
}

/// Holds the state of evaluating a window function
#[derive(Debug)]
pub struct WindowAggState {
/// The range that we calculate the window function