Skip to content

Conversation

@zhengruifeng
Copy link
Contributor

What changes were proposed in this pull request?

when numSlices is avaiable, logical.Range should compute a exact maxRowsPerPartition

Why are the changes needed?

maxRowsPerPartition is used in optimizer, we should provide an exact value if possible

Does this PR introduce any user-facing change?

No

How was this patch tested?

existing testsuites

init
@github-actions github-actions bot added the SQL label Apr 26, 2021
@SparkQA
Copy link

SparkQA commented Apr 26, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42476/

@SparkQA
Copy link

SparkQA commented Apr 26, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42476/

@SparkQA
Copy link

SparkQA commented Apr 26, 2021

Test build #137955 has finished for PR 32350 at commit f82b0ab.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Copy link
Member

cc @wangyum FYI

@wangyum
Copy link
Member

wangyum commented Apr 27, 2021

existing testsuites

Do we have tests to cover this change?

@zhengruifeng
Copy link
Contributor Author

@wangyum No, there is no test to cover maxRowsPerPartition. There is some test in CombiningLimitsSuite to check maxRows, should I add some test there?

@maropu
Copy link
Member

maropu commented Apr 28, 2021

Yea, please add tests in CombiningLimitsSuite. The fix itself looks fine.

@zhengruifeng
Copy link
Contributor Author

To add a similar test in CombiningLimitsSuite, some additional changes are involved. I'm not sure whether to switch to a simple test like:

scala> spark.range(0, 100, 1, 3).rdd.mapPartitions(iter => Iterator(iter.size)).max == 34
res0: Boolean = true      

extends OrderPreservingUnaryNode {
override def output: Seq[Attribute] = projectList.map(_.toAttribute)
override def maxRows: Option[Long] = child.maxRows
override def maxRowsPerPartition: Option[Long] = child.maxRowsPerPartition
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this override is needed for the added test

@SparkQA
Copy link

SparkQA commented Apr 29, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42577/

@SparkQA
Copy link

SparkQA commented Apr 29, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42577/

@SparkQA
Copy link

SparkQA commented Apr 29, 2021

Test build #138058 has finished for PR 32350 at commit 0a69629.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Apr 29, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42591/

@SparkQA
Copy link

SparkQA commented Apr 29, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42591/

@SparkQA
Copy link

SparkQA commented Apr 29, 2021

Test build #138071 has finished for PR 32350 at commit 30db1c2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Copy link
Member

@maropu maropu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please update the PR and the description. I feel what they say is different from what this PR looks like.

child
case GlobalLimit(l, child) if canEliminate(l, child) =>
child
case LocalLimit(l, child) if !plan.isStreaming && canEliminateLocalLimit(l, child) =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not possible that a user's query reaches this optimization path now?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In a streaming case, maxRowsPerPartition can be filled? (we need the condition !plan.isStreaming here?)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not possible that a user's query reaches this optimization path now?

end user's query should not reaches this path, I think. This path is only for adding a similar test in CombiningLimitsSuite

In a streaming case, maxRowsPerPartition can be filled? (we need the condition !plan.isStreaming here?)

org.apache.spark.sql.streaming.StreamSuite.SPARK-30657: streaming limit optimization from StreamingLocalLimitExec to LocalLimitExec fails if do not add this condition.


def limit(limitExpr: Expression): LogicalPlan = Limit(limitExpr, logicalPlan)

def localLimit(limitExpr: Expression): LogicalPlan = LocalLimit(limitExpr, logicalPlan)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is used only once now, could you use LocalLimit directly in the test?

Comment on lines 154 to 158
checkPlanAndMaxRowsPerPartition(
Range(0, 100, 1, 3).select().localLimit(34),
Range(0, 100, 1, 3).select(),
34
)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we make the test more simple? For example:

assert(Range(0, 100, 1, 3).maxRowsPerPartition === Some(34))
assert(Range(0, 100, 1, 4).maxRowsPerPartition === Some(25))
assert(Range(0, 100, 1, 3).select('id).maxRowsPerPartition === Some(34))

@zhengruifeng
Copy link
Contributor Author

@wangyum @maropu Thanks for reviewing!
I think I made this PR too complex, and will follow @wangyum 's commment to use a simpler testsuite.

@SparkQA
Copy link

SparkQA commented May 6, 2021

Kubernetes integration test unable to build dist.

exiting with code: 1
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42714/

@SparkQA
Copy link

SparkQA commented May 6, 2021

Test build #138193 has finished for PR 32350 at commit a3e26c7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Copy link
Member

@maropu maropu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cc: @wangyum

@maropu maropu closed this in 620f072 May 9, 2021
@maropu
Copy link
Member

maropu commented May 9, 2021

Thank you, @zhengruifeng . Merged to master.

@zhengruifeng
Copy link
Contributor Author

Thank you so much!

@zhengruifeng zhengruifeng deleted the range_maxRowsPerPartition branch May 10, 2021 02:07
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants