[SPARK-18089][SQL] Remove shuffle codes in CollectLimitExec #15596
Conversation
@@ -39,7 +39,7 @@
  protected int partitionIndex = -1;

  public boolean hasNext() throws IOException {
-   if (currentRows.isEmpty()) {
+   if (!shouldStop()) {
shouldStop in whole-stage codegen can be overridden to provide a custom stop condition other than just currentRows.isEmpty(). For example, in limit we stop iterator processing early once the limit is met.
Without this change, this PR would fail one test in SQLQuerySuite, "SPARK-17515: CollectLimit.execute() should perform per-partition limits", because the LocalLimit would not stop immediately but only in the next round after reaching the limit.
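Below is a minimal, self-contained Scala sketch of the pattern described here (the class and method names are hypothetical; this is not the generated BufferedRowIterator): hasNext() consults an overridable shouldStop() hook, and the limit operator flips a stop-early flag once the limit is reached, so no extra rows are ever pulled from upstream.

import scala.collection.mutable

abstract class StopEarlyIterator[T] extends Iterator[T] {
  protected val currentRows = mutable.Queue.empty[T]

  // Default hook: pause producing as soon as a row is buffered.
  protected def shouldStop(): Boolean = currentRows.nonEmpty

  // Stands in for the code-generated processNext(): buffer rows that survive
  // the operator's condition and yield whenever shouldStop() says so.
  protected def processNext(): Unit

  override def hasNext: Boolean = {
    if (!shouldStop()) processNext()   // consult the hook, not just isEmpty
    currentRows.nonEmpty
  }

  override def next(): T = currentRows.dequeue()
}

// A limit that flips a stop-early flag on reaching the limit, so a later
// hasNext() never re-enters processNext() and never pulls extra upstream rows.
class LimitIterator[T](upstream: Iterator[T], limit: Int) extends StopEarlyIterator[T] {
  private var count = 0
  private var stopEarly = false

  override protected def shouldStop(): Boolean = stopEarly || currentRows.nonEmpty

  override protected def processNext(): Unit = {
    while (upstream.hasNext) {
      val row = upstream.next()
      if (count < limit) {
        count += 1
        if (count == limit) stopEarly = true
        currentRows.enqueue(row)
      }
      if (shouldStop()) return
    }
  }
}

// With a limit of 1, exactly one element is pulled from the source.
object LimitSketch extends App {
  var pulled = 0
  val source = Iterator.from(1).map { x => pulled += 1; x }
  println(new LimitIterator(source, 1).toList)   // List(1)
  println(pulled)                                // 1
}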
@@ -83,9 +83,8 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
s"""
| if ($countTerm < $limit) {
| $countTerm += 1;
| if ($countTerm == $limit) $stopEarly = true;
Set the stop-early flag to true once the limit is reached, so we won't step into the next element in the iterator. Otherwise we would still go and fetch the next element, so if the limit is 1 we would pull 2 elements from the iterator.
Note: this doesn't cause a real correctness problem, because we have an if to guard against processing the element. But it fails the test in SQLQuerySuite because that test uses an accumulator to count the elements.
BTW, we can see if there is an exchange added for ...
Test build #67371 has finished for PR 15596 at commit
protected override def doExecute(): RDD[InternalRow] = {
  val locallyLimited = child.execute().mapPartitionsInternal(_.take(limit))
We are removing an optimization here, right? We can greatly reduce the number of shuffled records by applying the limit before anything gets shuffled.
I use a LocalLimit to do this optimization. I think we should use existing physical plans as much as possible, instead of RDD manipulation.
One advantage is that LocalLimit supports whole-stage codegen. We can also easily see the idea of this optimization from the physical plan.
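For illustration, here is a standalone RDD-level sketch of the optimization being discussed (a hypothetical job, not the planner code): take the limit inside every partition first, so at most limit rows per partition are ever shuffled before the final single-partition limit is applied.

import org.apache.spark.{SparkConf, SparkContext}

object PerPartitionLimitSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("per-partition-limit").setMaster("local[4]"))

    val limit = 10
    val data = sc.parallelize(1 to 1000000, numSlices = 8)

    // Local limit: each partition keeps at most `limit` rows before any shuffle.
    val locallyLimited = data.mapPartitions(_.take(limit))

    // Global limit: move the (at most 8 * limit) survivors to one partition
    // and take the final `limit` there.
    val result = locallyLimited
      .coalesce(1, shuffle = true)
      .mapPartitions(_.take(limit))
      .collect()

    println(result.length)   // 10
    sc.stop()
  }
}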
@JoshRosen could you shed some light on why we are not using the regular ...
Test build #67427 has finished for PR 15596 at commit
Force-pushed 6d7095c to e8520cf.
Force-pushed e8520cf to 76a3eaf.
@@ -661,6 +661,15 @@ case class HashAggregateExec(
""".stripMargin
}

ctx.addNewFunction("releaseResource", s"""
We do need to release memory by calling these functions. Because we stop iterating the data early by letting shouldStop return true, we won't call the next processNext, and a memory leak would happen. So we wrap these calls in an overridden releaseResource function.
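As a generic illustration of this point (not HashAggregateExec's actual codegen), the sketch below shows why an explicit releaseResource() hook is needed once a consumer may stop iterating early: the iterator can no longer rely on running to exhaustion to free its resources.

// Generic resource-release pattern for iterators that may be abandoned early.
class ReleasableIterator[T](underlying: Iterator[T], release: () => Unit)
    extends Iterator[T] {
  private var released = false

  def releaseResource(): Unit = if (!released) {
    release()
    released = true
  }

  override def hasNext: Boolean = {
    val more = underlying.hasNext
    if (!more) releaseResource()   // normal exhaustion still cleans up
    more
  }

  override def next(): T = underlying.next()
}

object ReleaseSketch extends App {
  // Stands in for the aggregation hash map / spill buffers mentioned above.
  var scratch: Array[Byte] = new Array[Byte](1 << 20)
  val rows = new ReleasableIterator(Iterator.range(0, 100), () => scratch = null)

  println(rows.take(2).toList)   // stop early: the source is never exhausted
  rows.releaseResource()         // so the consumer must release explicitly
  println(scratch == null)       // true
}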
@@ -47,6 +50,22 @@ case class LocalTableScanExec(

private lazy val rdd = sqlContext.sparkContext.parallelize(unsafeRows, numParallelism)

protected override def doProduce(ctx: CodegenContext): String = {
Let LocalTableScanExec support whole-stage codegen.
Because CollectLimitExec now supports whole-stage codegen, the following test in SQLMetricsSuite:
val df2 = spark.createDataset(Seq(1, 2, 3)).limit(2)
df2.collect()
val metrics2 = df2.queryExecution.executedPlan.collectLeaves().head.metrics
assert(metrics2.contains("numOutputRows"))
assert(metrics2("numOutputRows").value === 2)
would execute the LocalTableScanExec node to get its RDD; an InputAdapter would then connect it to CollectLimitExec's whole-stage codegen node, so it would output all 3 rows in the local table.
Adding this whole-stage codegen support seems straightforward, so I added it here to pass the tests.
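For reference, and assuming the same spark session and import spark.implicits._ that the test above uses, a quick way to see where the whole-stage-codegen boundary falls is to print the physical plan: in Spark 2.x plan strings of this era, operators prefixed with * sit inside a WholeStageCodegen subtree, while an InputAdapter marks a child that is not fused into it.

import spark.implicits._

val df2 = spark.createDataset(Seq(1, 2, 3)).limit(2)
df2.explain()                                        // prints the physical plan
println(df2.queryExecution.executedPlan.treeString)  // same tree as a string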
Hmm, I didn't imagine that so many related changes would be needed to remove the shuffle code in CollectLimitExec. The main change is to modify the whole-stage codegen of Limit; there is one place in it that needs to change in order to iterate exactly the limited number of rows. I left comments on it above. This is not a serious issue, and if you think this change is too big for the purpose, please let me know. Thanks!
Test build #67452 has finished for PR 15596 at commit
For a bit of background on ... In typical operation, we don't necessarily expect ...
private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil
override def executeCollect(): Array[InternalRow] = {
  child.collect {
I think that I understand the intent here: it looks like the idea is to let EnsureRequirements handle the shuffling, if necessary, then to strip off the shuffling and walk backwards in the DAG to optimize the collect() case. This is pretty clever.
One concern of mine, though, is that there seems to be an implicit assumption about the types of children that this operator can have, and I think it would be a good idea to write them down and pattern-match more explicitly. For example, I'm not sure that this strategy is safe when the child isn't an exchange, because then you run the risk of simply dropping any operators that occur post-exchange. I understand that this case can't crop up in the types of plans that we currently generate, but it would be good to future-proof by matching explicitly rather than having the less-constrained collect behavior here.
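To make that concern concrete, here is a self-contained toy plan tree (the classes are hypothetical, not Spark's) showing how a walk-to-the-leaf traversal silently drops a post-exchange operator, while matching the expected child shape explicitly keeps unexpected plans on the general path.

sealed trait Plan { def child: Option[Plan] = None }
case class Scan(data: Seq[Int]) extends Plan
case class Exchange(c: Plan) extends Plan { override def child: Option[Plan] = Some(c) }
case class Project(c: Plan) extends Plan { override def child: Option[Plan] = Some(c) }

object MatchSketch extends App {
  // Correct evaluation of a plan, including any post-exchange operators.
  def evaluate(p: Plan): Seq[Int] = p match {
    case Scan(d)     => d
    case Exchange(c) => evaluate(c)
    case Project(c)  => evaluate(c).map(_ * 10)
  }

  // Loose strategy: walk back to the leaf, dropping whatever sits in between.
  def collectLoose(p: Plan): Seq[Int] = {
    var cur = p
    while (cur.child.isDefined) cur = cur.child.get
    evaluate(cur)
  }

  // Strict strategy: only strip an Exchange whose child has the expected shape;
  // anything else falls through to the general path.
  def collectStrict(p: Plan): Seq[Int] = p match {
    case Exchange(scan: Scan) => evaluate(scan)
    case other                => evaluate(other)
  }

  val plan = Exchange(Project(Scan(Seq(1, 2, 3))))
  println(collectLoose(plan))   // List(1, 2, 3)    -- the Project was silently dropped
  println(collectStrict(plan))  // List(10, 20, 30) -- post-exchange operator preserved
}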
Also, one other idea: if you're going to try this strategy, what do you think about putting this logic into GlobalLimitExec and removing the CollectLimitExec planning logic?
That seems like a good idea. I once thought about the difference between GlobalLimitExec and CollectLimitExec while refactoring this, but decided not to change too much. Looks like I could remove CollectLimitExec.
retest this please.
Yeah, I think I understand the intent. So in the ...
Test build #67480 has finished for PR 15596 at commit
override def executeCollect(): Array[InternalRow] = child match {
  // This happens when the user is collecting results back to the driver; we can skip
  // the shuffle and incrementally scan the RDD to get the limited number of items.
  case g: GlobalLimitExec => g.executeCollect()
this is really confusing ...
Hmm, I am polishing the comment; I hope it is helpful. I would like to hear any suggestions.
I still think it is confusing, as you said. Removed it.
Test build #67492 has finished for PR 15596 at commit
Test build #67498 has finished for PR 15596 at commit
Test build #67501 has finished for PR 15596 at commit
Force-pushed 53a956e to e919f4a.
Test build #67557 has finished for PR 15596 at commit
Test build #67556 has finished for PR 15596 at commit
Force-pushed e919f4a to 360752c.
Force-pushed 360752c to 86b4e42.
Test build #67569 has finished for PR 15596 at commit
Test build #67571 has finished for PR 15596 at commit
 * Helper trait which defines methods that are shared by both
 * [[LocalLimitExec]] and [[GlobalLimitExec]].
 */
trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
  val limit: Int
  override def output: Seq[Attribute] = child.output
  override def executeCollect(): Array[InternalRow] = child.executeTake(limit)
  override def executeTake(n: Int): Array[InternalRow] = child.executeTake(limit)
I think we will want to add an executeToIterator override as well, to make sure we don't cause a shuffle there.
Thanks @pwoody! Agreed. But I am thinking not to replace CollectLimitExec with GlobalLimitExec; the reason is commented below. Let's wait for @JoshRosen's response. If we decide to keep CollectLimitExec, your change at #15614 can be applied then.
@JoshRosen After a few tries, I think replacing CollectLimitExec with GlobalLimitExec is not the way to go. The main reason is whole-stage codegen. Since ...
Of course we can change the tests to fit it, but I don't think that is a necessary or good way to do it. Another workaround is to override ... So based on these facts, I think we had better keep CollectLimitExec. What do you think?
Force-pushed 492106f to 89c0c62.
Test build #67617 has finished for PR 15596 at commit
Test build #67618 has finished for PR 15596 at commit
I'd close this now.
What changes were proposed in this pull request?
Currently, CollectLimitExec is an operator used when the logical Limit is the last operator in a logical plan. In fact, the job of CollectLimitExec is no different from GlobalLimitExec. We can do a little refactoring to GlobalLimitExec and replace CollectLimitExec.
How was this patch tested?
Jenkins tests.
Please review https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark before opening a pull request.