From 18af774ee3fb15fb3694e3454640dc145d908d5b Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Mon, 30 Nov 2015 16:27:48 -0800 Subject: [PATCH 01/10] reorder inner joins --- .../sql/catalyst/optimizer/Optimizer.scala | 51 +++++++++++++++++-- .../optimizer/FilterPushdownSuite.scala | 21 ++++++++ 2 files changed, 67 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 06d14fcf8b9c..4ef613e1f0a1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -18,14 +18,11 @@ package org.apache.spark.sql.catalyst.optimizer import scala.collection.immutable.HashSet + import org.apache.spark.sql.catalyst.analysis.{CleanupAliases, EliminateSubQueries} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ -import org.apache.spark.sql.catalyst.plans.Inner -import org.apache.spark.sql.catalyst.plans.FullOuter -import org.apache.spark.sql.catalyst.plans.LeftOuter -import org.apache.spark.sql.catalyst.plans.RightOuter -import org.apache.spark.sql.catalyst.plans.LeftSemi +import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner, LeftOuter, LeftSemi, RightOuter} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ import org.apache.spark.sql.types._ @@ -44,6 +41,7 @@ object DefaultOptimizer extends Optimizer { // Operator push down SetOperationPushDown, SamplePushDown, + ReorderInnerJoin, PushPredicateThroughJoin, PushPredicateThroughProject, PushPredicateThroughGenerate, @@ -711,6 +709,49 @@ object PushPredicateThroughAggregate extends Rule[LogicalPlan] with PredicateHel } } +/** + * Reorder the inner joins so that the bottom ones have at least one condition. 
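+ *
+ * For example, given x.join(y).join(z).where(("x.b".attr === "z.b".attr) &&
+ * ("y.d".attr === "z.a".attr)) there is no condition connecting x and y, so this rule
+ * joins x with z first and only then joins y, so that no join is left without a condition
+ * (see the test added to FilterPushdownSuite below).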
+ */
+object ReorderInnerJoin extends Rule[LogicalPlan] with PredicateHelper {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+
+    // This will only be matched once, because the Filter will be pushed down into the join by
+    // PushPredicateThroughJoin
+    case f @ Filter(filterCondition, j @ Join(left, right, Inner, None)) =>
+
+      // flatten all inner joins, which are next to each other and have no condition
+      def flattenJoin(plan: LogicalPlan): (LogicalPlan, Seq[LogicalPlan]) = plan match {
+        case Join(left, right, Inner, None) =>
+          val (root, joins) = flattenJoin(left)
+          (root, joins ++ Seq(right))
+        case _ => (plan, Seq())
+      }
+
+      val allConditions = splitConjunctivePredicates(filterCondition)
+      var (joined, toJoins) = flattenJoin(j)
+      // filter out the conditions that could be pushed down to `joined`
+      var otherConditions = allConditions.filterNot { cond =>
+        cond.references.subsetOf(joined.outputSet)
+      }
+      while (toJoins.nonEmpty && otherConditions.nonEmpty) {
+        // find out the first join that has at least one condition for it
+        val conditionalJoin = toJoins.find { plan =>
+          val refs = joined.outputSet ++ plan.outputSet
+          otherConditions.exists(cond => cond.references.subsetOf(refs))
+        }
+        assert(conditionalJoin.isDefined)
+        val picked = conditionalJoin.get
+        joined = Join(joined, picked, Inner, None)
+        toJoins = toJoins.filterNot(_ eq picked)
+        otherConditions = allConditions.filterNot { cond =>
+          cond.references.subsetOf(joined.outputSet)
+        }
+      }
+      joined = (Seq(joined) ++ toJoins).reduceLeft(Join(_, _, Inner, None))
+      Filter(filterCondition, joined)
+  }
+}
+
 /**
  * Pushes down [[Filter]] operators where the `condition` can be
  * evaluated using only the attributes of the left or right side of a join.  Other
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index fba4c5ca77d6..37b2ac20f465 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -38,6 +38,7 @@ class FilterPushdownSuite extends PlanTest {
         CombineFilters,
         PushPredicateThroughProject,
         BooleanSimplification,
+        ReorderInnerJoin,
         PushPredicateThroughJoin,
         PushPredicateThroughGenerate,
         PushPredicateThroughAggregate,
@@ -548,6 +549,25 @@ class FilterPushdownSuite extends PlanTest {
     comparePlans(optimized, analysis.EliminateSubQueries(correctAnswer))
   }
 
+  test("joins: reorder inner joins") {
+    val x = testRelation.subquery('x)
+    val y = testRelation1.subquery('y)
+    val z = testRelation.subquery('z)
+
+    val originalQuery = {
+      x.join(y).join(z)
+        .where(("x.b".attr === "z.b".attr) && ("y.d".attr === "z.a".attr))
+    }
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+    val correctAnswer =
+      x.join(z, condition = Some("x.b".attr === "z.b".attr))
+        .join(y, condition = Some("y.d".attr === "z.a".attr))
+        .analyze
+
+    comparePlans(optimized, analysis.EliminateSubQueries(correctAnswer))
+  }
+
   val testRelationWithArrayType = LocalRelation('a.int, 'b.int, 'c_arr.array(IntegerType))
 
   test("generate: predicate referenced no generated column") {
@@ -750,4 +770,5 @@ class FilterPushdownSuite extends PlanTest {
 
     comparePlans(optimized, correctAnswer)
   }
+
 }

From 1a32fea642a905f88a463f0f0ad8c540cd32202b Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Tue, 1 Dec 2015 11:58:10 -0800
Subject: [PATCH 02/10] refactor

---
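Note: for input = x, joins = Seq(y, z), and conditions (x.b === z.b) :: (y.d === z.a) :: Nil,
reorder() picks z first (x.b === z.b only becomes evaluable once x and z are joined) and then y,
returning Join(Join(x, z), y); apply() then puts the original Filter back on top. This is the
same plan the old while loop produced, built recursively.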
 .../sql/catalyst/optimizer/Optimizer.scala         | 65 +++++++++----------
 .../sql/catalyst/planning/patterns.scala           | 33 +++++++++-
 2 files changed, 64 insertions(+), 34 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 4ef613e1f0a1..2ada0ef92bc4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -22,6 +22,7 @@ import scala.collection.immutable.HashSet
 import org.apache.spark.sql.catalyst.analysis.{CleanupAliases, EliminateSubQueries}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.planning.FilterAndInnerJoins
 import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner, LeftOuter, LeftSemi, RightOuter}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
@@ -711,44 +712,42 @@ object PushPredicateThroughAggregate extends Rule[LogicalPlan] with PredicateHel
 
 /**
  * Reorder the inner joins so that the bottom ones have at least one condition.
+ *
+ * TODO: support outer joins
  */
 object ReorderInnerJoin extends Rule[LogicalPlan] with PredicateHelper {
-  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-
-    // This will only be matched once, because the Filter will be pushed down into the join by
-    // PushPredicateThroughJoin
-    case f @ Filter(filterCondition, j @ Join(left, right, Inner, None)) =>
-
-      // flatten all inner joins, which are next to each other and have no condition
-      def flattenJoin(plan: LogicalPlan): (LogicalPlan, Seq[LogicalPlan]) = plan match {
-        case Join(left, right, Inner, None) =>
-          val (root, joins) = flattenJoin(left)
-          (root, joins ++ Seq(right))
-        case _ => (plan, Seq())
+  def reorder(
+      input: LogicalPlan,
+      joins: Seq[LogicalPlan],
+      conditions: Seq[Expression]): LogicalPlan = {
+    // filter out the conditions that could be pushed down to `input`
+    val otherConditions = conditions.filterNot { cond =>
+      cond.references.subsetOf(input.outputSet)
+    }
+    if (joins.isEmpty) {
+      input
+    } else if (otherConditions.isEmpty) {
+      // no condition for these joins, so keep them in their original order
+      (Seq(input) ++ joins).reduceLeft(Join(_, _, Inner, None))
+    } else {
+      // find out the first join that has at least one condition
+      val conditionalJoin = joins.find { plan =>
+        val refs = input.outputSet ++ plan.outputSet
+        otherConditions.exists(cond => cond.references.subsetOf(refs))
       }
+      assert(conditionalJoin.isDefined)
+      val picked = conditionalJoin.get
+      val joined = Join(input, picked, Inner, None)
+      reorder(joined, joins.filterNot(_ eq picked), otherConditions)
+    }
+  }
 
-      val allConditions = splitConjunctivePredicates(filterCondition)
-      var (joined, toJoins) = flattenJoin(j)
-      // filter out the conditions that could be pushed down to `joined`
-      var otherConditions = allConditions.filterNot { cond =>
-        cond.references.subsetOf(joined.outputSet)
-      }
-      while (toJoins.nonEmpty && otherConditions.nonEmpty) {
-        // find out the first join that has at least one condition for it
-        val conditionalJoin = toJoins.find { plan =>
-          val refs = joined.outputSet ++ plan.outputSet
-          otherConditions.exists(cond => cond.references.subsetOf(refs))
-        }
-        assert(conditionalJoin.isDefined)
-        val picked = conditionalJoin.get
-        joined = Join(joined, picked, Inner, None)
-        toJoins = toJoins.filterNot(_ eq picked)
-        otherConditions = allConditions.filterNot { cond =>
-          cond.references.subsetOf(joined.outputSet)
-        }
-      }
-      joined = (Seq(joined) ++ toJoins).reduceLeft(Join(_, _, Inner, None))
-      Filter(filterCondition, joined)
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case FilterAndInnerJoins(input, joins, filterConditions) if joins.size > 1 =>
+      assert(filterConditions.nonEmpty)
+      val joined = reorder(input, joins, filterConditions)
+      Filter(filterConditions.reduceLeft(And), joined)
   }
 }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
index 6f4f11406d7c..932a6f525457 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
@@ -21,7 +21,6 @@ import org.apache.spark.Logging
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.trees.TreeNodeRef
 
 /**
  * A pattern that matches any number of project or filter operations on top of another relational
@@ -132,6 +131,38 @@ object ExtractEquiJoinKeys extends Logging with PredicateHelper {
   }
 }
 
+/**
+ * A pattern that collects the filter and inner joins.
+ *
+ *          Filter
+ *            |
+ *        inner Join
+ *          /    \            ---->      (input, Seq(plan1, plan2), filters)
+ *    inner join  plan2
+ *      /    \
+ *  input  plan1
+ */
+object FilterAndInnerJoins extends PredicateHelper {
+  def unapply(plan: LogicalPlan): Option[(LogicalPlan, Seq[LogicalPlan], Seq[Expression])] =
+    plan match {
+      case f @ Filter(filterCondition, j @ Join(left, right, Inner, None)) =>
+
+        // flatten all inner joins, which are next to each other and have no condition
+        def flattenJoin(plan: LogicalPlan): (LogicalPlan, Seq[LogicalPlan]) = plan match {
+          case Join(left, right, Inner, None) =>
+            val (input, joins) = flattenJoin(left)
+            (input, joins ++ Seq(right))
+          case _ => (plan, Seq())
+        }
+
+        val allConditions = splitConjunctivePredicates(filterCondition)
+        val (input, joins) = flattenJoin(j)
+        Some((input, joins, allConditions))
+
+      case _ => None
+    }
+}
+
 /**
  * A pattern that collects all adjacent unions and returns their children as a Seq.
*/ From 3a62c3264e4b39fb9c5b0a3fb772f7e0c117024a Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Tue, 1 Dec 2015 13:14:21 -0800 Subject: [PATCH 03/10] improve comments --- .../spark/sql/catalyst/optimizer/Optimizer.scala | 12 +++++++----- .../sql/catalyst/optimizer/FilterPushdownSuite.scala | 2 +- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 2ada0ef92bc4..593eff9e67bb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -42,7 +42,7 @@ object DefaultOptimizer extends Optimizer { // Operator push down SetOperationPushDown, SamplePushDown, - ReorderInnerJoin, + ReorderJoin, PushPredicateThroughJoin, PushPredicateThroughProject, PushPredicateThroughGenerate, @@ -711,12 +711,13 @@ object PushPredicateThroughAggregate extends Rule[LogicalPlan] with PredicateHel } /** - * Reorder the inner joins so that the bottom ones have at least one condition. - * - * TODO: support outer joins + * Reorder the joins so that the bottom ones have at least one condition. */ -object ReorderInnerJoin extends Rule[LogicalPlan] with PredicateHelper { +object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper { + /** + * Reorder the joins so that the bottom ones have at least one condition. + */ def reorder( input: LogicalPlan, joins: Seq[LogicalPlan], @@ -744,6 +745,7 @@ object ReorderInnerJoin extends Rule[LogicalPlan] with PredicateHelper { } def apply(plan: LogicalPlan): LogicalPlan = plan transform { + // TODO: support outer join case FilterAndInnerJoins(input, joins, filterConditions) if joins.size > 1 => assert(filterConditions.nonEmpty) val joined = reorder(input, joins, filterConditions) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala index 37b2ac20f465..2781f26454d5 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala @@ -38,7 +38,7 @@ class FilterPushdownSuite extends PlanTest { CombineFilters, PushPredicateThroughProject, BooleanSimplification, - ReorderInnerJoin, + ReorderJoin, PushPredicateThroughJoin, PushPredicateThroughGenerate, PushPredicateThroughAggregate, From babe395b1a4684b4cbd39148515bd1332ef350bf Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Tue, 1 Dec 2015 15:06:09 -0800 Subject: [PATCH 04/10] make it more general --- .../sql/catalyst/optimizer/Optimizer.scala | 57 ++++++++++--------- .../sql/catalyst/planning/patterns.scala | 40 +++++++------ 2 files changed, 53 insertions(+), 44 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 593eff9e67bb..aa154ad30900 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -22,7 +22,7 @@ import scala.collection.immutable.HashSet import org.apache.spark.sql.catalyst.analysis.{CleanupAliases, EliminateSubQueries} import 
org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
-import org.apache.spark.sql.catalyst.planning.FilterAndInnerJoins
+import org.apache.spark.sql.catalyst.planning.ExtractFiltersAndInnerJoins
 import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner, LeftOuter, LeftSemi, RightOuter}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
@@ -716,40 +716,43 @@ object PushPredicateThroughAggregate extends Rule[LogicalPlan] with PredicateHel
 object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper {
 
   /**
-   * Reorder the joins so that the bottom ones have at least one condition.
+   * Join a list of plans together and push down the conditions into them.
+   *
+   * The joined plans are picked from left to right, preferring those that have at least one
+   * join condition.
+   *
+   * @param input a list of LogicalPlans to join.
+   * @param conditions a list of join conditions.
    */
-  def reorder(
-      input: LogicalPlan,
-      joins: Seq[LogicalPlan],
-      conditions: Seq[Expression]): LogicalPlan = {
-    // filter out the conditions that could be pushed down to `input`
-    val otherConditions = conditions.filterNot { cond =>
-      cond.references.subsetOf(input.outputSet)
-    }
-    if (joins.isEmpty) {
-      input
-    } else if (otherConditions.isEmpty) {
-      // no condition for these joins, so keep them in their original order
-      (Seq(input) ++ joins).reduceLeft(Join(_, _, Inner, None))
+  def createOrderedJoin(input: Seq[LogicalPlan], conditions: Seq[Expression]): LogicalPlan = {
+    assert(input.size >= 2)
+    if (input.size == 2) {
+      Join(input(0), input(1), Inner, conditions.reduceLeftOption(And))
     } else {
-      // find out the first join that has at least one condition
-      val conditionalJoin = joins.find { plan =>
-        val refs = input.outputSet ++ plan.outputSet
-        otherConditions.exists(cond => cond.references.subsetOf(refs))
+      val left = input.head
+      val rest = input.drop(1)
+      // find out the first join that has at least one join condition
+      val conditionalJoin = rest.find { plan =>
+        val refs = left.outputSet ++ plan.outputSet
+        conditions.filterNot(_.references.subsetOf(left.outputSet))
+          .filterNot(_.references.subsetOf(plan.outputSet))
+          .exists(cond => cond.references.subsetOf(refs))
       }
-      assert(conditionalJoin.isDefined)
-      val picked = conditionalJoin.get
-      val joined = Join(input, picked, Inner, None)
-      reorder(joined, joins.filterNot(_ eq picked), otherConditions)
+      // pick the next plan if no join condition is left
+      val right = conditionalJoin.getOrElse(rest.head)
+
+      val joinedRefs = left.outputSet ++ right.outputSet
+      val (joinConditions, others) = conditions.partition(_.references.subsetOf(joinedRefs))
+      val joined = Join(left, right, Inner, joinConditions.reduceLeftOption(And))
+
+      createOrderedJoin(Seq(joined) ++ rest.filterNot(_ eq right), others)
     }
   }
 
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     // TODO: support outer join
-    case FilterAndInnerJoins(input, joins, filterConditions) if joins.size > 1 =>
-      assert(filterConditions.nonEmpty)
-      val joined = reorder(input, joins, filterConditions)
-      Filter(filterConditions.reduceLeft(And), joined)
+    case j @ ExtractFiltersAndInnerJoins(input, conditions) if input.size > 2 =>
+      assert(conditions.nonEmpty)
+      createOrderedJoin(input, conditions)
   }
 }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
index 932a6f525457..0af78075171c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
@@ -137,28 +137,34 @@ object ExtractEquiJoinKeys extends Logging with PredicateHelper {
  *          Filter
  *            |
  *        inner Join
- *          /    \            ---->      (input, Seq(plan1, plan2), filters)
- *    inner join  plan2
+ *          /    \            ---->      (Seq(plan0, plan1, plan2), conditions)
+ *      Filter   plan2
+ *        |
+ *    inner join
  *      /    \
- *  input  plan1
+ *  plan0  plan1
  */
-object FilterAndInnerJoins extends PredicateHelper {
-  def unapply(plan: LogicalPlan): Option[(LogicalPlan, Seq[LogicalPlan], Seq[Expression])] =
-    plan match {
-      case f @ Filter(filterCondition, j @ Join(left, right, Inner, None)) =>
-
-        // flatten all inner joins, which are next to each other and have no condition
-        def flattenJoin(plan: LogicalPlan): (LogicalPlan, Seq[LogicalPlan]) = plan match {
-          case Join(left, right, Inner, None) =>
-            val (input, joins) = flattenJoin(left)
-            (input, joins ++ Seq(right))
-          case _ => (plan, Seq())
-        }
+object ExtractFiltersAndInnerJoins extends PredicateHelper {
+
+  // flatten all inner joins, which are next to each other
+  def flattenJoin(plan: LogicalPlan): (Seq[LogicalPlan], Seq[Expression]) = plan match {
+    case Join(left, right, Inner, cond) =>
+      // only flatten the joins on the left side, because that is the only plan shape we generate
+      val (plans, conditions) = flattenJoin(left)
+      (plans ++ Seq(right), conditions ++ cond.toSeq)
 
-        val allConditions = splitConjunctivePredicates(filterCondition)
-        val (input, joins) = flattenJoin(j)
-        Some((input, joins, allConditions))
+    case Filter(filterCondition, j @ Join(left, right, Inner, joinCondition)) =>
+      val (plans, conditions) = flattenJoin(j)
+      (plans, conditions ++ splitConjunctivePredicates(filterCondition))
 
+    case _ => (Seq(plan), Seq())
+  }
 
+  def unapply(plan: LogicalPlan): Option[(Seq[LogicalPlan], Seq[Expression])] = plan match {
+    case f @ Filter(filterCondition, j @ Join(_, _, Inner, _)) =>
+      Some(flattenJoin(f))
+    case j @ Join(_, _, Inner, _) =>
+      Some(flattenJoin(j))
     case _ => None
   }
 }

From 70508f0bcbc8fef53485c28c6c8b1242aa046b3b Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Tue, 1 Dec 2015 15:16:29 -0800
Subject: [PATCH 05/10] use canEvaluate

---
 .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index aa154ad30900..30ea07f527c9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -733,9 +733,8 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper {
       // find out the first join that has at least one join condition
       val conditionalJoin = rest.find { plan =>
         val refs = left.outputSet ++ plan.outputSet
-        conditions.filterNot(_.references.subsetOf(left.outputSet))
-          .filterNot(_.references.subsetOf(plan.outputSet))
-          .exists(cond => cond.references.subsetOf(refs))
+        conditions.filterNot(canEvaluate(_, left)).filterNot(canEvaluate(_, plan))
+          .exists(_.references.subsetOf(refs))
       }
       // pick the next plan if no join condition is left
       val right = conditionalJoin.getOrElse(rest.head)

From 21a81b5ad4a616300fca8d55b86659c0a6634630 Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Tue, 1 Dec 2015 15:35:28 -0800
Subject: [PATCH 06/10] address
comments --- .../optimizer/FilterPushdownSuite.scala | 20 ------ .../catalyst/optimizer/JoinOrderSuite.scala | 69 +++++++++++++++++++ 2 files changed, 69 insertions(+), 20 deletions(-) create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOrderSuite.scala diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala index 2781f26454d5..d981a53bb220 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala @@ -38,7 +38,6 @@ class FilterPushdownSuite extends PlanTest { CombineFilters, PushPredicateThroughProject, BooleanSimplification, - ReorderJoin, PushPredicateThroughJoin, PushPredicateThroughGenerate, PushPredicateThroughAggregate, @@ -549,25 +548,6 @@ class FilterPushdownSuite extends PlanTest { comparePlans(optimized, analysis.EliminateSubQueries(correctAnswer)) } - test("joins: reorder inner joins") { - val x = testRelation.subquery('x) - val y = testRelation1.subquery('y) - val z = testRelation.subquery('z) - - val originalQuery = { - x.join(y).join(z) - .where(("x.b".attr === "z.b".attr) && ("y.d".attr === "z.a".attr)) - } - - val optimized = Optimize.execute(originalQuery.analyze) - val correctAnswer = - x.join(z, condition = Some("x.b".attr === "z.b".attr)) - .join(y, condition = Some("y.d".attr === "z.a".attr)) - .analyze - - comparePlans(optimized, analysis.EliminateSubQueries(correctAnswer)) - } - val testRelationWithArrayType = LocalRelation('a.int, 'b.int, 'c_arr.array(IntegerType)) test("generate: predicate referenced no generated column") { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOrderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOrderSuite.scala new file mode 100644 index 000000000000..8336dc7f4c6a --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOrderSuite.scala @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.analysis
+import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.plans.PlanTest
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+
+
+class JoinOrderSuite extends PlanTest {
+
+  object Optimize extends RuleExecutor[LogicalPlan] {
+    val batches =
+      Batch("Subqueries", Once,
+        EliminateSubQueries) ::
+       Batch("Filter Pushdown", Once,
+         CombineFilters,
+         PushPredicateThroughProject,
+         BooleanSimplification,
+         ReorderJoin,
+         PushPredicateThroughJoin,
+         PushPredicateThroughGenerate,
+         PushPredicateThroughAggregate,
+         ColumnPruning,
+         ProjectCollapsing) :: Nil
+
+  }
+
+  val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
+  val testRelation1 = LocalRelation('d.int)
+
+  test("joins: reorder inner joins") {
+    val x = testRelation.subquery('x)
+    val y = testRelation1.subquery('y)
+    val z = testRelation.subquery('z)
+
+    val originalQuery = {
+      x.join(y).join(z)
+        .where(("x.b".attr === "z.b".attr) && ("y.d".attr === "z.a".attr))
+    }
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+    val correctAnswer =
+      x.join(z, condition = Some("x.b".attr === "z.b".attr))
+        .join(y, condition = Some("y.d".attr === "z.a".attr))
+        .analyze
+
+    comparePlans(optimized, analysis.EliminateSubQueries(correctAnswer))
+  }
+}

From 5469caf0c81a4e94e49032ae00b192224f1c256b Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Tue, 1 Dec 2015 16:08:04 -0800
Subject: [PATCH 07/10] add more tests

---
 .../sql/catalyst/optimizer/Optimizer.scala    |  4 +--
 .../catalyst/optimizer/JoinOrderSuite.scala   | 28 ++++++++++++++++++-
 2 files changed, 29 insertions(+), 3 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 30ea07f527c9..9f3896b5b760 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -728,8 +728,7 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper {
     if (input.size == 2) {
       Join(input(0), input(1), Inner, conditions.reduceLeftOption(And))
     } else {
-      val left = input.head
-      val rest = input.drop(1)
+      val left :: rest = input.toList
       // find out the first join that has at least one join condition
       val conditionalJoin = rest.find { plan =>
@@ -743,6 +742,7 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper {
       val (joinConditions, others) = conditions.partition(_.references.subsetOf(joinedRefs))
       val joined = Join(left, right, Inner, joinConditions.reduceLeftOption(And))
 
+      // remove the picked plan by reference; the input should not contain the same plan twice
       createOrderedJoin(Seq(joined) ++ rest.filterNot(_ eq right), others)
     }
   }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOrderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOrderSuite.scala
index 8336dc7f4c6a..08b0c4d9cf25 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOrderSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOrderSuite.scala
@@ -21,6 +21,8 @@ import
org.apache.spark.sql.catalyst.analysis import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.planning.ExtractFiltersAndInnerJoins import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} import org.apache.spark.sql.catalyst.rules.RuleExecutor @@ -48,7 +50,31 @@ class JoinOrderSuite extends PlanTest { val testRelation = LocalRelation('a.int, 'b.int, 'c.int) val testRelation1 = LocalRelation('d.int) - test("joins: reorder inner joins") { + test("extract filters and joins") { + val x = testRelation.subquery('x) + val y = testRelation1.subquery('y) + val z = testRelation.subquery('z) + + def testExtract(plan: LogicalPlan, expected: Option[(Seq[LogicalPlan], Seq[Expression])]) { + assert(ExtractFiltersAndInnerJoins.unapply(plan) === expected) + } + + testExtract(x, None) + testExtract(x.where("x.b".attr === 1), None) + testExtract(x.join(y), Some(Seq(x, y), Seq())) + testExtract(x.join(y, condition = Some("x.b".attr === "y.d".attr)), + Some(Seq(x, y), Seq("x.b".attr === "y.d".attr))) + testExtract(x.join(y).where("x.b".attr === "y.d".attr), + Some(Seq(x, y), Seq("x.b".attr === "y.d".attr))) + testExtract(x.join(y).join(z), Some(Seq(x, y, z), Seq())) + testExtract(x.join(y).where("x.b".attr === "y.d".attr).join(z), + Some(Seq(x, y, z), Seq("x.b".attr === "y.d".attr))) + testExtract(x.join(y).join(x.join(z)), Some(Seq(x, y, x.join(z)), Seq())) + testExtract(x.join(y).join(x.join(z)).where("x.b".attr === "y.d".attr), + Some(Seq(x, y, x.join(z)), Seq("x.b".attr === "y.d".attr))) + } + + test("reorder inner joins") { val x = testRelation.subquery('x) val y = testRelation1.subquery('y) val z = testRelation.subquery('z) From 556a3828e4bc2f7c6396b412f2e7105d7f62f16f Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Wed, 2 Dec 2015 09:57:30 -0800 Subject: [PATCH 08/10] fix empty condition --- .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 9f3896b5b760..5a389755cf37 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -749,8 +749,8 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper { def apply(plan: LogicalPlan): LogicalPlan = plan transform { // TODO: support outer join - case j @ ExtractFiltersAndInnerJoins(input, conditions) if input.size > 2 => - assert(conditions.nonEmpty) + case j @ ExtractFiltersAndInnerJoins(input, conditions) + if input.size > 2 && conditions.nonEmpty => createOrderedJoin(input, conditions) } } From eb31c37ef3411a73899be7005b6115ebfa8c3571 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Wed, 2 Dec 2015 16:58:51 -0800 Subject: [PATCH 09/10] update comments --- .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 5 ++++- .../org/apache/spark/sql/catalyst/planning/patterns.scala | 3 ++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 5a389755cf37..517954c8ad31 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -711,7 +711,10 @@ object PushPredicateThroughAggregate extends Rule[LogicalPlan] with PredicateHel
 }
 
 /**
- * Reorder the joins so that the bottom ones have at least one condition.
+ * Reorder the joins and push all the conditions into the joins, so that the bottom joins have
+ * at least one condition.
+ *
+ * The order of the joins will not be changed if all of them already have at least one condition.
  */
 object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper {
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
index 0af78075171c..cd3f15cbe107 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
@@ -143,13 +143,14 @@ object ExtractEquiJoinKeys extends Logging with PredicateHelper {
  *    inner join
  *      /    \
  *  plan0  plan1
+ *
+ * Note: This pattern currently only works for left-deep trees.
  */
 object ExtractFiltersAndInnerJoins extends PredicateHelper {
 
   // flatten all inner joins, which are next to each other
   def flattenJoin(plan: LogicalPlan): (Seq[LogicalPlan], Seq[Expression]) = plan match {
     case Join(left, right, Inner, cond) =>
-      // only flatten the joins on the left side, because that is the only plan shape we generate
       val (plans, conditions) = flattenJoin(left)
       (plans ++ Seq(right), conditions ++ cond.toSeq)
 
From ddffd8f1631b0949b0a22f9e64e64975ed77aed7 Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Mon, 7 Dec 2015 10:33:19 -0800
Subject: [PATCH 10/10] fix style

---
 .../sql/catalyst/optimizer/Optimizer.scala    |  1 -
 .../optimizer/FilterPushdownSuite.scala       |  1 -
 .../catalyst/optimizer/JoinOrderSuite.scala   | 20 +++++++++----------
 3 files changed, 10 insertions(+), 12 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 517954c8ad31..f6088695a927 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -751,7 +751,6 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper {
   }
 
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-    // TODO: support outer join
     case j @ ExtractFiltersAndInnerJoins(input, conditions)
         if input.size > 2 && conditions.nonEmpty =>
       createOrderedJoin(input, conditions)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index d981a53bb220..fba4c5ca77d6 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -750,5 +750,4 @@ class FilterPushdownSuite extends PlanTest {
 
     comparePlans(optimized, correctAnswer)
   }
-
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOrderSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOrderSuite.scala index 08b0c4d9cf25..9b1e16c72764 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOrderSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOrderSuite.scala @@ -34,16 +34,16 @@ class JoinOrderSuite extends PlanTest { val batches = Batch("Subqueries", Once, EliminateSubQueries) :: - Batch("Filter Pushdown", Once, - CombineFilters, - PushPredicateThroughProject, - BooleanSimplification, - ReorderJoin, - PushPredicateThroughJoin, - PushPredicateThroughGenerate, - PushPredicateThroughAggregate, - ColumnPruning, - ProjectCollapsing) :: Nil + Batch("Filter Pushdown", Once, + CombineFilters, + PushPredicateThroughProject, + BooleanSimplification, + ReorderJoin, + PushPredicateThroughJoin, + PushPredicateThroughGenerate, + PushPredicateThroughAggregate, + ColumnPruning, + ProjectCollapsing) :: Nil }
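
The end state of the series is easiest to check in isolation. Below is a self-contained Scala
sketch (not part of the patches) that models the greedy strategy of createOrderedJoin: a plan is
reduced to a name plus its output attributes, and a condition to the set of attributes it
references. JoinReorderSketch, Plan, Cond and order are illustrative stand-ins, not Spark API.

// Minimal model of the greedy join-reordering strategy in createOrderedJoin.
object JoinReorderSketch {
  final case class Plan(name: String, output: Set[String])
  final case class Cond(refs: Set[String])

  def order(input: Seq[Plan], conditions: Seq[Cond]): Plan = {
    require(input.size >= 2)
    if (input.size == 2) {
      Plan(s"(${input(0).name} JOIN ${input(1).name})", input(0).output ++ input(1).output)
    } else {
      val left = input.head
      val rest = input.tail
      // find the first plan that, joined with `left`, makes some condition evaluable
      val pick = rest.find { p =>
        val refs = left.output ++ p.output
        conditions.filterNot(_.refs.subsetOf(left.output))
          .filterNot(_.refs.subsetOf(p.output))
          .exists(_.refs.subsetOf(refs))
      }.getOrElse(rest.head) // fall back to the original order
      val joined = Plan(s"(${left.name} JOIN ${pick.name})", left.output ++ pick.output)
      // keep only the conditions that the join built so far cannot evaluate yet
      val others = conditions.filterNot(_.refs.subsetOf(joined.output))
      order(joined +: rest.filterNot(_ eq pick), others)
    }
  }

  def main(args: Array[String]): Unit = {
    val x = Plan("x", Set("x.a", "x.b", "x.c"))
    val y = Plan("y", Set("y.d"))
    val z = Plan("z", Set("z.a", "z.b", "z.c"))
    // x.b === z.b && y.d === z.a, as in the "reorder inner joins" test
    val conds = Seq(Cond(Set("x.b", "z.b")), Cond(Set("y.d", "z.a")))
    println(order(Seq(x, y, z), conds).name)
  }
}

Run as a plain Scala program, this prints ((x JOIN z) JOIN y) -- the same join order the
"reorder inner joins" test above expects from the real rule.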