Minor: Update docstrings and comments to aggregate code (#4489)
* Minor: add some comments to aggregate code

* Fix typos

Co-authored-by: jakevin <jakevingoo@gmail.com>

* fix typo

Co-authored-by: jakevin <jakevingoo@gmail.com>

Co-authored-by: jakevin <jakevingoo@gmail.com>
alamb and jackwener authored Dec 5, 2022
1 parent b6146b9 commit 394c5ee
Showing 2 changed files with 35 additions and 9 deletions.
8 changes: 7 additions & 1 deletion datafusion/core/src/physical_plan/aggregates/hash.rs
@@ -468,6 +468,11 @@ impl std::fmt::Debug for Accumulators {
 }

 /// Create a RecordBatch with all group keys and accumulators' states or values.
+///
+/// The output looks like
+/// ```text
+/// gby_expr1, gby_expr2, ... agg_1, agg2, ...
+/// ```
 fn create_batch_from_map(
     mode: &AggregateMode,
     accumulators: &Accumulators,
@@ -493,6 +498,7 @@ fn create_batch_from_map(
         }
     }

+    // First, output all group by exprs
     let mut columns = (0..num_group_expr)
         .map(|i| {
             ScalarValue::iter_to_array(
@@ -504,7 +510,7 @@ fn create_batch_from_map(
         })
         .collect::<Result<Vec<_>>>()?;

-    // add state / evaluated arrays
+    // next, output aggregates: either intermediate state or final output
     for (x, &state_len) in acc_data_types.iter().enumerate() {
         for y in 0..state_len {
             match mode {
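
The column order that the new comments in `create_batch_from_map` describe (all group-by expressions first, then the aggregates) can be sketched without Arrow. This is only an illustration under stated assumptions, not DataFusion's implementation: `GroupEntry`, `Column`, and `columns_from_groups` are hypothetical names, and plain vectors stand in for `ScalarValue` and Arrow arrays.

```rust
/// Hypothetical per-group record: the group key values plus one
/// value per aggregate (either intermediate state or final output).
struct GroupEntry {
    group_by_values: Vec<String>,
    aggregate_values: Vec<f64>,
}

/// Hypothetical stand-in for an Arrow array.
#[derive(Debug, PartialEq)]
enum Column {
    GroupKey(Vec<String>),
    Aggregate(Vec<f64>),
}

/// Builds the output columns in the documented order:
/// gby_expr1, gby_expr2, ... agg_1, agg2, ...
fn columns_from_groups(
    groups: &[GroupEntry],
    num_group_expr: usize,
    num_aggregates: usize,
) -> Vec<Column> {
    // First, output all group by exprs: one column per expression,
    // with one row per distinct group.
    let mut columns: Vec<Column> = (0..num_group_expr)
        .map(|i| {
            Column::GroupKey(
                groups.iter().map(|g| g.group_by_values[i].clone()).collect(),
            )
        })
        .collect();

    // Next, output aggregates, appended after the group columns.
    for j in 0..num_aggregates {
        columns.push(Column::Aggregate(
            groups.iter().map(|g| g.aggregate_values[j]).collect(),
        ));
    }
    columns
}
```

With two groups, two group-by expressions, and two aggregates, the result has four columns: the two key columns followed by the two aggregate columns, each holding one row per group.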
36 changes: 28 additions & 8 deletions datafusion/expr/src/accumulator.rs
@@ -31,28 +31,48 @@ use std::fmt::Debug;
 /// * update its state from multiple accumulators' states via `merge_batch`
 /// * compute the final value from its internal state via `evaluate`
 pub trait Accumulator: Send + Sync + Debug {
-    /// Returns the state of the accumulator at the end of the accumulation.
-    /// in the case of an average on which we track `sum` and `n`, this function should return a vector
-    /// of two values, sum and n.
+    /// Returns the partial intermediate state of the accumulator. This
+    /// partial state is serialized as `Arrays` and then combined with
+    /// other partial states from different instances of this
+    /// accumulator (that ran on different partitions, for
+    /// example).
+    ///
+    /// The state can be a different type than the output of the
+    /// [`Accumulator`].
+    ///
+    /// See [`merge_batch`] for more details on the merging process.
+    ///
+    /// For example, in the case of an average, for which we track `sum` and `n`,
+    /// this function should return a vector of two values, sum and n.
     fn state(&self) -> Result<Vec<AggregateState>>;

     /// Updates the accumulator's state from a vector of arrays.
     fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()>;

-    /// Retracts an update (caused by the given inputs) to accumulator's state.
-    /// Inverse operation of the `update_batch` operation. This method must be
-    /// for accumulators that should support bounded OVER aggregates.
+    /// Retracts an update (caused by the given inputs) to
+    /// accumulator's state.
+    ///
+    /// This is the inverse operation of [`update_batch`] and is used
+    /// to incrementally calculate window aggregates where the OVER
+    /// clause defines a bounded window.
     fn retract_batch(&mut self, _values: &[ArrayRef]) -> Result<()> {
         // TODO add retract for all accumulators
         Err(DataFusionError::Internal(
             "Retract should be implemented for aggregate functions when used with custom window frame queries".to_string()
         ))
     }

-    /// updates the accumulator's state from a vector of states.
+    /// Updates the accumulator's state from an `Array` containing one
+    /// or more intermediate values.
+    ///
+    /// The `states` array passed was formed by concatenating the
+    /// results of calling [`state`] on zero or more other accumulator
+    /// instances.
+    ///
+    /// `states` is an array of the same types as returned by [`state`].
     fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()>;

-    /// returns its value based on its current state.
+    /// Returns the final aggregate value based on its current state.
     fn evaluate(&self) -> Result<ScalarValue>;

     /// Allocated size required for this accumulator, in bytes, including `Self`.
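
The lifecycle the updated docstrings describe (accumulate per partition, export partial state, merge, then evaluate) can be sketched with the average example from the `state` docs. This is a simplified illustration and not the real trait: `SimpleAccumulator` and `AvgAccumulator` are hypothetical, `f64` slices stand in for `ArrayRef`, a `Vec<f64>` stands in for `Vec<AggregateState>`, and the sketch assumes `merge_batch` receives one column per state field with one row per merged accumulator.

```rust
/// Hypothetical, Arrow-free stand-in for the `Accumulator` trait.
trait SimpleAccumulator {
    /// Partial intermediate state; for an average this is `[sum, n]`.
    fn state(&self) -> Vec<f64>;
    /// Folds raw input values into the state.
    fn update_batch(&mut self, values: &[f64]);
    /// Inverse of `update_batch`, used for bounded window frames.
    fn retract_batch(&mut self, values: &[f64]);
    /// Folds other accumulators' states into this one.
    /// Assumed layout: `states[0]` holds every `sum`, `states[1]` every `n`.
    fn merge_batch(&mut self, states: &[Vec<f64>]);
    /// Final aggregate value; `None` when no rows were seen.
    fn evaluate(&self) -> Option<f64>;
}

#[derive(Debug, Default)]
struct AvgAccumulator {
    sum: f64,
    n: u64,
}

impl SimpleAccumulator for AvgAccumulator {
    fn state(&self) -> Vec<f64> {
        // The state type (two values) differs from the output type (one value).
        vec![self.sum, self.n as f64]
    }

    fn update_batch(&mut self, values: &[f64]) {
        self.sum += values.iter().sum::<f64>();
        self.n += values.len() as u64;
    }

    fn retract_batch(&mut self, values: &[f64]) {
        // Remove rows that slid out of the window frame.
        self.sum -= values.iter().sum::<f64>();
        self.n = self.n.saturating_sub(values.len() as u64);
    }

    fn merge_batch(&mut self, states: &[Vec<f64>]) {
        // Each column has one row per accumulator being merged.
        self.sum += states[0].iter().sum::<f64>();
        self.n += states[1].iter().map(|&c| c as u64).sum::<u64>();
    }

    fn evaluate(&self) -> Option<f64> {
        if self.n == 0 {
            None
        } else {
            Some(self.sum / self.n as f64)
        }
    }
}
```

In this sketch, two partition-local accumulators each call `update_batch`, their `state()` outputs are stacked column-wise, and a third accumulator calls `merge_batch` on the stacked columns before `evaluate`. Merging `[sum, n]` pairs rather than partial averages is what keeps the final result correct when partitions hold different numbers of rows.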
