Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor count_distinct to not to have update and merge #5408

Merged
merged 1 commit into from
Mar 3, 2023

Conversation

Weijun-H
Copy link
Member

Which issue does this PR close?

Parts #1598

Rationale for this change

What changes are included in this PR?

Are these changes tested?

Are there any user-facing changes?

@github-actions github-actions bot added the physical-expr Physical Expressions label Feb 26, 2023
let arr = &values[0];
(0..arr.len()).try_for_each(|index| {
let scalar = ScalarValue::try_from_array(arr, index)?;
let scalar = vec![scalar];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can avoid creating a Vec here

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hello @Dandandan, thank you for reviewing my work. I'm currently having difficulty finding a way to avoid using vec in this context. Could you please provide me with some guidance on how to refactor it?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can modify DistinctScalarValues from
struct DistinctScalarValues(Vec<ScalarValue>)
to
struct DistinctScalarValues(ScalarValue)

and update the code accordingly.

This might provide some small performance increase.

let arr = &states[0];
(0..arr.len()).try_for_each(|index| {
let scalar = ScalarValue::try_from_array(arr, index)?;
let scalar = vec![scalar];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can rewrite the code to not create the Vec

let arr = &values[0];
(0..arr.len()).try_for_each(|index| {
let scalar = ScalarValue::try_from_array(arr, index)?;
if !ScalarValue::is_null(&scalar) {
Copy link
Contributor

@Dandandan Dandandan Feb 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check can be done on the array arr already instead of on the scalar.

@Dandandan
Copy link
Contributor

@Weijun-H The tests are failing, could you take a look?

.collect::<Result<Vec<_>>>()?;
self.merge(&v)
})
}
fn state(&self) -> Result<Vec<ScalarValue>> {
let mut cols_out = self
.state_data_types
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

state_data_types can be simplified to be of type DataType too instead of Vec<DataType>

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(and code to be updated accordingly

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

state_data_types can be simplified to be of type DataType too instead of Vec<DataType>

Do you mean state_data_types in DistinctCountAccumulator and DistinctCount?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that doesn't need to be a Vec, as it always use a single state column, this will simplify the code some more.

@Weijun-H
Copy link
Member Author

Weijun-H commented Feb 27, 2023

@Dandandan, I reviewed the failed test and identified the root cause of the issue. It appears that the problem occurred because there was a modification made to the structure of the DistinctScalarValues from DistinctScalarValues(Vec<ScalarValue>) to DistinctScalarValues(ScalarValue). This change caused the test to fail because the test was designed to work with two different types of data.

https://github.com/apache/arrow-datafusion/blob/16cb4c122f8ea110bc7adf425f4905fa06ed2c81/datafusion/physical-expr/src/aggregate/count_distinct.rs#L608-L642

@Dandandan
Copy link
Contributor

@Dandandan, I reviewed the failed test and identified the root cause of the issue. It appears that the problem occurred because there was a modification made to the structure of the DistinctScalarValues from DistinctScalarValues(Vec<ScalarValue>) to DistinctScalarValues(ScalarValue). This change caused the test to fail because the test was designed to work with two different types of data.

https://github.com/apache/arrow-datafusion/blob/16cb4c122f8ea110bc7adf425f4905fa06ed2c81/datafusion/physical-expr/src/aggregate/count_distinct.rs#L608-L642

Ok. I think we have to update (or remove) the tests to update the expectations that we no longer support multiple columns.

@@ -41,7 +41,7 @@ pub struct DistinctCount {
/// The DataType for the final count
data_type: DataType,
/// The DataType used to hold the state for each input
state_data_types: Vec<DataType>,
state_data_types: DataType,
/// The input arguments
exprs: Vec<Arc<dyn PhysicalExpr>>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should also update exprs: Vec<Arc<dyn PhysicalExpr>> to exprs: Arc<dyn PhysicalExpr> and DistinctCount::new to communicate that multiple columns are no longer supported.

Copy link
Member Author

@Weijun-H Weijun-H Feb 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe input_data_types: Vec<DataType> also need to be changed to DataType, because it also need one DataType?

Copy link
Contributor

@Dandandan Dandandan Feb 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, as well.
If you like to do some more refactoring, you could remove data_type: DataType from the DistinctCount struct as well and just keep a int64 in the DistinctCountAccumulator instead of using a ScalarValue + count_data_type .

@@ -113,106 +102,61 @@ impl AggregateExpr for DistinctCount {
#[derive(Debug)]
struct DistinctCountAccumulator {
values: HashSet<DistinctScalarValues, RandomState>,
state_data_types: Vec<DataType>,
state_data_types: DataType,
count_data_type: DataType,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you remove this as well (it's basically unused by now)?

@Dandandan
Copy link
Contributor

TY @Weijun-H I think it's looking great!
Can you fix the conflict and look at my remaining comment?

@@ -31,35 +31,30 @@ use datafusion_common::{DataFusionError, Result};
use datafusion_expr::Accumulator;

#[derive(Debug, PartialEq, Eq, Hash, Clone)]
struct DistinctScalarValues(Vec<ScalarValue>);
struct DistinctScalarValues(ScalarValue);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could use HashSet<DistinctScalarValues, RandomState> instead and remove this struct.

/// The DataType used to hold the state for each input
state_data_types: Vec<DataType>,
state_data_types: DataType,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
state_data_types: DataType,
state_data_type: DataType,

/// The input arguments
exprs: Vec<Arc<dyn PhysicalExpr>>,
exprs: Arc<dyn PhysicalExpr>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
exprs: Arc<dyn PhysicalExpr>,
expr: Arc<dyn PhysicalExpr>,

input_data_types: Vec<DataType>,
exprs: Vec<Arc<dyn PhysicalExpr>>,
input_data_types: DataType,
exprs: Arc<dyn PhysicalExpr>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
exprs: Arc<dyn PhysicalExpr>,
expr: Arc<dyn PhysicalExpr>,

}

impl DistinctCount {
/// Create a new COUNT(DISTINCT) aggregate function.
pub fn new(
input_data_types: Vec<DataType>,
exprs: Vec<Arc<dyn PhysicalExpr>>,
input_data_types: DataType,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
input_data_types: DataType,
input_data_type: DataType,

@@ -113,43 +99,10 @@ impl AggregateExpr for DistinctCount {
#[derive(Debug)]
struct DistinctCountAccumulator {
values: HashSet<DistinctScalarValues, RandomState>,
state_data_types: Vec<DataType>,
count_data_type: DataType,
state_data_types: DataType,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
state_data_types: DataType,
state_data_type: DataType,

Copy link
Contributor

@Dandandan Dandandan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, I think this is a great step forward 👍

@Weijun-H
Copy link
Member Author

Weijun-H commented Mar 1, 2023

Nice, I think this is a great step forward 👍

Thank you for your patient guidance.

@Dandandan
Copy link
Contributor

FYI @alamb this has some backwards-incompatible changes. I think it removes some unnecessary complexity, making it easier to improve performance later on.

@Dandandan
Copy link
Contributor

Merging this in 24 hours if no other comments

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks great to me -- thank you @Weijun-H and @Dandandan

@Dandandan Dandandan merged commit d11820a into apache:main Mar 3, 2023
@ursabot
Copy link

ursabot commented Mar 3, 2023

Benchmark runs are scheduled for baseline = ddd64e7 and contender = d11820a. d11820a is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Conbench compare runs links:
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ec2-t3-xlarge-us-east-2] ec2-t3-xlarge-us-east-2
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on test-mac-arm] test-mac-arm
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ursa-i9-9960x] ursa-i9-9960x
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ursa-thinkcentre-m75q] ursa-thinkcentre-m75q
Buildkite builds:
Supported benchmarks:
ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
test-mac-arm: Supported benchmark langs: C++, Python, R
ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java

}

fn size(&self) -> usize {
if self.count_data_type.is_primitive() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we lost the size support for variable sized values during this PR, is it expected? @alamb

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was not intended -- if someone has time to fix it that would be great, otherwise I will try and get a PR up later today

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Filed #5534

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
physical-expr Physical Expressions
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants