Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix AggregateStatistics optimization so it doesn't change output type #2674

Merged
merged 7 commits into from
Jun 2, 2022

Conversation

alamb
Copy link
Contributor

@alamb alamb commented Jun 1, 2022

Which issue does this PR close?

Closes #2673

Rationale for this change

See #2673 -- the optimizer pass is changing input types.

What changes are included in this PR?

  1. Fix bug
  2. New Regresion test
  3. Give some constants related to COUNT(*) expansion symbolic names to improve readability

Are there any user-facing changes?

less bugs

Does this PR break compatibility with Ballista?

No

@github-actions github-actions bot added core Core DataFusion crate datafusion Changes in the datafusion crate labels Jun 1, 2022
@github-actions github-actions bot added logical-expr Logical plan and expressions sql SQL Planner labels Jun 1, 2022
@@ -37,6 +38,9 @@ use crate::error::Result;
#[derive(Default)]
pub struct AggregateStatistics {}

/// The name of the column corresponding to [`COUNT_STAR_EXPANSION`]
const COUNT_STAR_NAME: &str = "COUNT(UInt8(1))";
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This constant was hard coded in a few places and I think this symbolic name helps understand what it is doing

@@ -148,10 +152,10 @@ fn take_optimizable_table_count(
.as_any()
.downcast_ref::<expressions::Literal>()
{
if lit_expr.value() == &ScalarValue::UInt8(Some(1)) {
if lit_expr.value() == &COUNT_STAR_EXPANSION {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There was an implicit coupling between the SQL planner and this file, which I have now made explicit with a named constant


// Validate that the optimized plan returns the exact same
// answer (both schema and data) as the original plan
let task_ctx = session_ctx.task_ctx();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test would have caught this issue when it was introduced in #2636

Ok(())
}

/// Normalize record batches for comparison:
/// 1. Sets nullable to `true`
fn normalize(batches: Vec<RecordBatch>) -> Vec<RecordBatch> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is stupid but necessary to pass the tests

};
Arc::new(Count::new(expr, "my_count_alias", DataType::UInt64))

Arc::new(Count::new(expr, name, DataType::Int64))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that the schema is checked, we can't use some arbitrary column name, we need to use the actual name the plan would

AggregateFunction::Count => function
.args
.into_iter()
.map(|a| match a {
FunctionArg::Unnamed(FunctionArgExpr::Expr(SQLExpr::Value(
Value::Number(_, _),
))) => Ok(lit(1_u8)),
FunctionArg::Unnamed(FunctionArgExpr::Wildcard) => Ok(lit(1_u8)),
))) => Ok(Expr::Literal(COUNT_STAR_EXPANSION.clone())),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a readability improvement to name a constant to make what is happening more explicit

@alamb alamb marked this pull request as ready for review June 1, 2022 13:57
return Some((
ScalarValue::UInt64(Some(num_rows as u64)),
"COUNT(UInt8(1))",
ScalarValue::Int64(Some(num_rows as i64)),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The change from UInt64 to Int64 here and a few lines below is the actual bug fix / change of behavior -- the rest of this PR is testing / readability improvements

@tustvold tustvold changed the title Fix AggregateStatistics optimization so it doens't change output type Fix AggregateStatistics optimization so it doesn't change output type Jun 2, 2022
Copy link
Contributor

@tustvold tustvold left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me, minor comment about test readability.

I also wonder if the fact it isn't always not nullable is actually a bug in the aggregate function, can count(..) ever return NULL? I thought it just skipped nulls?

let conf = session_ctx.copied_config();
let optimized = AggregateStatistics::new().optimize(Arc::new(plan), &conf)?;
let plan = Arc::new(plan) as _;
let optimized = AggregateStatistics::new().optimize(Arc::clone(&plan), &conf)?;

let (col, count) = match nulls {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was very confused by what this parameter controls, should it not be something like column: Option<&str> instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe it is really controlling count(*) vs COUNT(col) -- I consolidated the differences in eb14658 into a TestAggregate struct and I think it is much more understandable now

// answer (both schema and data) as the original plan
let task_ctx = session_ctx.task_ctx();
let plan_result = common::collect(plan.execute(0, task_ctx)?).await?;
assert_eq!(normalize(result), normalize(plan_result));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A couple of lines above there is

assert_eq!(result[0].schema(), Arc::new(Schema::new(vec![col])));

This would suggest to me that the result has a single column and a single field. Perhaps we could just do something like

let expected_a_schema = ..;
let expected_b_schema = ..;
for (a, b) in result.iter().zip(plan_result) {
  assert_eq!(a.column(0), b.column(0);
  assert_eq!(a.schema(), expected_a_schema);
  assert_eq!(b.schema(), expected_b_schema);
}

I think the normalization logic is a little bit hard to follow...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed the normalization in 171c899 and I think it is much simpler to follow now

};
Arc::new(Count::new(expr, "my_count_alias", DataType::UInt64))
/// Describe the type of aggregate being tested
enum TestAggregate {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This now parameterizes the difference between different tests into an explicit enum rather than implicit assumptions. I think it makes the tests easier to follow

@codecov-commenter
Copy link

Codecov Report

Merging #2674 (fde3cc4) into master (f547262) will increase coverage by 0.01%.
The diff coverage is 98.59%.

@@            Coverage Diff             @@
##           master    #2674      +/-   ##
==========================================
+ Coverage   84.69%   84.70%   +0.01%     
==========================================
  Files         267      267              
  Lines       47004    47036      +32     
==========================================
+ Hits        39810    39843      +33     
+ Misses       7194     7193       -1     
Impacted Files Coverage Δ
datafusion/expr/src/utils.rs 91.86% <ø> (ø)
datafusion/sql/src/planner.rs 81.56% <66.66%> (-0.04%) ⬇️
...ore/src/physical_optimizer/aggregate_statistics.rs 100.00% <100.00%> (ø)
datafusion/core/tests/custom_sources.rs 83.90% <100.00%> (ø)
datafusion/common/src/scalar.rs 74.94% <0.00%> (+0.11%) ⬆️
datafusion/core/src/physical_plan/metrics/value.rs 87.43% <0.00%> (+0.50%) ⬆️

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update f547262...fde3cc4. Read the comment docs.

@alamb alamb merged commit 8ddd99c into apache:master Jun 2, 2022
@alamb alamb deleted the alamb/fix_optimization_type branch June 2, 2022 21:18
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
core Core DataFusion crate datafusion Changes in the datafusion crate logical-expr Logical plan and expressions sql SQL Planner
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Optimization pass AggregateStatistics changes type of output from Int64 to UInt64
3 participants