Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix incorrect results with multiple COUNT(DISTINCT..) aggregates on dictionaries #9679

Merged
merged 4 commits into from
Mar 19, 2024

Conversation

alamb
Copy link
Contributor

@alamb alamb commented Mar 18, 2024

Which issue does this PR close?

Closes #9586

Rationale for this change

Fix a bug that was introduced in #9234

What changes are included in this PR?

  1. Fix bug (3 lines)
  2. Add test coverage
  3. Update comments

Are these changes tested?

Yes they are covered but I think we could do better with the coverage. I will see if I can get a fuzz test here too

Are there any user-facing changes?

Bug fix

@github-actions github-actions bot added physical-expr Physical Expressions sqllogictest SQL Logic Tests (.slt) labels Mar 18, 2024
fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
if states.is_empty() {
return Ok(());
}
assert_eq!(states.len(), 1, "array_agg states must be singleton!");
let array = &states[0];
let list_array = array.as_list::<i32>();
let inner_array = list_array.value(0);
self.update_batch(&[inner_array])
for inner_array in list_array.iter() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the actual bug fix -- to use all rows not just the first. The rest of this PR is tests / comment improvements

FROM m3
GROUP BY column3;
----
1 2
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this query returns 1 1 without the code change in this PR

@alamb alamb marked this pull request as ready for review March 18, 2024 18:51
@Dandandan
Copy link
Contributor

Nice!

@alamb
Copy link
Contributor Author

alamb commented Mar 18, 2024

Thank you for the review @Dandandan

@alamb
Copy link
Contributor Author

alamb commented Mar 18, 2024

cc @jayzhan211

@@ -280,3 +280,70 @@ ORDER BY
2023-12-20T01:20:00 1000 f2 foo
2023-12-20T01:30:00 1000 f1 32.0
2023-12-20T01:30:00 1000 f2 foo

# Cleanup
statement error DataFusion error: Execution error: Table 'm1' doesn't exist\.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is it not "ok", but an error?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call -- this is a mistake and I will fix it

select * from (values('foo', 'baz', 1));

######
# Now, create a table with the same data, but column2 has type `Dictionary(Int32)` to trigger the fallback code
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why does the cast to the dictionary trigger the fallback code? Does it refer to merge_batch?

Copy link
Contributor

@jayzhan211 jayzhan211 Mar 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

specifically, why the key of dict is the sub group index after casting? 🤔

group 1: "a", "b",
group 2: "c"

we get
(0, a), (1, "b"), and (0, "c")

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why does the cast to the dictionary trigger the fallback code?

The reason the dictionary triggers merge is that when grouping on strings or primitive values, the DistinctCountAccumulator code path is not used. Instead one of the specialized implementations (like BytesDistinctCountAccumulator) is used instead, which use the GroupsAccumulator interface.

Dictionary encoded columns run this path DistinctCountAccumulator
https://github.com/apache/arrow-datafusion/blob/b0b329ba39403b9e87156d6f9b8c5464dc6d2480/datafusion/physical-expr/src/aggregate/count_distinct/mod.rs#L160-L163

specifically, why the key of dict is the sub group index after casting? 🤔

What is happening is that we are doing a two phase groupby (illustated here)

https://github.com/apache/arrow-datafusion/blob/b0b329ba39403b9e87156d6f9b8c5464dc6d2480/datafusion/expr/src/accumulator.rs#L99-L131

And so there are two different Partial group bys happening. Each PartialGroupBy produces a a set of distinct values. Using your example, I think it would be more like the following (where we have the same group in multiple partial results):

group 1 (partial): "a", "b",
group 1 (partial): "c"

The merge is called to combine the results together with a two element array

("a, "b")
("c")

But I may be misunderstanding your question

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also filed #9695 to add some more coverage of array operations

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm curious about how and where the DictionarayArray has been built. It is quite hard to trace the previous caller of GroupedHashAggregateStream::poll_next with RUST_BACKTRACE.

https://github.com/apache/arrow-datafusion/blob/b0b329ba39403b9e87156d6f9b8c5464dc6d2480/datafusion/physical-plan/src/aggregates/row_hash.rs#L434

batch: RecordBatch { schema: Schema { fields: [Field { name: "column3", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "COUNT(DISTINCT m3.column1)[count distinct]", data_type: List(Field { name: "item", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "COUNT(DISTINCT m3.column2)[count distinct]", data_type: List(Field { name: "item", data_type: Dictionary(Int32, Utf8), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, columns: [PrimitiveArray<Int64>
[
  1,
  1,
], ListArray
[
  StringArray
[
  "foo",
],
  StringArray
[
  "foo",
],
], ListArray
[
  DictionaryArray {keys: PrimitiveArray<Int32>
[
  0,
] values: StringArray
[
  "bar",
  "baz",
]}
,
  DictionaryArray {keys: PrimitiveArray<Int32>
[
  1,
] values: StringArray
[
  "bar",
  "baz",
]}
,
]], row_count: 2 }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm curious about how and where the DictionarayArray has been built.

I think it comes from emitting ScalarValue::Dictionary that are combined into an array here

https://github.com/apache/arrow-datafusion/blob/b87dd6143c2dc089b07f74780bd525c4369e68a3/datafusion/physical-expr/src/aggregate/groups_accumulator/adapter.rs#L304-L309

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems converted at this point already

@alamb alamb merged commit 3c3b228 into apache:main Mar 19, 2024
23 checks passed
@alamb
Copy link
Contributor Author

alamb commented Mar 19, 2024

Thank you for the comments/reviews @jayzhan211 and @Dandandan

self.update_batch(&[inner_array])
for inner_array in list_array.iter() {
let inner_array = inner_array
.expect("counts are always non null, so are intermediate results");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I noticed this when updating my local repo .... is expect something that should be used here ... my understanding that it panics on None. Given the method returns Result I would expect err to be returned instead - am I missing something in my understanding of Rust here?

Copy link
Contributor

@jayzhan211 jayzhan211 Mar 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It panics, but it is fine if it is ensured to be non-null. I am looking into how the array was built in the above comment but failed. 😢

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a good point and I think it would be a better UX to avoid panic'ing even if something "impossible" happens. I made the change in #9712

@alamb alamb deleted the alamb/multi-distinct branch March 20, 2024 17:26
wiedld pushed a commit to wiedld/arrow-datafusion that referenced this pull request Mar 21, 2024
… dictionaries (apache#9679)

* Add test for multiple count distincts on a dictionary

* Fix accumulator merge bug

* Fix cleanup code
alamb added a commit to alamb/datafusion that referenced this pull request Mar 22, 2024
… dictionaries (apache#9679)

* Add test for multiple count distincts on a dictionary

* Fix accumulator merge bug

* Fix cleanup code
wiedld pushed a commit to wiedld/arrow-datafusion that referenced this pull request Mar 28, 2024
… dictionaries (apache#9679)

* Add test for multiple count distincts on a dictionary

* Fix accumulator merge bug

* Fix cleanup code
wiedld pushed a commit to influxdata/arrow-datafusion that referenced this pull request Apr 1, 2024
… dictionaries (apache#9679)

* Add test for multiple count distincts on a dictionary

* Fix accumulator merge bug

* Fix cleanup code
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
physical-expr Physical Expressions sqllogictest SQL Logic Tests (.slt)
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Regression: Wrong result when there are 2 count(distinct)
4 participants