
Improve performance of COUNT (distinct x) for dictionary columns #258 #5554

Closed
wants to merge 7 commits

Conversation

jaylmiller
Contributor

@jaylmiller jaylmiller commented Mar 11, 2023

Which issue does this PR close?

Closes #258.

Rationale for this change

The count distinct physical expr was doing a lot of unnecessary hashing when run on dictionary types. Previously, every cell in the dictionary array was added to the distinct-values hashset; with this change, each dictionary value only needs to be added at most once (and not at all if it is never referenced by the keys).

What changes are included in this PR?

A new accumulator (CountDistinctDictAccumulator) that is returned by DistinctCount in the case that a dictionary array is being counted. There is a fair amount of shared logic between the accumulators, so that was also pulled out into helper funcs.
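The approach can be sketched in plain Rust without the Arrow types (`DictArray` and `update_distinct` here are illustrative stand-ins, not the actual DataFusion API): model a dictionary array as a values list plus per-row keys, mark which values the keys actually reference, and only then touch the hash set:

```rust
use std::collections::HashSet;

/// Illustrative stand-in for an Arrow dictionary array: `values` holds
/// the dictionary entries, `keys` holds one index (or None for a null)
/// per row.
struct DictArray {
    values: Vec<String>,
    keys: Vec<Option<usize>>,
}

/// Count-distinct update over a dictionary column: instead of hashing
/// every row, first mark which dictionary entries are referenced, then
/// insert each referenced entry into the set at most once.
fn update_distinct(set: &mut HashSet<String>, arr: &DictArray) {
    let mut seen = vec![false; arr.values.len()];
    for key in arr.keys.iter().flatten() {
        seen[*key] = true;
    }
    for (idx, was_seen) in seen.iter().enumerate() {
        if *was_seen {
            set.insert(arr.values[idx].clone());
        }
    }
}

fn main() {
    let arr = DictArray {
        values: vec!["a".into(), "b".into(), "c".into()],
        keys: vec![Some(0), Some(0), None, Some(2), Some(0)],
    };
    let mut set = HashSet::new();
    update_distinct(&mut set, &arr);
    // only "a" and "c" are referenced by the keys
    println!("{}", set.len()); // prints 2
}
```

With five rows but only two referenced dictionary entries, the set is touched twice instead of four times (nulls are skipped either way).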

Are these changes tested?

Added some new unit tests.

Are there any user-facing changes?

@github-actions github-actions bot added the physical-expr Physical Expressions label Mar 11, 2023
@jaylmiller jaylmiller marked this pull request as ready for review March 11, 2023 22:29
Member

@waynexia waynexia left a comment


This is a good improvement 👍 But I have some questions about its correctness when the input dictionaries are not that normalized:
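For context, a dictionary being "not normalized" means its values array can contain duplicate entries and entries that no key references, so counting dictionary entries directly over-counts. A minimal illustration (plain Rust; `referenced_distinct` is a hypothetical helper, not DataFusion code):

```rust
use std::collections::HashSet;

/// Count distinct values actually referenced by the keys; the
/// dictionary entries may be duplicated or never referenced at all.
fn referenced_distinct(values: &[&str], keys: &[Option<usize>]) -> usize {
    let set: HashSet<&str> = keys.iter().flatten().map(|&k| values[k]).collect();
    set.len()
}

fn main() {
    // "a" appears twice in the dictionary; "c" is never referenced.
    let values = ["a", "a", "b", "c"];
    let keys = [Some(0), Some(1), Some(2), None];

    // Wrong: treat every dictionary entry as a distinct, referenced value.
    assert_eq!(values.len(), 4);
    // Right: dedupe the values the keys actually point at.
    assert_eq!(referenced_distinct(&values, &keys), 2);
}
```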

@jaylmiller
Contributor Author

@waynexia thanks for correcting my assumption about how dicts are normalized 😀. I've made changes to correct this. A bit of hashing is still required, but significantly less than before, since we only need to hash each distinct value once (instead of hashing every cell).

Also, since there was now a lot of shared logic between the two accumulators, I've pulled that out into funcs so both accumulators can use it.

@alamb
Contributor

alamb commented Mar 13, 2023

I plan to review this PR tomorrow. Thank you @waynexia for the review

@comphead
Contributor

@jaylmiller thanks for the PR. It would be great to get some idea of how much performance increased.

Contributor

@alamb alamb left a comment


Thank you @jaylmiller -- this looks great. Thank you @waynexia for the initial review and for ensuring that the dictionaries are handled correctly.

I double checked that the logic now appears to handle different dictionaries correctly -- could you give it one more review @waynexia ?

Also, I wonder if you have had a chance to do any sort of benchmarking to show the improvement?

datafusion/physical-expr/src/aggregate/count_distinct.rs
@@ -31,7 +32,7 @@ use datafusion_common::{DataFusionError, Result};
use datafusion_expr::Accumulator;

type DistinctScalarValues = ScalarValue;

type ValueSet = HashSet<DistinctScalarValues, RandomState>;
Contributor


I wonder what value these type aliases add. The extra indirection of DistinctScalarValues --> ScalarValue simply seems to make things more complicated 🤔

Contributor Author


I was a bit confused about the purpose of DistinctScalarValues as well to be honest but I kind of figured it was there for a good reason so I left it in 😅

In terms of the added ValueSet alias, I personally thought it made the code a bit more readable but that is kind of subjective of course.

Contributor Author


I think maybe we remove the DistinctScalarValues alias but keep ValueSet?

datafusion/physical-expr/src/aggregate/count_distinct.rs
@waynexia
Member

Sorry for the delay, I plan to review it tomorrow!

jaylmiller and others added 2 commits March 14, 2023 11:42
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
@jaylmiller
Contributor Author

Also, I wonder if you have had a chance to do any sort of benchmarking to show the improvement?

Currently looking into this. Will post findings today

@jackwener
Member

jackwener commented Mar 14, 2023

Great job. I also noticed this performance issue (I noticed it via ClickBench). Thanks for your work @jaylmiller .
I plan to review it tomorrow.

Also, I wonder if you have had a chance to do any sort of benchmarking to show the improvement?

Currently looking into this. Will post findings today

I think some cases in ClickBench will be improved.

@jackwener jackwener requested review from jackwener and removed request for waynexia March 14, 2023 16:27
@jaylmiller
Contributor Author

I think some cases in ClickBench will be improved.

Ok I'll look into getting some results on these cases

@jaylmiller
Contributor Author

jaylmiller commented Mar 14, 2023

ClickBench count distinct query when using dictionary columns is getting killed (this is on main as well as the PR) 🤔

❯ CREATE EXTERNAL TABLE hits_base
STORED AS PARQUET
LOCATION 'hits.parquet';
0 rows in set. Query took 0.041 seconds.
❯ CREATE TABLE hits as
select
  arrow_cast("UserID", 'Dictionary(Int32, Utf8)') as "UserID"
FROM hits_base;

0 rows in set. Query took 13.887 seconds.
❯ SELECT COUNT(DISTINCT "UserID") from hits;
Killed

"UserID" table is pretty high cardinality though: is there a better clickbench query/column pair to bench with?

@jackwener
Member

cc @sundy-li

@sundy-li
Contributor

sundy-li commented Mar 15, 2023

type ValueSet = HashSet<DistinctScalarValues, RandomState>;

For numeric/string args in distinct, I think it's better to have special states rather than putting the enum into the HashSet.

SELECT COUNT(DISTINCT "UserID") from hits;

Another approach is to rewrite this SQL to select count() from (select userid from hits group by userid).

arrow_cast("UserID", 'Dictionary(Int32, Utf8)') as "UserID"

The cast could add overhead; if UserID is already a Utf8 array, we just need to siphash it to u128, which is safe in a cryptographic way.
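The "special states" idea above can be sketched as follows (illustrative only; the real DataFusion ScalarValue enum is far larger than this stand-in): a set keyed by the native primitive avoids wrapping every distinct value in an enum.

```rust
use std::collections::HashSet;

/// Stand-in for a ScalarValue-style enum: every distinct value pays
/// for the enum tag, and string values pay for a heap allocation too.
#[derive(Hash, PartialEq, Eq)]
#[allow(dead_code)]
enum Scalar {
    Int64(i64),
    Utf8(String),
}

/// Generic state: one enum instance per distinct value.
fn count_generic(input: &[i64]) -> usize {
    let set: HashSet<Scalar> = input.iter().map(|&v| Scalar::Int64(v)).collect();
    set.len()
}

/// Specialized state for an Int64 column: hash the native value
/// directly, with no enum wrapping.
fn count_specialized(input: &[i64]) -> usize {
    let set: HashSet<i64> = input.iter().copied().collect();
    set.len()
}

fn main() {
    let input = [1i64, 2, 2, 3, 1];
    // Same answer, but the specialized set stores plain i64s.
    assert_eq!(count_generic(&input), count_specialized(&input));
    println!("{}", count_specialized(&input)); // prints 3
}
```

Both produce the same count; the specialized variant stores 8-byte keys rather than tagged enum values, which is the space/speed win being suggested.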

Member

@waynexia waynexia left a comment


The current implementation generally looks good to me. I left some little suggestions about style. Looking forward to the bench result 🚀

Comment on lines +130 to +131
return Err(DataFusionError::Internal(
"Dict key has invalid datatype".to_string(),
Member


nit: I would prefer to add the concrete type in the error message

Comment on lines +148 to +151
// calculating the size of values hashset for fixed length values,
// taking first batch size * number of batches.
// This method is faster than full_size(), however it is not suitable for variable length
// values like strings or complex types
Member


Suggested change
// calculating the size of values hashset for fixed length values,
// taking first batch size * number of batches.
// This method is faster than full_size(), however it is not suitable for variable length
// values like strings or complex types
/// calculating the size of values hashset for fixed length values,
/// taking first batch size * number of batches.
/// This method is faster than full_size(), however it is not suitable for variable length
/// values like strings or complex types

style: prefer to use document comments

Comment on lines +159 to +160
}
// calculates the size as accurate as possible, call to this method is expensive
Member


Suggested change
}
// calculates the size as accurate as possible, call to this method is expensive
}
// calculates the size as accurate as possible, call to this method is expensive

style: add empty line between two fns

Comment on lines +260 to +261
}
impl<K: ArrowDictionaryKeyType + std::marker::Send + std::marker::Sync>
Member


Suggested change
}
impl<K: ArrowDictionaryKeyType + std::marker::Send + std::marker::Sync>
}
impl<K: ArrowDictionaryKeyType + std::marker::Send + std::marker::Sync>

style: add empty line between two blocks

@jaylmiller
Contributor Author

jaylmiller commented Mar 16, 2023

I'm not seeing any noticeable ClickBench improvements when changing the "RegionID" column to use dict encoding (setting other columns, e.g. UserID, to use dict encoding causes my datafusion process to be killed...)

Maybe this change is not worth merging if ClickBench improvements aren't being seen

@mingmwang
Contributor

mingmwang commented Mar 16, 2023

@jaylmiller
How do you run the benchmark test? I have not had a chance to take a closer look at this PR yet.
There is a logical optimization rule in DataFusion, SingleDistinctToGroupBy, which rewrites a single distinct aggregate into a normal group-by aggregate and replaces the distinct aggregator with a normal count aggregator.

This optimization makes sense in most cases but might conflict with the optimization in this PR.

@jaylmiller
Contributor Author

I am essentially just running clickbench queries against my PR and against main. The only change is I am setting RegionID column to a dict:

CREATE EXTERNAL TABLE hits_base
STORED AS PARQUET
LOCATION 'hits.parquet';
CREATE TABLE hits as
select 
  arrow_cast("RegionID", 'Dictionary(Int32, Utf8)') as "RegionID"
....

Do you have any recommendations, @mingmwang ?

@mingmwang
Contributor

I am essentially just running clickbench queries against my PR and against main. The only change is I am setting RegionID column to a dict:

CREATE EXTERNAL TABLE hits_base
STORED AS PARQUET
LOCATION 'hits.parquet';
CREATE TABLE hits as
select 
  arrow_cast("RegionID", 'Dictionary(Int32, Utf8)') as "RegionID"
....

Do you have any recommendations, @mingmwang ?

I have never checked the ClickBench queries. Maybe you can comment out the rule SingleDistinctToGroupBy, run the benchmark again, and see whether there are improvements.

@jaylmiller
Contributor Author

Thanks I'll try that out

@alamb
Contributor

alamb commented Mar 16, 2023

ClickBench count distinct query when using dictionary columns is getting killed (this is on main as well as the PR) 🤔

I wonder if we can try a smaller subset 🤔

❯ CREATE TABLE hits as select
  arrow_cast("UserID", 'Dictionary(Int32, Utf8)') as "UserID"
FROM 'hits.parquet'
limit 10000000;
0 rows in set. Query took 0.776 seconds.
❯ select count(distinct "UserID") from hits;
+-----------------------------+
| COUNT(DISTINCT hits.UserID) |
+-----------------------------+
| 1530334                     |
+-----------------------------+
1 row in set. Query took 71.388 seconds.

I will try this on my benchmark machine

@mingmwang
Contributor

Any luck?
I think the rule SingleDistinctToGroupBy conflicts with this optimization, and rewriting Distinct to Group By is not always beneficial. Maybe we should add a configuration option to turn this rewriting on/off.

let arr = as_dictionary_array::<K>(&values[0])?;
let nvalues = arr.values().len();
// map keys to whether their corresponding value has been seen or not
let mut seen_map = vec![false; nvalues];
Contributor


seen_map could become a bitmap to save space.

High-cardinality inputs relative to the batch size (like UserID in ClickBench) probably don't benefit much from this map. I don't know the standard batch size that DataFusion uses for that query, but a much larger batch size could improve the performance in this case.

Contributor Author

@jaylmiller jaylmiller Mar 20, 2023


Thanks for the suggestion. Just to clarify, you mean using an arrow bitmap, correct? Something like

let mut seen_map = arrow::array::BooleanBufferBuilder::new(nvalues);

@jaylmiller
Contributor Author

jaylmiller commented Mar 20, 2023

Any luck? I think the rule SingleDistinctToGroupBy conflicts with this optimization, and rewriting Distinct to Group By is not always beneficial. Maybe we should add a configuration option to turn this rewriting on/off.

@mingmwang Sorry for delay. I haven't had a chance to get back to this PR yet (currently working on #5292).

@alamb
Contributor

alamb commented Mar 28, 2023

Marking as draft to signify this PR has feedback and is not waiting for another review at the moment.

@alamb alamb marked this pull request as draft March 28, 2023 20:27
@alamb
Contributor

alamb commented Apr 8, 2024

Since this has been open for more than a year, I am closing it down. Feel free to reopen if/when you keep working on it.

@alamb alamb closed this Apr 8, 2024
Labels: physical-expr (Physical Expressions)
8 participants