Skip to content

Commit

Permalink
add aggregate
Browse files Browse the repository at this point in the history
Signed-off-by: jayzhan211 <jayzhan211@gmail.com>
  • Loading branch information
jayzhan211 committed Aug 10, 2023
1 parent 681da6d commit 97d9b69
Show file tree
Hide file tree
Showing 12 changed files with 85 additions and 6 deletions.
10 changes: 10 additions & 0 deletions datafusion/core/tests/sqllogictests/test_files/array.slt
Original file line number Diff line number Diff line change
Expand Up @@ -2362,6 +2362,12 @@ from flatten_table;
[1, 2, 3, 4, 5, 6] [8] [1, 2, 3] [1.0, 2.0, 3.0, 4.0, 5.0, 6.0]
# array aggregate function

## array aggregate
query I
select array_aggregate([1, 3, 5, 7], 'sum');
----
16

## array sum
query IRI
select array_sum([1, 3, 5, 7]),
Expand All @@ -2370,6 +2376,10 @@ select array_sum([1, 3, 5, 7]),
----
16 6.6 23

# TODO: Support nulls in array.
# query error DataFusion error: This feature is not implemented: Arrays with different types are not supported: \{Int64, Null\}
# select array_sum([1, null, 3, null]);

query ??
select column1, column6 from arrays_values_without_nulls;
----
Expand Down
26 changes: 21 additions & 5 deletions datafusion/expr/src/built_in_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ pub enum BuiltinScalarFunction {
Cot,

// array functions
/// array_aggregate
ArrayAggregate,
/// array_append
ArrayAppend,
/// array_concat
Expand Down Expand Up @@ -354,6 +356,7 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::Tanh => Volatility::Immutable,
BuiltinScalarFunction::Trunc => Volatility::Immutable,
BuiltinScalarFunction::ArrayAppend => Volatility::Immutable,
BuiltinScalarFunction::ArrayAggregate => Volatility::Immutable,
BuiltinScalarFunction::ArrayConcat => Volatility::Immutable,
BuiltinScalarFunction::ArrayHasAll => Volatility::Immutable,
BuiltinScalarFunction::ArrayHasAny => Volatility::Immutable,
Expand Down Expand Up @@ -524,6 +527,10 @@ impl BuiltinScalarFunction {
Ok(data_type)
}
BuiltinScalarFunction::ArrayAppend => Ok(input_expr_types[0].clone()),
BuiltinScalarFunction::ArrayAggregate => {
// TODO: Fix this
Ok(Int64)
}
BuiltinScalarFunction::ArrayConcat => {
let mut expr_type = Null;
let mut max_dims = 0;
Expand Down Expand Up @@ -860,16 +867,19 @@ impl BuiltinScalarFunction {

// for now, the list is small, as we do not have many built-in functions.
match self {
BuiltinScalarFunction::ArrayAppend => Signature::any(2, self.volatility()),
BuiltinScalarFunction::ArrayConcat => {
Signature::variadic_any(self.volatility())
}
BuiltinScalarFunction::ArrayDims => Signature::any(1, self.volatility()),
BuiltinScalarFunction::ArrayElement => Signature::any(2, self.volatility()),
BuiltinScalarFunction::Flatten => Signature::any(1, self.volatility()),
BuiltinScalarFunction::ArrayHasAll

BuiltinScalarFunction::ArrayAggregate
| BuiltinScalarFunction::ArrayAppend
| BuiltinScalarFunction::ArrayElement
| BuiltinScalarFunction::ArrayHasAll
| BuiltinScalarFunction::ArrayHasAny
| BuiltinScalarFunction::ArrayHas => Signature::any(2, self.volatility()),
BuiltinScalarFunction::ArrayDims | BuiltinScalarFunction::Flatten => {
Signature::any(1, self.volatility())
}
BuiltinScalarFunction::ArrayLength => {
Signature::variadic_any(self.volatility())
}
Expand Down Expand Up @@ -1338,6 +1348,12 @@ fn aliases(func: &BuiltinScalarFunction) -> &'static [&'static str] {
BuiltinScalarFunction::ArrowTypeof => &["arrow_typeof"],

// array functions
BuiltinScalarFunction::ArrayAggregate => &[
"array_aggregate",
"list_aggregate",
"array_aggr",
"list_aggr",
],
BuiltinScalarFunction::ArrayAppend => &[
"array_append",
"list_append",
Expand Down
6 changes: 6 additions & 0 deletions datafusion/expr/src/expr_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,12 @@ scalar_expr!(
array element,
"appends an element to the end of an array."
);
scalar_expr!(
ArrayAggregate,
array_aggregate,
array name,
"allows the execution of arbitrary existing aggregate functions on the elements of a list"
);
nary_scalar_expr!(ArrayConcat, array_concat, "concatenates arrays.");
scalar_expr!(
ArrayHas,
Expand Down
15 changes: 14 additions & 1 deletion datafusion/physical-expr/src/array_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2164,7 +2164,20 @@ macro_rules! array_sum_internal(
}}
);

/// Array_sum SQL function
/// array_aggregate SQL function
pub fn array_aggregate(args: &[ArrayRef]) -> Result<ArrayRef> {
assert_eq!(args.len(), 2);
let func_name = args[1].as_string::<i32>().value(0);
let args = &args[0..1];
match func_name {
"sum" => array_sum(args),
_ => Err(DataFusionError::NotImplemented(format!(
"array_aggregate does not support function '{func_name}'"
))),
}
}

/// array_sum SQL function
pub fn array_sum(args: &[ArrayRef]) -> Result<ArrayRef> {
assert_eq!(args.len(), 1);

Expand Down
3 changes: 3 additions & 0 deletions datafusion/physical-expr/src/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,9 @@ pub fn create_physical_fun(
BuiltinScalarFunction::ArrayAppend => {
Arc::new(|args| make_scalar_function(array_expressions::array_append)(args))
}
BuiltinScalarFunction::ArrayAggregate => Arc::new(|args| {
make_scalar_function(array_expressions::array_aggregate)(args)
}),
BuiltinScalarFunction::ArrayConcat => {
Arc::new(|args| make_scalar_function(array_expressions::array_concat)(args))
}
Expand Down
1 change: 1 addition & 0 deletions datafusion/proto/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,7 @@ enum ScalarFunction {
Nanvl = 111;
Flatten = 112;
ArraySum = 113;
ArrayAggregate = 114;
}

message ScalarFunctionNode {
Expand Down
3 changes: 3 additions & 0 deletions datafusion/proto/src/generated/pbjson.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions datafusion/proto/src/generated/prost.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions datafusion/proto/src/logical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,7 @@ impl From<&protobuf::ScalarFunction> for BuiltinScalarFunction {
ScalarFunction::Ltrim => Self::Ltrim,
ScalarFunction::Rtrim => Self::Rtrim,
ScalarFunction::ToTimestamp => Self::ToTimestamp,
ScalarFunction::ArrayAggregate => Self::ArrayAggregate,
ScalarFunction::ArrayAppend => Self::ArrayAppend,
ScalarFunction::ArrayConcat => Self::ArrayConcat,
ScalarFunction::ArrayHasAll => Self::ArrayHasAll,
Expand Down
1 change: 1 addition & 0 deletions datafusion/proto/src/logical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1449,6 +1449,7 @@ impl TryFrom<&BuiltinScalarFunction> for protobuf::ScalarFunction {
BuiltinScalarFunction::Ltrim => Self::Ltrim,
BuiltinScalarFunction::Rtrim => Self::Rtrim,
BuiltinScalarFunction::ToTimestamp => Self::ToTimestamp,
BuiltinScalarFunction::ArrayAggregate => Self::ArrayAggregate,
BuiltinScalarFunction::ArrayAppend => Self::ArrayAppend,
BuiltinScalarFunction::ArrayConcat => Self::ArrayConcat,
BuiltinScalarFunction::ArrayHasAll => Self::ArrayHasAll,
Expand Down
1 change: 1 addition & 0 deletions docs/source/user-guide/expressions.md
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ Unlike to some databases the math functions in Datafusion works the same way as

| Function | Notes |
| ------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
| array_aggregate(array, name) | Allows the execution of arbitrary existing aggregate functions on the elements of a list. `array_aggregate([1, 2, 3], 'sum') -> 6` |
| array_append(array, element) | Appends an element to the end of an array. `array_append([1, 2, 3], 4) -> [1, 2, 3, 4]` |
| array_concat(array[, ..., array_n]) | Concatenates arrays. `array_concat([1, 2, 3], [4, 5, 6]) -> [1, 2, 3, 4, 5, 6]` |
| array_has(array, element) | Returns true if the array contains the element `array_has([1,2,3], 1) -> true` |
Expand Down
21 changes: 21 additions & 0 deletions docs/source/user-guide/sql/scalar_functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -1441,6 +1441,7 @@ from_unixtime(expression)

## Array Functions

- [array_aggregate](#array_aggregate)
- [array_append](#array_append)
- [array_cat](#array_cat)
- [array_concat](#array_concat)
Expand Down Expand Up @@ -1495,6 +1496,26 @@ from_unixtime(expression)
- [make_list](#make_list)
- [trim_array](#trim_array)

### `array_aggregate`

Allows the execution of arbitrary existing aggregate functions on the elements of a list.

```
array_aggregate(array, name)
```

#### Arguments

- **array**: Array expression.
Can be a constant, column, or function, and any combination of array operators.
- **name**: Aggregate function name.

#### Aliases

- list_aggregate
- array_aggr
- list_aggr

### `array_append`

Appends an element to the end of an array.
Expand Down

0 comments on commit 97d9b69

Please sign in to comment.