Skip to content

Conversation

@beliefer
Copy link
Contributor

What changes were proposed in this pull request?

This PR is related to #26656.
#26656 only support use FILTER clause on aggregate expression without DISTINCT.
This PR will enhance this feature when one or more DISTINCT aggregate expressions which allows the use of the FILTER clause.
Such as:

select sum(distinct id) filter (where sex = 'man') from student;
select class_id, sum(distinct id) filter (where sex = 'man') from student group by class_id;
select count(id) filter (where class_id = 1), sum(distinct id) filter (where sex = 'man') from student;
select class_id, count(id) filter (where class_id = 1), sum(distinct id) filter (where sex = 'man') from student group by class_id;
select sum(distinct id), sum(distinct id) filter (where sex = 'man') from student;
select class_id, sum(distinct id), sum(distinct id) filter (where sex = 'man') from student group by class_id;
select class_id, count(id), count(id) filter (where class_id = 1), sum(distinct id), sum(distinct id) filter (where sex = 'man') from student group by class_id;

Why are the changes needed?

Spark SQL only support use FILTER clause on aggregate expression without DISTINCT.
This PR support Filter expression allows simultaneous use of DISTINCT

Does this PR introduce any user-facing change?

Yes

How was this patch tested?

Exists and new UT

@SparkQA
Copy link

SparkQA commented Jul 29, 2020

Test build #126778 has finished for PR 29291 at commit 4ba808b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jul 30, 2020

Test build #126797 has finished for PR 29291 at commit 145a9dd.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@beliefer
Copy link
Contributor Author

cc @cloud-fan

// group without filter clause.
// This check can produce false-positives, e.g., SUM(DISTINCT a) & COUNT(DISTINCT a).
distinctAggs.size > 1
distinctAggs.size > 1 || (distinctAggs.size == 1 && aggExpressions.exists(_.filter.isDefined))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can remove distinctAggs.size == 1, as it's indicarted by distinctAggs.size > 1 || ...

Copy link
Contributor Author

@beliefer beliefer Jul 30, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If distinctAggs.size == 0 and aggExpressions.exists(_.filter.isDefined), we not need this rewrite.
The normal agg with filter could treated by physical plan.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't it be distinctAggs.exists(_.filter.isDefined)?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. I got it.

// Setup all the filters in distinct aggregate.
val distinctAggExprs = aggExpressions
.filter(e => e.isDistinct && e.children.exists(!_.foldable))
val distinctAggFilterAttrMap = distinctAggExprs.collect {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: val (distinctAggFilters, distinctAggFilterAttrs, maxCond) = distinctAggExprs.collect(...).unzip3

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But I want

val distinctAggFilterAttrLookup = distinctAggFilterAttrMap.map { tuple3 =>
        tuple3._1 -> tuple3._3.toAttribute
      }.toMap

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is distinctAggFilters.zip(maxCond.map(_.toAttribute)).toMap

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

@SparkQA
Copy link

SparkQA commented Jul 30, 2020

Test build #126810 has finished for PR 29291 at commit 7362dfb.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@beliefer
Copy link
Contributor Author

retest this please

@cloud-fan
Copy link
Contributor

can you rebase/merge with the master branch to get the github action fix? The jenkin is quite unstable now and we may need to rely on github actions

@beliefer
Copy link
Contributor Author

can you rebase/merge with the master branch to get the github action fix? The jenkin is quite unstable now and we may need to rely on github actions

OK

@SparkQA
Copy link

SparkQA commented Jul 31, 2020

Test build #126843 has finished for PR 29291 at commit 7362dfb.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

val (distinctAggFilters, distinctAggFilterAttrs, maxConds) = distinctAggExprs.collect {
case AggregateExpression(_, _, _, filter, _) if filter.isDefined =>
val (e, attr) = expressionAttributePair(filter.get)
val aggregateExp = AggregateExpression(Max(attr), Partial, false)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Max(attr).toAggregateExpression(distinct = false)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good!

SELECT COUNT(DISTINCT id), COUNT(DISTINCT id) FILTER (WHERE date_format(hiredate, "yyyy-MM-dd HH:mm:ss") = "2001-01-01 00:00:00") FROM emp;
SELECT COUNT(DISTINCT id) FILTER (WHERE hiredate = to_timestamp("2001-01-01 00:00:00")), COUNT(DISTINCT id) FILTER (WHERE hiredate = to_date('2001-01-01 00:00:00')) FROM emp;
SELECT SUM(salary), COUNT(DISTINCT id), COUNT(DISTINCT id) FILTER (WHERE hiredate = date "2001-01-01") FROM emp;
SELECT COUNT(DISTINCT 1) FILTER (WHERE a = 1) FROM testData;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we also test COUNT(DISTINCT id) FILTER (WHERE true) and COUNT(DISTINCT id) FILTER (WHERE false)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

@SparkQA
Copy link

SparkQA commented Jul 31, 2020

Test build #126857 has finished for PR 29291 at commit fbb051b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jul 31, 2020

Test build #126866 has finished for PR 29291 at commit 9939ea7.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jul 31, 2020

Test build #126886 has finished for PR 29291 at commit abafc20.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Aug 3, 2020

Test build #126953 has finished for PR 29291 at commit 39583dd.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Aug 3, 2020

Test build #126956 has finished for PR 29291 at commit 883973b.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Copy link
Contributor

retest this please

@SparkQA
Copy link

SparkQA commented Aug 3, 2020

Test build #126968 has finished for PR 29291 at commit 883973b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Copy link
Contributor

retest this please

@SparkQA
Copy link

SparkQA commented Aug 3, 2020

Test build #126980 has finished for PR 29291 at commit 883973b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Copy link
Contributor

retest this please

@SparkQA
Copy link

SparkQA commented Aug 3, 2020

Test build #126994 has finished for PR 29291 at commit 883973b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Copy link
Contributor

thanks, merging to master!

@cloud-fan cloud-fan closed this in 1597d8f Aug 4, 2020
@beliefer
Copy link
Contributor Author

beliefer commented Aug 4, 2020

@cloud-fan Thanks for your review and good idea.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants