From b47dce734fa826df4b9124b5a938a9bbabfdbabd Mon Sep 17 00:00:00 2001 From: Angers Date: Wed, 26 Jun 2019 17:08:48 +0800 Subject: [PATCH 01/12] [SPARK-28169] extract predicate expression deeply --- .../spark/sql/hive/HiveStrategies.scala | 62 ++++++++++++++++++- 1 file changed, 59 insertions(+), 3 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 8a5ab188a949..15d04bc3b28f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -237,21 +237,77 @@ private[hive] trait HiveStrategies { * applied. */ object HiveTableScans extends Strategy { + + def constructBinaryOperators(left:Expression, right: Expression, op_type: String): Expression ={ + (left == null, right == null) match { + case (true, true) => null + case (true, false) => right + case (false, true) => left + case (false, false) => + if(op_type == "or") + Or(left, right) + else if (op_type == "and") + And(left, right) + else + null + } + } + + def resolveAndExpression(expr: Expression, partitionKeyIds: AttributeSet): Expression = { + if(expr.isInstanceOf[And]){ + val and = expr.asInstanceOf[And] + constructBinaryOperators(resolvePredicatesExpression(and.left, partitionKeyIds), resolvePredicatesExpression(and.right, partitionKeyIds), "and") + }else{ + resolvePredicatesExpression(expr, partitionKeyIds) + } + } + + def resolveOrExpression(or: Or, partitionKeyIds: AttributeSet): Expression = { + (or.left.isInstanceOf[Or],or.right.isInstanceOf[Or]) match { + case (true, true) => constructBinaryOperators(resolveOrExpression(or.left.asInstanceOf[Or], partitionKeyIds) , resolveOrExpression(or.right.asInstanceOf[Or], partitionKeyIds), "or") + case (true, false) => constructBinaryOperators(resolveOrExpression(or.left.asInstanceOf[Or], partitionKeyIds) , resolveAndExpression(or.right, partitionKeyIds), "or") + case (false, true) => constructBinaryOperators(resolveAndExpression(or.left, partitionKeyIds) , resolveOrExpression(or.right.asInstanceOf[Or], partitionKeyIds), "or") + case (false, false) => constructBinaryOperators(resolveAndExpression(or.left, partitionKeyIds) , resolveAndExpression(or.right, partitionKeyIds), "or") + } + } + + def resolvePredicatesExpression(expr: Expression, partitionKeyIds: AttributeSet): Expression ={ + if(!expr.references.isEmpty && expr.references.subsetOf(partitionKeyIds)) + expr + else + null + } + + def extractPushDownPredicate(predicates: Seq[Expression], partitionKeyIds: AttributeSet): Seq[Expression] ={ + predicates.map(predicate => { + if(predicate.isInstanceOf[Or]){ + val or = predicate.asInstanceOf[Or] + resolveOrExpression(or, partitionKeyIds) + }else{ + resolvePredicatesExpression(predicate,partitionKeyIds) + } + }) + } + def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case PhysicalOperation(projectList, predicates, relation: HiveTableRelation) => // Filter out all predicates that only deal with partition keys, these are given to the // hive table scan operator to be used for partition pruning. 
val partitionKeyIds = AttributeSet(relation.partitionCols)
-        val (pruningPredicates, otherPredicates) = predicates.partition { predicate =>
+        val (_, otherPredicates) = predicates.partition { predicate => {
           !predicate.references.isEmpty &&
-          predicate.references.subsetOf(partitionKeyIds)
+            predicate.references.subsetOf(partitionKeyIds)
         }
+        }
+
+        val extractedPruningPredicates = extractPushDownPredicate(predicates, partitionKeyIds)
+          .filter(_ != null)
 
         pruneFilterProject(
           projectList,
           otherPredicates,
           identity[Seq[Expression]],
-          HiveTableScanExec(_, relation, pruningPredicates)(sparkSession)) :: Nil
+          HiveTableScanExec(_, relation, extractedPruningPredicates)(sparkSession)) :: Nil
 
       case _ => Nil
     }

From 5bc19d4906fbe6cca470285561c7420f7f9260e3 Mon Sep 17 00:00:00 2001
From: Angers
Date: Wed, 26 Jun 2019 21:59:43 +0800
Subject: [PATCH 02/12] Resolve And expressions deeply

---
 .../spark/sql/hive/HiveStrategies.scala       | 22 +++++++++----------
 1 file changed, 10 insertions(+), 12 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 15d04bc3b28f..773ef6842686 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -254,11 +254,11 @@ private[hive] trait HiveStrategies {
     }
 
     def resolveAndExpression(expr: Expression, partitionKeyIds: AttributeSet): Expression = {
-      if(expr.isInstanceOf[And]){
-        val and = expr.asInstanceOf[And]
-        constructBinaryOperators(resolvePredicatesExpression(and.left, partitionKeyIds), resolvePredicatesExpression(and.right, partitionKeyIds), "and")
-      }else{
-        resolvePredicatesExpression(expr, partitionKeyIds)
+      expr match {
+        case and: And =>
+          constructBinaryOperators(resolveAndExpression(and.left, partitionKeyIds), resolveAndExpression(and.right, partitionKeyIds), "and")
+        case _ =>
+          resolvePredicatesExpression(expr, partitionKeyIds)
       }
     }
 
@@ -279,14 +279,12 @@ private[hive] trait HiveStrategies {
     }
 
     def extractPushDownPredicate(predicates: Seq[Expression], partitionKeyIds: AttributeSet): Seq[Expression] ={
-      predicates.map(predicate => {
-        if(predicate.isInstanceOf[Or]){
-          val or = predicate.asInstanceOf[Or]
+      predicates.map {
+        case or: Or =>
           resolveOrExpression(or, partitionKeyIds)
-        }else{
-          resolvePredicatesExpression(predicate,partitionKeyIds)
-        }
-      })
+        case predicate =>
+          resolvePredicatesExpression(predicate, partitionKeyIds)
+      }
     }
 
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {

From 4705fc217bf93f6f15b3bd009dc650fbd8df4105 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E6=9C=B1=E5=A4=B7?=
Date: Fri, 28 Jun 2019 14:14:59 +0800
Subject: [PATCH 03/12] Fix scala style

---
 .../spark/sql/hive/HiveStrategies.scala       | 37 ++++++++++++++-----
 1 file changed, 27 insertions(+), 10 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 773ef6842686..a74933b27832 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -256,29 +256,46 @@ private[hive] trait HiveStrategies {
     def resolveAndExpression(expr: Expression, partitionKeyIds: AttributeSet): Expression = {
       expr match {
         case and: And =>
-          constructBinaryOperators(resolveAndExpression(and.left, partitionKeyIds), resolveAndExpression(and.right, partitionKeyIds), "and")
+          constructBinaryOperators(
+            resolveAndExpression(and.left, partitionKeyIds),
+            resolveAndExpression(and.right, partitionKeyIds),
+            "and")
         case _ =>
           resolvePredicatesExpression(expr, partitionKeyIds)
       }
     }
 
     def resolveOrExpression(or: Or, partitionKeyIds: AttributeSet): Expression = {
-      (or.left.isInstanceOf[Or],or.right.isInstanceOf[Or]) match {
-        case (true, true) => constructBinaryOperators(resolveOrExpression(or.left.asInstanceOf[Or], partitionKeyIds) , resolveOrExpression(or.right.asInstanceOf[Or], partitionKeyIds), "or")
-        case (true, false) => constructBinaryOperators(resolveOrExpression(or.left.asInstanceOf[Or], partitionKeyIds) , resolveAndExpression(or.right, partitionKeyIds), "or")
-        case (false, true) => constructBinaryOperators(resolveAndExpression(or.left, partitionKeyIds) , resolveOrExpression(or.right.asInstanceOf[Or], partitionKeyIds), "or")
-        case (false, false) => constructBinaryOperators(resolveAndExpression(or.left, partitionKeyIds) , resolveAndExpression(or.right, partitionKeyIds), "or")
+      (or.left.isInstanceOf[Or], or.right.isInstanceOf[Or]) match {
+        case (true, true) => constructBinaryOperators(
+          resolveOrExpression(or.left.asInstanceOf[Or], partitionKeyIds),
+          resolveOrExpression(or.right.asInstanceOf[Or], partitionKeyIds),
+          "or")
+        case (true, false) => constructBinaryOperators(
+          resolveOrExpression(or.left.asInstanceOf[Or], partitionKeyIds) ,
+          resolveAndExpression(or.right, partitionKeyIds),
+          "or")
+        case (false, true) => constructBinaryOperators(
+          resolveAndExpression(or.left, partitionKeyIds),
+          resolveOrExpression(or.right.asInstanceOf[Or],
+          partitionKeyIds), "or")
+        case (false, false) => constructBinaryOperators(
+          resolveAndExpression(or.left, partitionKeyIds),
+          resolveAndExpression(or.right, partitionKeyIds),
+          "or")
       }
     }
 
-    def resolvePredicatesExpression(expr: Expression, partitionKeyIds: AttributeSet): Expression ={
-      if(!expr.references.isEmpty && expr.references.subsetOf(partitionKeyIds))
+    def resolvePredicatesExpression(expr: Expression, partitionKeyIds: AttributeSet): Expression = {
+      if (!expr.references.isEmpty && expr.references.subsetOf(partitionKeyIds)) {
        expr
-      else
+      } else {
        null
+      }
     }
 
-    def extractPushDownPredicate(predicates: Seq[Expression], partitionKeyIds: AttributeSet): Seq[Expression] ={
+    def extractPushDownPredicate(predicates: Seq[Expression],
+        partitionKeyIds: AttributeSet): Seq[Expression] = {
       predicates.map {
         case or: Or =>
           resolveOrExpression(or, partitionKeyIds)

From b5e00a5ac64f5892da516fabed29d0b5ed3e2875 Mon Sep 17 00:00:00 2001
From: Angers
Date: Fri, 28 Jun 2019 16:13:51 +0800
Subject: [PATCH 04/12] Fix problem of escaped Or condition

Fix the case where A is a partition key:

  SELECT * FROM A WHERE A = 1 OR B = 2

In this case, we should ignore this condition.
---
 .../spark/sql/hive/HiveStrategies.scala       | 90 +++++++------------
 1 file changed, 33 insertions(+), 57 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index a74933b27832..c0ca3177c4df 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -238,54 +238,6 @@ private[hive] trait HiveStrategies {
    */
   object HiveTableScans extends Strategy {
 
-    def constructBinaryOperators(left:Expression, right: Expression, op_type: String): Expression ={
-      (left == null, right == null) match {
-        case (true, true) => null
-        case (true, false) => right
-        case 
(false, true) => left - case (false, false) => - if(op_type == "or") - Or(left, right) - else if (op_type == "and") - And(left, right) - else - null - } - } - - def resolveAndExpression(expr: Expression, partitionKeyIds: AttributeSet): Expression = { - expr match { - case and: And => - constructBinaryOperators( - resolveAndExpression(and.left, partitionKeyIds), - resolveAndExpression(and.right, partitionKeyIds), - "and") - case _ => - resolvePredicatesExpression(expr, partitionKeyIds) - } - } - - def resolveOrExpression(or: Or, partitionKeyIds: AttributeSet): Expression = { - (or.left.isInstanceOf[Or], or.right.isInstanceOf[Or]) match { - case (true, true) => constructBinaryOperators( - resolveOrExpression(or.left.asInstanceOf[Or], partitionKeyIds), - resolveOrExpression(or.right.asInstanceOf[Or], partitionKeyIds), - "or") - case (true, false) => constructBinaryOperators( - resolveOrExpression(or.left.asInstanceOf[Or], partitionKeyIds) , - resolveAndExpression(or.right, partitionKeyIds), - "or") - case (false, true) => constructBinaryOperators( - resolveAndExpression(or.left, partitionKeyIds), - resolveOrExpression(or.right.asInstanceOf[Or], - partitionKeyIds), "or") - case (false, false) => constructBinaryOperators( - resolveAndExpression(or.left, partitionKeyIds), - resolveAndExpression(or.right, partitionKeyIds), - "or") - } - } - def resolvePredicatesExpression(expr: Expression, partitionKeyIds: AttributeSet): Expression = { if (!expr.references.isEmpty && expr.references.subsetOf(partitionKeyIds)) { expr @@ -294,13 +246,36 @@ private[hive] trait HiveStrategies { } } - def extractPushDownPredicate(predicates: Seq[Expression], - partitionKeyIds: AttributeSet): Seq[Expression] = { - predicates.map { - case or: Or => - resolveOrExpression(or, partitionKeyIds) - case predicate => - resolvePredicatesExpression(predicate, partitionKeyIds) + def constructBinaryOperators(left: Expression, right: Expression, op_type: String): Expression = { + op_type.toUpperCase match { + case "OR" if left != null && right != null => Or(left, right) + case "AND" if left != null || right != null => { + if (left == null) { + right + } else if (right == null) { + left + } else { + And(left, right) + } + } + case _ => null + } + } + + def resolveExpression(expr: Expression, partitionKeyIds: AttributeSet): Expression = { + expr match { + case And(left, right) => + constructBinaryOperators( + resolveExpression(left, partitionKeyIds), + resolveExpression(right, partitionKeyIds), + "and") + case or@Or(left, right) + if or.children.forall(_.references.exists(ref => partitionKeyIds.contains(ref))) => + constructBinaryOperators( + resolveExpression(left, partitionKeyIds), + resolveExpression(right, partitionKeyIds), + "or") + case _ => resolvePredicatesExpression(expr, partitionKeyIds) } } @@ -315,8 +290,9 @@ private[hive] trait HiveStrategies { } } - val extractedPruningPredicates = extractPushDownPredicate(predicates, partitionKeyIds) - .filter(_ != null) + val extractedPruningPredicates = + predicates.map(resolveExpression(_, partitionKeyIds)) + .filter(_ != null) pruneFilterProject( projectList, From e8a9b28e2d39665f5be8dd387131fe83a2105f83 Mon Sep 17 00:00:00 2001 From: Angers Date: Sat, 29 Jun 2019 16:07:16 +0800 Subject: [PATCH 05/12] Add ExtractPartitionPredicates --- .../sql/catalyst/planning/patterns.scala | 49 ++++++++++++++++++ .../datasources/FileSourceStrategy.scala | 4 +- .../spark/sql/hive/HiveStrategies.scala | 50 ++----------------- 3 files changed, 54 insertions(+), 49 deletions(-) diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala index a816922f49ae..af58cafa5ee4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala @@ -309,4 +309,53 @@ object PhysicalWindow { case _ => None } + + object ExtractPartitionPredicates extends Logging { + + private def resolvePredicatesExpression(expr: Expression, partitionKeyIds: AttributeSet): Expression = { + if (!expr.references.isEmpty && expr.references.subsetOf(partitionKeyIds)) { + expr + } else { + null + } + } + + private def constructBinaryOperators(left: Expression, right: Expression, op_type: String): Expression = { + op_type.toUpperCase match { + case "OR" if left != null && right != null => Or(left, right) + case "AND" if left != null || right != null => { + if (left == null) { + right + } else if (right == null) { + left + } else { + And(left, right) + } + } + case _ => null + } + } + + private def resolveExpression(expr: Expression, partitionKeyIds: AttributeSet): Expression = { + expr match { + case And(left, right) => + constructBinaryOperators( + resolveExpression(left, partitionKeyIds), + resolveExpression(right, partitionKeyIds), + "and") + case or@Or(left, right) + if or.children.forall(_.references.exists(ref => partitionKeyIds.contains(ref))) => + constructBinaryOperators( + resolveExpression(left, partitionKeyIds), + resolveExpression(right, partitionKeyIds), + "or") + case _ => resolvePredicatesExpression(expr, partitionKeyIds) + } + } + + def extractPartitionPredicate(predicates: Seq[Expression], partitionKeyIds:AttributeSet): Seq[Expression] = { + predicates.map(resolveExpression(_, partitionKeyIds)) + .filter(_ != null) + } + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala index c8a42f043f15..3d9f6f548c50 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala @@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.planning.PhysicalWindow.ExtractPartitionPredicates import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan} import org.apache.spark.util.collection.BitSet @@ -154,8 +155,7 @@ object FileSourceStrategy extends Strategy with Logging { fsRelation.partitionSchema, fsRelation.sparkSession.sessionState.analyzer.resolver) val partitionSet = AttributeSet(partitionColumns) val partitionKeyFilters = - ExpressionSet(normalizedFilters - .filter(_.references.subsetOf(partitionSet))) + ExpressionSet(ExtractPartitionPredicates.extractPartitionPredicate(normalizedFilters, partitionSet)) logInfo(s"Pruning directories with: ${partitionKeyFilters.mkString(",")}") diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index c0ca3177c4df..62cf9944fc22 100644 --- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -21,13 +21,12 @@ import java.io.IOException
 import java.util.Locale
 
 import org.apache.hadoop.fs.{FileSystem, Path}
-
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.planning.PhysicalWindow.ExtractPartitionPredicates
 import org.apache.spark.sql.catalyst.planning._
-import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoTable, LogicalPlan,
-  ScriptTransformation}
+import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoTable, LogicalPlan, ScriptTransformation}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils}
@@ -238,47 +237,6 @@ private[hive] trait HiveStrategies {
    */
   object HiveTableScans extends Strategy {
 
-    def resolvePredicatesExpression(expr: Expression, partitionKeyIds: AttributeSet): Expression = {
-      if (!expr.references.isEmpty && expr.references.subsetOf(partitionKeyIds)) {
-        expr
-      } else {
-        null
-      }
-    }
-
-    def constructBinaryOperators(left: Expression, right: Expression, op_type: String): Expression = {
-      op_type.toUpperCase match {
-        case "OR" if left != null && right != null => Or(left, right)
-        case "AND" if left != null || right != null => {
-          if (left == null) {
-            right
-          } else if (right == null) {
-            left
-          } else {
-            And(left, right)
-          }
-        }
-        case _ => null
-      }
-    }
-
-    def resolveExpression(expr: Expression, partitionKeyIds: AttributeSet): Expression = {
-      expr match {
-        case And(left, right) =>
-          constructBinaryOperators(
-            resolveExpression(left, partitionKeyIds),
-            resolveExpression(right, partitionKeyIds),
-            "and")
-        case or@Or(left, right)
-          if or.children.forall(_.references.exists(ref => partitionKeyIds.contains(ref))) =>
-          constructBinaryOperators(
-            resolveExpression(left, partitionKeyIds),
-            resolveExpression(right, partitionKeyIds),
-            "or")
-        case _ => resolvePredicatesExpression(expr, partitionKeyIds)
-      }
-    }
-
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case PhysicalOperation(projectList, predicates, relation: HiveTableRelation) =>
         // Filter out all predicates that only deal with partition keys, these are given to the
@@ -290,9 +248,7 @@ private[hive] trait HiveStrategies {
         }
       }
 
-      val extractedPruningPredicates =
-        predicates.map(resolveExpression(_, partitionKeyIds))
-          .filter(_ != null)
+      val extractedPruningPredicates = ExtractPartitionPredicates.extractPartitionPredicate(predicates, partitionKeyIds)
 
      pruneFilterProject(
        projectList,

From 55323cefdd74378e3914538f8e317ebcb0201acb Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E6=9C=B1=E5=A4=B7?=
Date: Sat, 29 Jun 2019 16:15:38 +0800
Subject: [PATCH 06/12] Fix scala style

---
 .../sql/catalyst/planning/patterns.scala      | 226 +++++++++---------
 .../datasources/FileSourceStrategy.scala      |   6 +-
 .../spark/sql/hive/HiveStrategies.scala       |   5 +-
 3 files changed, 121 insertions(+), 116 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
index af58cafa5ee4..92cdb25edf5e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
@@ 
-17,8 +17,9 @@ package org.apache.spark.sql.catalyst.planning -import scala.collection.mutable +import java.util.Locale +import scala.collection.mutable import org.apache.spark.internal.Logging import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.expressions._ @@ -27,12 +28,12 @@ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ /** - * A pattern that matches any number of project or filter operations on top of another relational - * operator. All filter operators are collected and their conditions are broken up and returned - * together with the top project operator. - * [[org.apache.spark.sql.catalyst.expressions.Alias Aliases]] are in-lined/substituted if - * necessary. - */ + * A pattern that matches any number of project or filter operations on top of another relational + * operator. All filter operators are collected and their conditions are broken up and returned + * together with the top project operator. + * [[org.apache.spark.sql.catalyst.expressions.Alias Aliases]] are in-lined/substituted if + * necessary. + */ object PhysicalOperation extends PredicateHelper { type ReturnType = (Seq[NamedExpression], Seq[Expression], LogicalPlan) @@ -42,21 +43,21 @@ object PhysicalOperation extends PredicateHelper { } /** - * Collects all deterministic projects and filters, in-lining/substituting aliases if necessary. - * Here are two examples for alias in-lining/substitution. - * Before: - * {{{ - * SELECT c1 FROM (SELECT key AS c1 FROM t1) t2 WHERE c1 > 10 - * SELECT c1 AS c2 FROM (SELECT key AS c1 FROM t1) t2 WHERE c1 > 10 - * }}} - * After: - * {{{ - * SELECT key AS c1 FROM t1 WHERE key > 10 - * SELECT key AS c2 FROM t1 WHERE key > 10 - * }}} - */ + * Collects all deterministic projects and filters, in-lining/substituting aliases if necessary. + * Here are two examples for alias in-lining/substitution. + * Before: + * {{{ + * SELECT c1 FROM (SELECT key AS c1 FROM t1) t2 WHERE c1 > 10 + * SELECT c1 AS c2 FROM (SELECT key AS c1 FROM t1) t2 WHERE c1 > 10 + * }}} + * After: + * {{{ + * SELECT key AS c1 FROM t1 WHERE key > 10 + * SELECT key AS c2 FROM t1 WHERE key > 10 + * }}} + */ private def collectProjectsAndFilters(plan: LogicalPlan): - (Option[Seq[NamedExpression]], Seq[Expression], LogicalPlan, Map[Attribute, Expression]) = + (Option[Seq[NamedExpression]], Seq[Expression], LogicalPlan, Map[Attribute, Expression]) = plan match { case Project(fields, child) if fields.forall(_.deterministic) => val (_, filters, other, aliases) = collectProjectsAndFilters(child) @@ -76,12 +77,12 @@ object PhysicalOperation extends PredicateHelper { } private def collectAliases(fields: Seq[Expression]): Map[Attribute, Expression] = fields.collect { - case a @ Alias(child, _) => a.toAttribute -> child + case a@Alias(child, _) => a.toAttribute -> child }.toMap private def substitute(aliases: Map[Attribute, Expression])(expr: Expression): Expression = { expr.transform { - case a @ Alias(ref: AttributeReference, name) => + case a@Alias(ref: AttributeReference, name) => aliases.get(ref) .map(Alias(_, name)(a.exprId, a.qualifier)) .getOrElse(a) @@ -94,11 +95,11 @@ object PhysicalOperation extends PredicateHelper { } /** - * A pattern that finds joins with equality conditions that can be evaluated using equi-join. - * - * Null-safe equality will be transformed into equality as joining key (replace null with default - * value). - */ + * A pattern that finds joins with equality conditions that can be evaluated using equi-join. 
+ * + * Null-safe equality will be transformed into equality as joining key (replace null with default + * value). + */ object ExtractEquiJoinKeys extends Logging with PredicateHelper { /** (joinType, leftKeys, rightKeys, condition, leftChild, rightChild, joinHint) */ type ReturnType = @@ -144,35 +145,35 @@ object ExtractEquiJoinKeys extends Logging with PredicateHelper { } /** - * A pattern that collects the filter and inner joins. - * - * Filter - * | - * inner Join - * / \ ----> (Seq(plan0, plan1, plan2), conditions) - * Filter plan2 - * | - * inner join - * / \ - * plan0 plan1 - * - * Note: This pattern currently only works for left-deep trees. - */ + * A pattern that collects the filter and inner joins. + * + * Filter + * | + * inner Join + * / \ ----> (Seq(plan0, plan1, plan2), conditions) + * Filter plan2 + * | + * inner join + * / \ + * plan0 plan1 + * + * Note: This pattern currently only works for left-deep trees. + */ object ExtractFiltersAndInnerJoins extends PredicateHelper { /** - * Flatten all inner joins, which are next to each other. - * Return a list of logical plans to be joined with a boolean for each plan indicating if it - * was involved in an explicit cross join. Also returns the entire list of join conditions for - * the left-deep tree. - */ + * Flatten all inner joins, which are next to each other. + * Return a list of logical plans to be joined with a boolean for each plan indicating if it + * was involved in an explicit cross join. Also returns the entire list of join conditions for + * the left-deep tree. + */ def flattenJoin(plan: LogicalPlan, parentJoinType: InnerLike = Inner) - : (Seq[(LogicalPlan, InnerLike)], Seq[Expression]) = plan match { + : (Seq[(LogicalPlan, InnerLike)], Seq[Expression]) = plan match { case Join(left, right, joinType: InnerLike, cond, hint) if hint == JoinHint.NONE => val (plans, conditions) = flattenJoin(left, joinType) (plans ++ Seq((right, joinType)), conditions ++ cond.toSeq.flatMap(splitConjunctivePredicates)) - case Filter(filterCondition, j @ Join(_, _, _: InnerLike, _, hint)) if hint == JoinHint.NONE => + case Filter(filterCondition, j@Join(_, _, _: InnerLike, _, hint)) if hint == JoinHint.NONE => val (plans, conditions) = flattenJoin(j) (plans, conditions ++ splitConjunctivePredicates(filterCondition)) @@ -180,27 +181,27 @@ object ExtractFiltersAndInnerJoins extends PredicateHelper { } def unapply(plan: LogicalPlan) - : Option[(Seq[(LogicalPlan, InnerLike)], Seq[Expression])] - = plan match { - case f @ Filter(filterCondition, j @ Join(_, _, joinType: InnerLike, _, hint)) - if hint == JoinHint.NONE => + : Option[(Seq[(LogicalPlan, InnerLike)], Seq[Expression])] + = plan match { + case f@Filter(filterCondition, j@Join(_, _, joinType: InnerLike, _, hint)) + if hint == JoinHint.NONE => Some(flattenJoin(f)) - case j @ Join(_, _, joinType, _, hint) if hint == JoinHint.NONE => + case j@Join(_, _, joinType, _, hint) if hint == JoinHint.NONE => Some(flattenJoin(j)) case _ => None } } /** - * An extractor used when planning the physical execution of an aggregation. Compared with a logical - * aggregation, the following transformations are performed: - * - Unnamed grouping expressions are named so that they can be referred to across phases of - * aggregation - * - Aggregations that appear multiple times are deduplicated. - * - The computation of the aggregations themselves is separated from the final result. 
For - * example, the `count` in `count + 1` will be split into an [[AggregateExpression]] and a final - * computation that computes `count.resultAttribute + 1`. - */ + * An extractor used when planning the physical execution of an aggregation. Compared with a logical + * aggregation, the following transformations are performed: + * - Unnamed grouping expressions are named so that they can be referred to across phases of + * aggregation + * - Aggregations that appear multiple times are deduplicated. + * - The computation of the aggregations themselves is separated from the final result. For + * example, the `count` in `count + 1` will be split into an [[AggregateExpression]] and a final + * computation that computes `count.resultAttribute + 1`. + */ object PhysicalAggregation { // groupingExpressions, aggregateExpressions, resultExpressions, child type ReturnType = @@ -249,7 +250,7 @@ object PhysicalAggregation { // so replace each aggregate expression by its corresponding attribute in the set: equivalentAggregateExpressions.getEquivalentExprs(ae).headOption .getOrElse(ae).asInstanceOf[AggregateExpression].resultAttribute - // Similar to AggregateExpression + // Similar to AggregateExpression case ue: PythonUDF if PythonUDF.isGroupedAggPandasUDF(ue) => equivalentAggregateExpressions.getEquivalentExprs(ue).headOption .getOrElse(ue).asInstanceOf[PythonUDF].resultAttribute @@ -275,19 +276,19 @@ object PhysicalAggregation { } /** - * An extractor used when planning physical execution of a window. This extractor outputs - * the window function type of the logical window. - * - * The input logical window must contain same type of window functions, which is ensured by - * the rule ExtractWindowExpressions in the analyzer. - */ + * An extractor used when planning physical execution of a window. This extractor outputs + * the window function type of the logical window. + * + * The input logical window must contain same type of window functions, which is ensured by + * the rule ExtractWindowExpressions in the analyzer. + */ object PhysicalWindow { // windowFunctionType, windowExpression, partitionSpec, orderSpec, child private type ReturnType = (WindowFunctionType, Seq[NamedExpression], Seq[Expression], Seq[SortOrder], LogicalPlan) def unapply(a: Any): Option[ReturnType] = a match { - case expr @ logical.Window(windowExpressions, partitionSpec, orderSpec, child) => + case expr@logical.Window(windowExpressions, partitionSpec, orderSpec, child) => // The window expression should not be empty here, otherwise it's a bug. 
if (windowExpressions.isEmpty) { @@ -309,53 +310,56 @@ object PhysicalWindow { case _ => None } +} - object ExtractPartitionPredicates extends Logging { +object ExtractPartitionPredicates extends Logging { - private def resolvePredicatesExpression(expr: Expression, partitionKeyIds: AttributeSet): Expression = { - if (!expr.references.isEmpty && expr.references.subsetOf(partitionKeyIds)) { - expr - } else { - null - } + private def resolvePredicatesExpression(expr: Expression, + partitionKeyIds: AttributeSet): Expression = { + if (!expr.references.isEmpty && expr.references.subsetOf(partitionKeyIds)) { + expr + } else { + null } + } - private def constructBinaryOperators(left: Expression, right: Expression, op_type: String): Expression = { - op_type.toUpperCase match { - case "OR" if left != null && right != null => Or(left, right) - case "AND" if left != null || right != null => { - if (left == null) { - right - } else if (right == null) { - left - } else { - And(left, right) - } + private def constructBinaryOperators(left: Expression, + right: Expression, + op_type: String): Expression = { + op_type.toUpperCase(Locale.ROOT) match { + case "OR" if left != null && right != null => Or(left, right) + case "AND" if left != null || right != null => + if (left == null) { + right + } else if (right == null) { + left + } else { + And(left, right) } - case _ => null - } + case _ => null } + } - private def resolveExpression(expr: Expression, partitionKeyIds: AttributeSet): Expression = { - expr match { - case And(left, right) => - constructBinaryOperators( - resolveExpression(left, partitionKeyIds), - resolveExpression(right, partitionKeyIds), - "and") - case or@Or(left, right) - if or.children.forall(_.references.exists(ref => partitionKeyIds.contains(ref))) => - constructBinaryOperators( - resolveExpression(left, partitionKeyIds), - resolveExpression(right, partitionKeyIds), - "or") - case _ => resolvePredicatesExpression(expr, partitionKeyIds) - } + private def resolveExpression(expr: Expression, partitionKeyIds: AttributeSet): Expression = { + expr match { + case And(left, right) => + constructBinaryOperators( + resolveExpression(left, partitionKeyIds), + resolveExpression(right, partitionKeyIds), + "and") + case or@Or(left, right) + if or.children.forall(_.references.exists(ref => partitionKeyIds.contains(ref))) => + constructBinaryOperators( + resolveExpression(left, partitionKeyIds), + resolveExpression(right, partitionKeyIds), + "or") + case _ => resolvePredicatesExpression(expr, partitionKeyIds) } + } - def extractPartitionPredicate(predicates: Seq[Expression], partitionKeyIds:AttributeSet): Seq[Expression] = { - predicates.map(resolveExpression(_, partitionKeyIds)) - .filter(_ != null) - } + def extractPartitionPredicate(predicates: Seq[Expression], + partitionKeyIds: AttributeSet): Seq[Expression] = { + predicates.map(resolveExpression(_, partitionKeyIds)) + .filter(_ != null) } -} +} \ No newline at end of file diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala index 3d9f6f548c50..f4102a82d23f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala @@ -22,8 +22,7 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.catalog.BucketSpec import 
org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.planning.PhysicalOperation -import org.apache.spark.sql.catalyst.planning.PhysicalWindow.ExtractPartitionPredicates +import org.apache.spark.sql.catalyst.planning.{ExtractPartitionPredicates, PhysicalOperation} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan} import org.apache.spark.util.collection.BitSet @@ -155,7 +154,8 @@ object FileSourceStrategy extends Strategy with Logging { fsRelation.partitionSchema, fsRelation.sparkSession.sessionState.analyzer.resolver) val partitionSet = AttributeSet(partitionColumns) val partitionKeyFilters = - ExpressionSet(ExtractPartitionPredicates.extractPartitionPredicate(normalizedFilters, partitionSet)) + ExpressionSet(ExtractPartitionPredicates + .extractPartitionPredicate(normalizedFilters, partitionSet)) logInfo(s"Pruning directories with: ${partitionKeyFilters.mkString(",")}") diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 62cf9944fc22..838d5fc90e8c 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -21,10 +21,10 @@ import java.io.IOException import java.util.Locale import org.apache.hadoop.fs.{FileSystem, Path} + import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.planning.PhysicalWindow.ExtractPartitionPredicates import org.apache.spark.sql.catalyst.planning._ import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoTable, LogicalPlan, ScriptTransformation} import org.apache.spark.sql.catalyst.rules.Rule @@ -248,7 +248,8 @@ private[hive] trait HiveStrategies { } } - val extractedPruningPredicates = ExtractPartitionPredicates.extractPartitionPredicate(predicates, partitionKeyIds) + val extractedPruningPredicates = + ExtractPartitionPredicates.extractPartitionPredicate(predicates, partitionKeyIds) pruneFilterProject( projectList, From 3e8085a211c5286858d682a6c8d2dabbaa0849ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=B1=E5=A4=B7?= Date: Sat, 29 Jun 2019 16:20:13 +0800 Subject: [PATCH 07/12] fix scalastyle --- .../sql/catalyst/planning/patterns.scala | 141 +++++++++--------- 1 file changed, 70 insertions(+), 71 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala index 92cdb25edf5e..1c2e0b17aef8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql.catalyst.planning import java.util.Locale -import scala.collection.mutable import org.apache.spark.internal.Logging import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.expressions._ @@ -28,12 +27,12 @@ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ /** - * A pattern that matches any number of project or filter operations on top of another relational - * operator. 
All filter operators are collected and their conditions are broken up and returned - * together with the top project operator. - * [[org.apache.spark.sql.catalyst.expressions.Alias Aliases]] are in-lined/substituted if - * necessary. - */ + * A pattern that matches any number of project or filter operations on top of another relational + * operator. All filter operators are collected and their conditions are broken up and returned + * together with the top project operator. + * [[org.apache.spark.sql.catalyst.expressions.Alias Aliases]] are in-lined/substituted if + * necessary. + */ object PhysicalOperation extends PredicateHelper { type ReturnType = (Seq[NamedExpression], Seq[Expression], LogicalPlan) @@ -43,21 +42,21 @@ object PhysicalOperation extends PredicateHelper { } /** - * Collects all deterministic projects and filters, in-lining/substituting aliases if necessary. - * Here are two examples for alias in-lining/substitution. - * Before: - * {{{ - * SELECT c1 FROM (SELECT key AS c1 FROM t1) t2 WHERE c1 > 10 - * SELECT c1 AS c2 FROM (SELECT key AS c1 FROM t1) t2 WHERE c1 > 10 - * }}} - * After: - * {{{ - * SELECT key AS c1 FROM t1 WHERE key > 10 - * SELECT key AS c2 FROM t1 WHERE key > 10 - * }}} - */ + * Collects all deterministic projects and filters, in-lining/substituting aliases if necessary. + * Here are two examples for alias in-lining/substitution. + * Before: + * {{{ + * SELECT c1 FROM (SELECT key AS c1 FROM t1) t2 WHERE c1 > 10 + * SELECT c1 AS c2 FROM (SELECT key AS c1 FROM t1) t2 WHERE c1 > 10 + * }}} + * After: + * {{{ + * SELECT key AS c1 FROM t1 WHERE key > 10 + * SELECT key AS c2 FROM t1 WHERE key > 10 + * }}} + */ private def collectProjectsAndFilters(plan: LogicalPlan): - (Option[Seq[NamedExpression]], Seq[Expression], LogicalPlan, Map[Attribute, Expression]) = + (Option[Seq[NamedExpression]], Seq[Expression], LogicalPlan, Map[Attribute, Expression]) = plan match { case Project(fields, child) if fields.forall(_.deterministic) => val (_, filters, other, aliases) = collectProjectsAndFilters(child) @@ -77,12 +76,12 @@ object PhysicalOperation extends PredicateHelper { } private def collectAliases(fields: Seq[Expression]): Map[Attribute, Expression] = fields.collect { - case a@Alias(child, _) => a.toAttribute -> child + case a @ Alias(child, _) => a.toAttribute -> child }.toMap private def substitute(aliases: Map[Attribute, Expression])(expr: Expression): Expression = { expr.transform { - case a@Alias(ref: AttributeReference, name) => + case a @ Alias(ref: AttributeReference, name) => aliases.get(ref) .map(Alias(_, name)(a.exprId, a.qualifier)) .getOrElse(a) @@ -95,11 +94,11 @@ object PhysicalOperation extends PredicateHelper { } /** - * A pattern that finds joins with equality conditions that can be evaluated using equi-join. - * - * Null-safe equality will be transformed into equality as joining key (replace null with default - * value). - */ + * A pattern that finds joins with equality conditions that can be evaluated using equi-join. + * + * Null-safe equality will be transformed into equality as joining key (replace null with default + * value). + */ object ExtractEquiJoinKeys extends Logging with PredicateHelper { /** (joinType, leftKeys, rightKeys, condition, leftChild, rightChild, joinHint) */ type ReturnType = @@ -145,35 +144,35 @@ object ExtractEquiJoinKeys extends Logging with PredicateHelper { } /** - * A pattern that collects the filter and inner joins. 
- * - * Filter - * | - * inner Join - * / \ ----> (Seq(plan0, plan1, plan2), conditions) - * Filter plan2 - * | - * inner join - * / \ - * plan0 plan1 - * - * Note: This pattern currently only works for left-deep trees. - */ + * A pattern that collects the filter and inner joins. + * + * Filter + * | + * inner Join + * / \ ----> (Seq(plan0, plan1, plan2), conditions) + * Filter plan2 + * | + * inner join + * / \ + * plan0 plan1 + * + * Note: This pattern currently only works for left-deep trees. + */ object ExtractFiltersAndInnerJoins extends PredicateHelper { /** - * Flatten all inner joins, which are next to each other. - * Return a list of logical plans to be joined with a boolean for each plan indicating if it - * was involved in an explicit cross join. Also returns the entire list of join conditions for - * the left-deep tree. - */ + * Flatten all inner joins, which are next to each other. + * Return a list of logical plans to be joined with a boolean for each plan indicating if it + * was involved in an explicit cross join. Also returns the entire list of join conditions for + * the left-deep tree. + */ def flattenJoin(plan: LogicalPlan, parentJoinType: InnerLike = Inner) - : (Seq[(LogicalPlan, InnerLike)], Seq[Expression]) = plan match { + : (Seq[(LogicalPlan, InnerLike)], Seq[Expression]) = plan match { case Join(left, right, joinType: InnerLike, cond, hint) if hint == JoinHint.NONE => val (plans, conditions) = flattenJoin(left, joinType) (plans ++ Seq((right, joinType)), conditions ++ cond.toSeq.flatMap(splitConjunctivePredicates)) - case Filter(filterCondition, j@Join(_, _, _: InnerLike, _, hint)) if hint == JoinHint.NONE => + case Filter(filterCondition, j @ Join(_, _, _: InnerLike, _, hint)) if hint == JoinHint.NONE => val (plans, conditions) = flattenJoin(j) (plans, conditions ++ splitConjunctivePredicates(filterCondition)) @@ -181,27 +180,27 @@ object ExtractFiltersAndInnerJoins extends PredicateHelper { } def unapply(plan: LogicalPlan) - : Option[(Seq[(LogicalPlan, InnerLike)], Seq[Expression])] - = plan match { - case f@Filter(filterCondition, j@Join(_, _, joinType: InnerLike, _, hint)) - if hint == JoinHint.NONE => + : Option[(Seq[(LogicalPlan, InnerLike)], Seq[Expression])] + = plan match { + case f @ Filter(filterCondition, j @ Join(_, _, joinType: InnerLike, _, hint)) + if hint == JoinHint.NONE => Some(flattenJoin(f)) - case j@Join(_, _, joinType, _, hint) if hint == JoinHint.NONE => + case j @ Join(_, _, joinType, _, hint) if hint == JoinHint.NONE => Some(flattenJoin(j)) case _ => None } } /** - * An extractor used when planning the physical execution of an aggregation. Compared with a logical - * aggregation, the following transformations are performed: - * - Unnamed grouping expressions are named so that they can be referred to across phases of - * aggregation - * - Aggregations that appear multiple times are deduplicated. - * - The computation of the aggregations themselves is separated from the final result. For - * example, the `count` in `count + 1` will be split into an [[AggregateExpression]] and a final - * computation that computes `count.resultAttribute + 1`. - */ + * An extractor used when planning the physical execution of an aggregation. Compared with a logical + * aggregation, the following transformations are performed: + * - Unnamed grouping expressions are named so that they can be referred to across phases of + * aggregation + * - Aggregations that appear multiple times are deduplicated. 
+ * - The computation of the aggregations themselves is separated from the final result. For + * example, the `count` in `count + 1` will be split into an [[AggregateExpression]] and a final + * computation that computes `count.resultAttribute + 1`. + */ object PhysicalAggregation { // groupingExpressions, aggregateExpressions, resultExpressions, child type ReturnType = @@ -250,7 +249,7 @@ object PhysicalAggregation { // so replace each aggregate expression by its corresponding attribute in the set: equivalentAggregateExpressions.getEquivalentExprs(ae).headOption .getOrElse(ae).asInstanceOf[AggregateExpression].resultAttribute - // Similar to AggregateExpression + // Similar to AggregateExpression case ue: PythonUDF if PythonUDF.isGroupedAggPandasUDF(ue) => equivalentAggregateExpressions.getEquivalentExprs(ue).headOption .getOrElse(ue).asInstanceOf[PythonUDF].resultAttribute @@ -276,19 +275,19 @@ object PhysicalAggregation { } /** - * An extractor used when planning physical execution of a window. This extractor outputs - * the window function type of the logical window. - * - * The input logical window must contain same type of window functions, which is ensured by - * the rule ExtractWindowExpressions in the analyzer. - */ + * An extractor used when planning physical execution of a window. This extractor outputs + * the window function type of the logical window. + * + * The input logical window must contain same type of window functions, which is ensured by + * the rule ExtractWindowExpressions in the analyzer. + */ object PhysicalWindow { // windowFunctionType, windowExpression, partitionSpec, orderSpec, child private type ReturnType = (WindowFunctionType, Seq[NamedExpression], Seq[Expression], Seq[SortOrder], LogicalPlan) def unapply(a: Any): Option[ReturnType] = a match { - case expr@logical.Window(windowExpressions, partitionSpec, orderSpec, child) => + case expr @ logical.Window(windowExpressions, partitionSpec, orderSpec, child) => // The window expression should not be empty here, otherwise it's a bug. 
if (windowExpressions.isEmpty) { From e383f647a53a715d380cd36ae89553e60350d8e6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=B1=E5=A4=B7?= Date: Sat, 29 Jun 2019 16:21:41 +0800 Subject: [PATCH 08/12] Fix scala style --- .../main/scala/org/apache/spark/sql/hive/HiveStrategies.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 838d5fc90e8c..b7faa781525e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -26,7 +26,8 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning._ -import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoTable, LogicalPlan, ScriptTransformation} +import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoTable, LogicalPlan, + ScriptTransformation} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils} From c65143d21d16edbf819f21cc009508ed4194e056 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=B1=E5=A4=B7?= Date: Sat, 29 Jun 2019 16:34:18 +0800 Subject: [PATCH 09/12] Change method style --- .../org/apache/spark/sql/catalyst/planning/patterns.scala | 6 ++++-- .../sql/execution/datasources/FileSourceStrategy.scala | 3 +-- .../scala/org/apache/spark/sql/hive/HiveStrategies.scala | 4 +--- 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala index 1c2e0b17aef8..274a9d1abfc3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala @@ -313,6 +313,8 @@ object PhysicalWindow { object ExtractPartitionPredicates extends Logging { + private type ReturnType = Seq[Expression] + private def resolvePredicatesExpression(expr: Expression, partitionKeyIds: AttributeSet): Expression = { if (!expr.references.isEmpty && expr.references.subsetOf(partitionKeyIds)) { @@ -356,8 +358,8 @@ object ExtractPartitionPredicates extends Logging { } } - def extractPartitionPredicate(predicates: Seq[Expression], - partitionKeyIds: AttributeSet): Seq[Expression] = { + def apply(predicates: Seq[Expression], + partitionKeyIds: AttributeSet): ReturnType = { predicates.map(resolveExpression(_, partitionKeyIds)) .filter(_ != null) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala index f4102a82d23f..4d8e356b9579 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala @@ -154,8 +154,7 @@ object FileSourceStrategy extends Strategy with Logging { fsRelation.partitionSchema, fsRelation.sparkSession.sessionState.analyzer.resolver) val partitionSet = AttributeSet(partitionColumns) val partitionKeyFilters = - ExpressionSet(ExtractPartitionPredicates - 
.extractPartitionPredicate(normalizedFilters, partitionSet))
+      ExpressionSet(ExtractPartitionPredicates(normalizedFilters, partitionSet))
 
     logInfo(s"Pruning directories with: ${partitionKeyFilters.mkString(",")}")
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index b7faa781525e..f5d663f0c369 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -248,9 +248,7 @@ private[hive] trait HiveStrategies {
            predicate.references.subsetOf(partitionKeyIds)
        }
      }
-
-      val extractedPruningPredicates =
-        ExtractPartitionPredicates.extractPartitionPredicate(predicates, partitionKeyIds)
+      val extractedPruningPredicates = ExtractPartitionPredicates(predicates, partitionKeyIds)
 
      pruneFilterProject(
        projectList,

From d791135f0746e185ebedb49d2312b148af01ae6a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E6=9C=B1=E5=A4=B7?=
Date: Sat, 29 Jun 2019 16:57:55 +0800
Subject: [PATCH 10/12] Add comment

---
 .../sql/catalyst/planning/patterns.scala      | 40 +++++++++++++++++
 1 file changed, 40 insertions(+)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
index 274a9d1abfc3..b5c4fec2540d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
@@ -311,6 +311,39 @@ object PhysicalWindow {
   }
 }
 
+/**
+ * Extracts partition push-down conditions from a predicate set.
+ * The original push-down test is
+ * {
+ *   !expression.references.isEmpty &&
+ *   expression.references.subsetOf(partitionKeyIds)
+ * }
+ *
+ * which can only push down simple condition expressions.
+ * For example, given the table:
+ * CREATE TABLE DEFAULT.PARTITION_TABLE(
+ *   A STRING,
+ *   B STRING)
+ * PARTITIONED BY(DT STRING)
+ *
+ * and the SQL:
+ * SELECT A, B
+ * FROM DEFAULT.PARTITION_TABLE
+ * WHERE DT = 20190601 OR (DT = 20190602 AND B = "TEST")
+ *
+ * the WHERE condition "DT = 20190601 OR (DT = 20190602 AND B = "TEST")"
+ * can't be pushed down by that test, since its references are not a subset of the
+ * partition columns. [[ExtractPartitionPredicates]] extracts the partition logic hidden
+ * inside the Or expression: here it returns Or(DT = 20190601, DT = 20190602) for push-down.
+ *
+ * For a special Or condition such as:
+ * SELECT A, B
+ * FROM DEFAULT.PARTITION_TABLE
+ * WHERE DT = 20190601 OR (DT = 20190602 OR B = "TEST")
+ *
+ * no valid push-down condition can be extracted, and an empty expression set is returned.
+ *
+ */
 object ExtractPartitionPredicates extends Logging {
 
   private type ReturnType = Seq[Expression]
@@ -328,7 +361,11 @@ object ExtractPartitionPredicates extends Logging {
      right: Expression,
      op_type: String): Expression = {
    op_type.toUpperCase(Locale.ROOT) match {
+      // Construct an 'Or' predicate only when both of its children are valid;
+      // if not, we return null.
      case "OR" if left != null && right != null => Or(left, right)
+      // For an 'And' expression, each side is a necessary condition on its own,
+      // so it's ok to return one side alone or both sides.
      case "AND" if left != null || right != null =>
        if (left == null) {
@@ -349,6 +386,9 @@
        resolveExpression(right, partitionKeyIds),
        "and")
      case or@Or(left, right)
+        // An Or qualifies only when both its left and right children reference partition keys.
+        // An invalid Or is handled by [[resolvePredicatesExpression]], which returns null and
+        // thereby nullifies the enclosing treetop 'Or' expression as well.
        if or.children.forall(_.references.exists(ref => partitionKeyIds.contains(ref))) =>
          constructBinaryOperators(
            resolveExpression(left, partitionKeyIds),

From 6f81771034eaf8864ab2fd510a00e2e40b443b59 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E6=9C=B1=E5=A4=B7?=
Date: Sat, 29 Jun 2019 18:49:34 +0800
Subject: [PATCH 11/12] Add ExtractPartitionPredicates to
 PruneFileSourcePartitions

---
 .../execution/datasources/PruneFileSourcePartitions.scala  | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
index 9db7c30b2320..a180c93e05e5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources
 
 import org.apache.spark.sql.catalyst.catalog.CatalogStatistics
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.planning.{ExtractPartitionPredicates, PhysicalOperation}
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.Rule
 
@@ -48,8 +48,7 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] {
        partitionSchema, sparkSession.sessionState.analyzer.resolver)
      val partitionSet = AttributeSet(partitionColumns)
      val partitionKeyFilters =
-        ExpressionSet(normalizedFilters
-          .filter(_.references.subsetOf(partitionSet)))
+        ExpressionSet(ExtractPartitionPredicates(normalizedFilters, partitionSet))
 
      if (partitionKeyFilters.nonEmpty) {
        val prunedFileIndex = catalogFileIndex.filterPartitions(partitionKeyFilters.toSeq)

From 01386d3ee93b6fc96349ee5f3a02978ca8cefe63 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E6=9C=B1=E5=A4=B7?=
Date: Mon, 1 Jul 2019 09:43:33 +0800
Subject: [PATCH 12/12] Remove ReturnType alias since it's short

---
 .../org/apache/spark/sql/catalyst/planning/patterns.scala  | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
index b5c4fec2540d..402e131be33b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
@@ -346,8 +346,6 @@ object PhysicalWindow {
  */
 object ExtractPartitionPredicates extends Logging {
 
-  private type ReturnType = Seq[Expression]
-
   private def resolvePredicatesExpression(expr: Expression,
      partitionKeyIds: AttributeSet): Expression = {
    if (!expr.references.isEmpty && expr.references.subsetOf(partitionKeyIds)) {
@@ -399,7 +397,7 @@
   }
 
def apply(predicates: Seq[Expression], - partitionKeyIds: AttributeSet): ReturnType = { + partitionKeyIds: AttributeSet): Seq[Expression] = { predicates.map(resolveExpression(_, partitionKeyIds)) .filter(_ != null) }
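
As an illustration of the semantics this series converges on, here is a minimal standalone sketch in plain Scala. It mirrors the resolve/construct logic of ExtractPartitionPredicates on a toy expression ADT; Pred, AndE, OrE, PartitionExtractor, and Demo are hypothetical names introduced only for this sketch (Option[Expr] stands in for the patched code's nullable Expression results), not part of the Spark sources above.

// Toy model of the extraction in ExtractPartitionPredicates; only the set of
// column names referenced by each leaf predicate matters here.
sealed trait Expr { def references: Set[String] }
case class Pred(sql: String, references: Set[String]) extends Expr
case class AndE(left: Expr, right: Expr) extends Expr {
  def references: Set[String] = left.references ++ right.references
}
case class OrE(left: Expr, right: Expr) extends Expr {
  def references: Set[String] = left.references ++ right.references
}

object PartitionExtractor {
  def resolve(expr: Expr, partKeys: Set[String]): Option[Expr] = expr match {
    case AndE(l, r) =>
      // 'And': each side is a necessary condition by itself, so either
      // surviving side alone is still a safe (weaker) pruning filter.
      (resolve(l, partKeys), resolve(r, partKeys)) match {
        case (Some(a), Some(b)) => Some(AndE(a, b))
        case (a, b) => a.orElse(b)
      }
    case OrE(l, r) if l.references.exists(partKeys) && r.references.exists(partKeys) =>
      // 'Or': both rewritten sides must survive, otherwise the whole Or is dropped.
      for (a <- resolve(l, partKeys); b <- resolve(r, partKeys)) yield OrE(a, b)
    // Leaf (or ineligible Or): keep it only if every reference is a partition key.
    case e if e.references.nonEmpty && e.references.subsetOf(partKeys) => Some(e)
    case _ => None
  }
}

object Demo extends App {
  val partKeys = Set("DT")
  val p1 = Pred("DT = 20190601", Set("DT"))
  val p2 = Pred("DT = 20190602", Set("DT"))
  val pb = Pred("B = 'TEST'", Set("B"))

  // WHERE DT = 20190601 OR (DT = 20190602 AND B = 'TEST')
  println(PartitionExtractor.resolve(OrE(p1, AndE(p2, pb)), partKeys))
  // => Some(OrE(Pred(DT = 20190601,...), Pred(DT = 20190602,...)))

  // WHERE DT = 20190601 OR (DT = 20190602 OR B = 'TEST')
  println(PartitionExtractor.resolve(OrE(p1, OrE(p2, pb)), partKeys))
  // => None: the nested Or mixes in a non-partition column, so nothing is pushed down
}

Running Demo prints a pruned Or(DT = 20190601, DT = 20190602) for the AND-nested query shape and None for the OR-nested one, matching the two examples in the PATCH 10 scaladoc.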