Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
01e4cdf
Merge remote-tracking branch 'upstream/master'
gatorsmile Nov 13, 2015
6835704
Merge remote-tracking branch 'upstream/master'
gatorsmile Nov 14, 2015
9180687
Merge remote-tracking branch 'upstream/master'
gatorsmile Nov 14, 2015
b38a21e
SPARK-11633
gatorsmile Nov 17, 2015
d2b84af
Merge remote-tracking branch 'upstream/master' into joinMakeCopy
gatorsmile Nov 17, 2015
fda8025
Merge remote-tracking branch 'upstream/master'
gatorspark Nov 17, 2015
ac0dccd
Merge branch 'master' of https://github.com/gatorsmile/spark
gatorspark Nov 17, 2015
6e0018b
Merge remote-tracking branch 'upstream/master'
Nov 20, 2015
0546772
converge
gatorsmile Nov 20, 2015
b37a64f
converge
gatorsmile Nov 20, 2015
661260b
Merge remote-tracking branch 'upstream/master'
gatorsmile Nov 23, 2015
2dfa0fd
Merge remote-tracking branch 'upstream/master'
gatorsmile Nov 25, 2015
d929d9b
Merge remote-tracking branch 'upstream/master'
gatorsmile Nov 25, 2015
4070d2f
Merge remote-tracking branch 'upstream/master'
gatorsmile Dec 7, 2015
38dcfb2
Merge remote-tracking branch 'upstream/master'
gatorsmile Dec 9, 2015
cb3fc83
Merge remote-tracking branch 'upstream/master'
gatorsmile Dec 12, 2015
8dbacc7
Merge remote-tracking branch 'upstream/master'
gatorsmile Dec 18, 2015
41b9172
Merge remote-tracking branch 'upstream/master'
gatorsmile Dec 22, 2015
56fd782
union limit pushdown.
gatorsmile Dec 23, 2015
b5ac8d7
Merge remote-tracking branch 'upstream/master' into unionLimit
gatorsmile Dec 23, 2015
77105e3
combine the limits.
gatorsmile Dec 23, 2015
7f25d91
update the comments.
gatorsmile Dec 24, 2015
ae59f42
add a stop flag.
gatorsmile Dec 25, 2015
3ccf3bd
fixed the build failure.
gatorsmile Dec 25, 2015
004ed66
revert the changes back.
gatorsmile Dec 26, 2015
6998ec9
added limitedNumRows into the logicalPlan
gatorsmile Dec 28, 2015
09a5672
changed limitedNumRows to maxRows
gatorsmile Dec 28, 2015
358d62e
address the comments.
gatorsmile Dec 29, 2015
2823a57
addressed comments.
gatorsmile Dec 29, 2015
10d570c
Merge remote-tracking branch 'upstream/master'
gatorsmile Dec 29, 2015
cfbeea7
Merge branch 'unionLimit' into unionLimit2
gatorsmile Dec 29, 2015
ca5c104
addressed comments.
gatorsmile Dec 29, 2015
56f0c16
address the comments.
gatorsmile Dec 30, 2015
62d5cbe
update the comment.
gatorsmile Dec 30, 2015
7cf955f
update the comment.
gatorsmile Dec 30, 2015
7899312
Merge remote-tracking branch 'upstream/master' into unionLimit2
gatorsmile Dec 30, 2015
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] {
PushPredicateThroughProject,
PushPredicateThroughGenerate,
PushPredicateThroughAggregate,
PushDownLimit,
ColumnPruning,
// Operator combine
ProjectCollapsing,
Expand Down Expand Up @@ -79,6 +80,39 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] {
*/
object DefaultOptimizer extends Optimizer

/**
* Pushes down Limit for reducing the amount of returned data.
*
* 1. Adding Extra Limit beneath the operations, including Union All.
* 2. Project is pushed through Limit in the rule ColumnPruning
*
* Any operator that a Limit can be pushed passed should override the maxRows function.
*/
object PushDownLimit extends Rule[LogicalPlan] {

private def buildUnionChild (limitExp: Expression, plan: LogicalPlan): LogicalPlan = {
(limitExp, plan.maxRows) match {
case (IntegerLiteral(maxRow), Some(IntegerLiteral(childMaxRows))) if maxRow < childMaxRows =>
Limit(limitExp, plan)
case (_, None) =>
Limit(limitExp, plan)
case _ => plan
}
}

def apply(plan: LogicalPlan): LogicalPlan = plan transform {

// Adding extra Limit below UNION ALL iff both left and right childs are not Limit or
// do not have Limit descendants whose maxRow is larger. This heuristic is valid assuming
// there does not exist any Limit push-down rule that is unable to infer the value of maxRows.
// Note, right now, Union means UNION ALL, which does not de-duplicate rows. So, it is
// safe to pushdown Limit through it. Once we add UNION DISTINCT, we will not be able to
// pushdown Limit.
case Limit(exp, Union(left, right)) =>
Limit(exp, Union(buildUnionChild(exp, left), buildUnionChild(exp, right)))
}
}

/**
* Pushes operations down into a Sample.
*/
Expand All @@ -97,8 +131,8 @@ object SamplePushDown extends Rule[LogicalPlan] {
* Operations that are safe to pushdown are listed as follows.
* Union:
* Right now, Union means UNION ALL, which does not de-duplicate rows. So, it is
* safe to pushdown Filters and Projections through it. Once we add UNION DISTINCT,
* we will not be able to pushdown Projections.
* safe to pushdown Filters, Projections and Limits through it. Once we add UNION DISTINCT,
* we will not be able to pushdown Projections and Limits.
*
* Intersect:
* It is not safe to pushdown Projections through it because we need to get the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,14 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
Statistics(sizeInBytes = children.map(_.statistics.sizeInBytes).product)
}

/**
* Returns the limited number of rows to be returned.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Specify that any operator that a Limit can be pushed passed should override this function.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And, thus, we should fix Project too.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. Thanks!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually we will push down Project through Limit in ColumnPruning.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, we need to override the function in both directions. Let me update the comments. Thanks!

The value of maxRows is used for the Project's parent or ancestor nodes that could be another Limit.

*
* Any operator that a Limit can be pushed passed should override this function. (e.g., Union)
* Any operator that can push through a Limit should override this function. (e.g., Project)
*/
def maxRows: Option[Expression] = None

/**
* Returns true if this expression and all its children have been resolved to a specific schema
* and false if it still contains any unresolved placeholders. Implementations of LogicalPlan
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import scala.collection.mutable.ArrayBuffer
case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extends UnaryNode {
override def output: Seq[Attribute] = projectList.map(_.toAttribute)

override def maxRows: Option[Expression] = child.maxRows

override lazy val resolved: Boolean = {
val hasSpecialExpressions = projectList.exists ( _.collect {
case agg: AggregateExpression => agg
Expand Down Expand Up @@ -109,6 +111,9 @@ private[sql] object SetOperation {

case class Union(left: LogicalPlan, right: LogicalPlan) extends SetOperation(left, right) {

override def maxRows: Option[Expression] =
for (leftMax <- left.maxRows; rightMax <- right.maxRows) yield Add(leftMax, rightMax)

override def statistics: Statistics = {
val sizeInBytes = left.statistics.sizeInBytes + right.statistics.sizeInBytes
Statistics(sizeInBytes = sizeInBytes)
Expand Down Expand Up @@ -451,6 +456,8 @@ case class Pivot(
case class Limit(limitExpr: Expression, child: LogicalPlan) extends UnaryNode {
override def output: Seq[Attribute] = child.output

override def maxRows: Option[Expression] = Option(limitExpr)

override lazy val statistics: Statistics = {
val limit = limitExpr.eval().asInstanceOf[Int]
val sizeInBytes = (limit: Long) * output.map(a => a.dataType.defaultSize).sum
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.optimizer

import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.dsl.expressions._

class PushdownLimitsSuite extends PlanTest {
object Optimize extends RuleExecutor[LogicalPlan] {
val batches =
Batch("Subqueries", Once,
EliminateSubQueries) ::
Batch("Push Down Limit", Once,
PushDownLimit,
CombineLimits,
ConstantFolding,
BooleanSimplification) :: Nil
}

val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
val testRelation2 = LocalRelation('d.int, 'e.int, 'f.int)

test("Union: limit to each side") {
val unionQuery = Union(testRelation, testRelation2).limit(1)
val unionOptimized = Optimize.execute(unionQuery.analyze)
val unionCorrectAnswer =
Limit(1, Union(testRelation.limit(1), testRelation2.limit(1))).analyze
comparePlans(unionOptimized, unionCorrectAnswer)
}

test("Union: limit to each side with the new limit number") {
val testLimitUnion = Union(testRelation, testRelation2.limit(3))
val unionQuery = testLimitUnion.limit(1)
val unionOptimized = Optimize.execute(unionQuery.analyze)
val unionCorrectAnswer =
Limit(1, Union(testRelation.limit(1), testRelation2.limit(1))).analyze
comparePlans(unionOptimized, unionCorrectAnswer)
}

test("Union: no limit to both sides if children having smaller limit values") {
val testLimitUnion = Union(testRelation.limit(1), testRelation2.select('d).limit(1))
val unionQuery = testLimitUnion.limit(2)
val unionOptimized = Optimize.execute(unionQuery.analyze)
val unionCorrectAnswer =
Limit(2, Union(testRelation.limit(1), testRelation2.select('d).limit(1))).analyze
comparePlans(unionOptimized, unionCorrectAnswer)
}

test("Union: limit to each sides if children having larger limit values") {
val testLimitUnion = Union(testRelation.limit(3), testRelation2.select('d).limit(4))
val unionQuery = testLimitUnion.limit(2)
val unionOptimized = Optimize.execute(unionQuery.analyze)
val unionCorrectAnswer =
Limit(2, Union(testRelation.limit(2), testRelation2.select('d).limit(2))).analyze
comparePlans(unionOptimized, unionCorrectAnswer)
}
}