Skip to content

Commit

Permalink
Address PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
eejbyfeldt committed Jun 22, 2024
1 parent 8ec6100 commit 47ab253
Show file tree
Hide file tree
Showing 4 changed files with 7 additions and 75 deletions.
2 changes: 1 addition & 1 deletion datafusion/core/tests/sql/aggregates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ async fn csv_query_array_agg_distinct() -> Result<()> {
Schema::new(vec![Field::new_list(
"ARRAY_AGG(DISTINCT aggregate_test_100.c2)",
Field::new("item", DataType::UInt32, true),
true
false
),])
);

Expand Down
1 change: 0 additions & 1 deletion datafusion/functions-aggregate/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ path = "src/lib.rs"
[dependencies]
ahash = { workspace = true }
arrow = { workspace = true }
arrow-array = { workspace = true }
arrow-schema = { workspace = true }
datafusion-common = { workspace = true }
datafusion-execution = { workspace = true }
Expand Down
75 changes: 4 additions & 71 deletions datafusion/functions-aggregate/src/array_agg.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Licensed to the Apache Software Foundation (ASF) under on
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
Expand All @@ -17,9 +17,8 @@

//! Defines physical expressions that can evaluated at runtime during query execution

use arrow::array::ArrayRef;
use arrow::array::{Array, ArrayRef};
use arrow::datatypes::DataType;
use arrow_array::Array;
use arrow_schema::Field;

use datafusion_common::cast::as_list_array;
Expand All @@ -40,7 +39,7 @@ make_udaf_expr_and_func!(
ArrayAgg,
array_agg,
expression,
"Computes the nth value",
"input values, including nulls, concatenated into an array",
array_agg_udaf
);

Expand Down Expand Up @@ -92,7 +91,7 @@ impl AggregateUDFImpl for ArrayAgg {
Ok(vec![Field::new_list(
format_state_name(args.name, "array_agg"),
Field::new("item", args.input_type.clone(), true),
true,
args.input_nullable,
)])
}

Expand Down Expand Up @@ -203,69 +202,3 @@ impl Accumulator for ArrayAggAccumulator {
- std::mem::size_of_val(&self.datatype)
}
}

#[cfg(test)]
mod tests {
use super::*;

use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::Result;
use datafusion_physical_expr_common::aggregate::create_aggregate_expr;
use datafusion_physical_expr_common::expressions::column::Column;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;

#[test]
fn test_array_agg_expr() -> Result<()> {
let data_types = vec![
DataType::UInt32,
DataType::Int32,
DataType::Float32,
DataType::Float64,
DataType::Decimal128(10, 2),
DataType::Utf8,
];
for data_type in &data_types {
let input_schema =
Schema::new(vec![Field::new("c1", data_type.clone(), true)]);
let input_phy_exprs: Vec<Arc<dyn PhysicalExpr>> = vec![Arc::new(
Column::new_with_schema("c1", &input_schema).unwrap(),
)];
let result_agg_phy_exprs = create_aggregate_expr(
&array_agg_udaf(),
&input_phy_exprs[0..1],
&[],
&[],
&[],
&input_schema,
"c1",
false,
false,
)?;
assert_eq!("c1", result_agg_phy_exprs.name());
assert_eq!(
Field::new_list("c1", Field::new("item", data_type.clone(), true), true,),
result_agg_phy_exprs.field().unwrap()
);

let result_distinct = create_aggregate_expr(
&array_agg_udaf(),
&input_phy_exprs[0..1],
&[],
&[],
&[],
&input_schema,
"c1",
false,
true,
)?;
assert_eq!("c1", result_distinct.name());
assert_eq!(
Field::new_list("c1", Field::new("item", data_type.clone(), true), true,),
result_agg_phy_exprs.field().unwrap()
);
}
Ok(())
}
}
4 changes: 2 additions & 2 deletions datafusion/proto/tests/cases/roundtrip_logical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ use datafusion_expr::{
WindowFunctionDefinition, WindowUDF, WindowUDFImpl,
};
use datafusion_functions_aggregate::expr_fn::{
array_agg, bit_and, bit_or, bit_xor, bool_and, bool_or
array_agg, bit_and, bit_or, bit_xor, bool_and, bool_or,
};
use datafusion_functions_aggregate::string_agg::string_agg;
use datafusion_proto::bytes::{
Expand Down Expand Up @@ -675,7 +675,7 @@ async fn roundtrip_expr_api() -> Result<()> {
string_agg(col("a").cast_to(&DataType::Utf8, &schema)?, lit("|")),
bool_and(lit(true)),
bool_or(lit(true)),
array_agg(lit(1))
array_agg(lit(1)),
];

// ensure expressions created with the expr api can be round tripped
Expand Down

0 comments on commit 47ab253

Please sign in to comment.