Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
// under the License.

use std::any::Any;
use std::fmt::{Debug, Formatter};
use std::fmt::Debug;
use std::hash::Hash;
use std::mem::size_of_val;
use std::sync::Arc;
Expand Down Expand Up @@ -111,20 +111,12 @@ An alternative syntax is also supported:
description = "Number of centroids to use in the t-digest algorithm. _Default is 100_. A higher number results in more accurate approximation but requires more memory."
)
)]
#[derive(PartialEq, Eq, Hash)]
#[derive(PartialEq, Eq, Hash, Debug)]
pub struct ApproxPercentileContWithWeight {
signature: Signature,
approx_percentile_cont: ApproxPercentileCont,
}

impl Debug for ApproxPercentileContWithWeight {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only advantage I see to these manual impls is sometimes they add the name field; I personally don't see those as useful enough to need a separate impl so opting for derive to remove code where possible.

fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ApproxPercentileContWithWeight")
.field("signature", &self.signature)
.finish()
}
}

impl Default for ApproxPercentileContWithWeight {
fn default() -> Self {
Self::new()
Expand Down
12 changes: 2 additions & 10 deletions datafusion/functions-aggregate/src/bool_and_or.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,7 @@ pub struct BoolAnd {
impl BoolAnd {
fn new() -> Self {
Self {
signature: Signature::uniform(
1,
vec![DataType::Boolean],
Volatility::Immutable,
),
signature: Signature::exact(vec![DataType::Boolean], Volatility::Immutable),
}
}
}
Expand Down Expand Up @@ -251,11 +247,7 @@ pub struct BoolOr {
impl BoolOr {
fn new() -> Self {
Self {
signature: Signature::uniform(
1,
vec![DataType::Boolean],
Volatility::Immutable,
),
signature: Signature::exact(vec![DataType::Boolean], Volatility::Immutable),
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion datafusion/functions-aggregate/src/correlation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ fn accumulate_correlation_states(
/// where:
/// n = number of observations
/// sum_x = sum of x values
/// sum_y = sum of y values
/// sum_y = sum of y values
/// sum_xy = sum of (x * y)
/// sum_xx = sum of x^2 values
/// sum_yy = sum of y^2 values
Expand Down
11 changes: 1 addition & 10 deletions datafusion/functions-aggregate/src/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,20 +147,11 @@ pub fn count_all_window() -> Expr {
```"#,
standard_argument(name = "expression",)
)]
#[derive(PartialEq, Eq, Hash)]
#[derive(PartialEq, Eq, Hash, Debug)]
pub struct Count {
signature: Signature,
}

impl Debug for Count {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
f.debug_struct("Count")
.field("name", &self.name())
.field("signature", &self.signature)
.finish()
}
}

impl Default for Count {
fn default() -> Self {
Self::new()
Expand Down
122 changes: 33 additions & 89 deletions datafusion/functions-aggregate/src/covariance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,13 @@

//! [`CovarianceSample`]: covariance sample aggregations.

use arrow::datatypes::FieldRef;
use arrow::{
array::{ArrayRef, Float64Array, UInt64Array},
compute::kernels::cast,
datatypes::{DataType, Field},
};
use datafusion_common::{
Result, ScalarValue, downcast_value, plan_err, unwrap_or_internal_err,
};
use arrow::array::ArrayRef;
use arrow::datatypes::{DataType, Field, FieldRef};
use datafusion_common::cast::{as_float64_array, as_uint64_array};
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::{
Accumulator, AggregateUDFImpl, Documentation, Signature, Volatility,
function::{AccumulatorArgs, StateFieldsArgs},
type_coercion::aggregates::NUMERICS,
utils::format_state_name,
};
use datafusion_functions_aggregate_common::stats::StatsType;
Expand Down Expand Up @@ -69,21 +63,12 @@ make_udaf_expr_and_func!(
standard_argument(name = "expression1", prefix = "First"),
standard_argument(name = "expression2", prefix = "Second")
)]
#[derive(PartialEq, Eq, Hash)]
#[derive(PartialEq, Eq, Hash, Debug)]
pub struct CovarianceSample {
signature: Signature,
aliases: Vec<String>,
}

impl Debug for CovarianceSample {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
f.debug_struct("CovarianceSample")
.field("name", &self.name())
.field("signature", &self.signature)
.finish()
}
}

impl Default for CovarianceSample {
fn default() -> Self {
Self::new()
Expand All @@ -94,7 +79,10 @@ impl CovarianceSample {
pub fn new() -> Self {
Self {
aliases: vec![String::from("covar")],
signature: Signature::uniform(2, NUMERICS.to_vec(), Volatility::Immutable),
signature: Signature::exact(
vec![DataType::Float64, DataType::Float64],
Volatility::Immutable,
),
Comment on lines +82 to +85
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This way type coercion handles casting for us; we can now remove the code that does casting internally in the accumulators for us

}
}
}
Expand All @@ -112,11 +100,7 @@ impl AggregateUDFImpl for CovarianceSample {
&self.signature
}

fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
if !arg_types[0].is_numeric() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should be more confident in our signature code to guard this for us, to promote consistency across our UDFs

return plan_err!("Covariance requires numeric input types");
}

fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
Ok(DataType::Float64)
}

Expand Down Expand Up @@ -165,20 +149,11 @@ impl AggregateUDFImpl for CovarianceSample {
standard_argument(name = "expression1", prefix = "First"),
standard_argument(name = "expression2", prefix = "Second")
)]
#[derive(PartialEq, Eq, Hash)]
#[derive(PartialEq, Eq, Hash, Debug)]
pub struct CovariancePopulation {
signature: Signature,
}

impl Debug for CovariancePopulation {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
f.debug_struct("CovariancePopulation")
.field("name", &self.name())
.field("signature", &self.signature)
.finish()
}
}

impl Default for CovariancePopulation {
fn default() -> Self {
Self::new()
Expand All @@ -188,7 +163,10 @@ impl Default for CovariancePopulation {
impl CovariancePopulation {
pub fn new() -> Self {
Self {
signature: Signature::uniform(2, NUMERICS.to_vec(), Volatility::Immutable),
signature: Signature::exact(
vec![DataType::Float64, DataType::Float64],
Volatility::Immutable,
),
}
}
}
Expand All @@ -206,11 +184,7 @@ impl AggregateUDFImpl for CovariancePopulation {
&self.signature
}

fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
if !arg_types[0].is_numeric() {
return plan_err!("Covariance requires numeric input types");
}

fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
Ok(DataType::Float64)
}

Expand Down Expand Up @@ -304,30 +278,15 @@ impl Accumulator for CovarianceAccumulator {
}

fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
let values1 = &cast(&values[0], &DataType::Float64)?;
let values2 = &cast(&values[1], &DataType::Float64)?;

let mut arr1 = downcast_value!(values1, Float64Array).iter().flatten();
let mut arr2 = downcast_value!(values2, Float64Array).iter().flatten();
let values1 = as_float64_array(&values[0])?;
let values2 = as_float64_array(&values[1])?;

for i in 0..values1.len() {
let value1 = if values1.is_valid(i) {
arr1.next()
} else {
None
};
let value2 = if values2.is_valid(i) {
arr2.next()
} else {
None
for (value1, value2) in values1.iter().zip(values2) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Little driveby refactor to make iteration cleaner

let (value1, value2) = match (value1, value2) {
(Some(a), Some(b)) => (a, b),
_ => continue,
};

if value1.is_none() || value2.is_none() {
continue;
}

let value1 = unwrap_or_internal_err!(value1);
let value2 = unwrap_or_internal_err!(value2);
let new_count = self.count + 1;
let delta1 = value1 - self.mean1;
let new_mean1 = delta1 / new_count as f64 + self.mean1;
Expand All @@ -345,29 +304,14 @@ impl Accumulator for CovarianceAccumulator {
}

fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
let values1 = &cast(&values[0], &DataType::Float64)?;
let values2 = &cast(&values[1], &DataType::Float64)?;
let mut arr1 = downcast_value!(values1, Float64Array).iter().flatten();
let mut arr2 = downcast_value!(values2, Float64Array).iter().flatten();

for i in 0..values1.len() {
let value1 = if values1.is_valid(i) {
arr1.next()
} else {
None
};
let value2 = if values2.is_valid(i) {
arr2.next()
} else {
None
};

if value1.is_none() || value2.is_none() {
continue;
}
let values1 = as_float64_array(&values[0])?;
let values2 = as_float64_array(&values[1])?;

let value1 = unwrap_or_internal_err!(value1);
let value2 = unwrap_or_internal_err!(value2);
for (value1, value2) in values1.iter().zip(values2) {
let (value1, value2) = match (value1, value2) {
(Some(a), Some(b)) => (a, b),
_ => continue,
};

let new_count = self.count - 1;
let delta1 = self.mean1 - value1;
Expand All @@ -386,10 +330,10 @@ impl Accumulator for CovarianceAccumulator {
}

fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
let counts = downcast_value!(states[0], UInt64Array);
let means1 = downcast_value!(states[1], Float64Array);
let means2 = downcast_value!(states[2], Float64Array);
let cs = downcast_value!(states[3], Float64Array);
let counts = as_uint64_array(&states[0])?;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I started using as_float64_array above so decided to make the whole file consistent in which way it handles downcasting

let means1 = as_float64_array(&states[1])?;
let means2 = as_float64_array(&states[2])?;
let cs = as_float64_array(&states[3])?;

for i in 0..counts.len() {
let c = counts.value(i);
Expand Down
24 changes: 2 additions & 22 deletions datafusion/functions-aggregate/src/first_last.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,22 +90,12 @@ pub fn last_value(expression: Expr, order_by: Vec<SortExpr>) -> Expr {
```"#,
standard_argument(name = "expression",)
)]
#[derive(PartialEq, Eq, Hash)]
#[derive(PartialEq, Eq, Hash, Debug)]
pub struct FirstValue {
signature: Signature,
is_input_pre_ordered: bool,
}

impl Debug for FirstValue {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
f.debug_struct("FirstValue")
.field("name", &self.name())
.field("signature", &self.signature)
.field("accumulator", &"<FUNC>")
.finish()
}
}

impl Default for FirstValue {
fn default() -> Self {
Self::new()
Expand Down Expand Up @@ -1040,22 +1030,12 @@ impl Accumulator for FirstValueAccumulator {
```"#,
standard_argument(name = "expression",)
)]
#[derive(PartialEq, Eq, Hash)]
#[derive(PartialEq, Eq, Hash, Debug)]
pub struct LastValue {
signature: Signature,
is_input_pre_ordered: bool,
}

impl Debug for LastValue {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
f.debug_struct("LastValue")
.field("name", &self.name())
.field("signature", &self.signature)
.field("accumulator", &"<FUNC>")
.finish()
}
}

impl Default for LastValue {
fn default() -> Self {
Self::new()
Expand Down
12 changes: 1 addition & 11 deletions datafusion/functions-aggregate/src/grouping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
//! Defines physical expressions that can evaluated at runtime during query execution

use std::any::Any;
use std::fmt;

use arrow::datatypes::Field;
use arrow::datatypes::{DataType, FieldRef};
Expand Down Expand Up @@ -60,20 +59,11 @@ make_udaf_expr_and_func!(
description = "Expression to evaluate whether data is aggregated across the specified column. Can be a constant, column, or function."
)
)]
#[derive(PartialEq, Eq, Hash)]
#[derive(PartialEq, Eq, Hash, Debug)]
pub struct Grouping {
signature: Signature,
}

impl fmt::Debug for Grouping {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Grouping")
.field("name", &self.name())
.field("signature", &self.signature)
.finish()
}
}

impl Default for Grouping {
fn default() -> Self {
Self::new()
Expand Down
11 changes: 1 addition & 10 deletions datafusion/functions-aggregate/src/median.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,20 +85,11 @@ make_udaf_expr_and_func!(
/// If using the distinct variation, the memory usage will be similarly high if the
/// cardinality is high as it stores all distinct values in memory before computing the
/// result, but if cardinality is low then memory usage will also be lower.
#[derive(PartialEq, Eq, Hash)]
#[derive(PartialEq, Eq, Hash, Debug)]
pub struct Median {
signature: Signature,
}

impl Debug for Median {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
f.debug_struct("Median")
.field("name", &self.name())
.field("signature", &self.signature)
.finish()
}
}

impl Default for Median {
fn default() -> Self {
Self::new()
Expand Down
Loading