From 12c8c516c852e7e4423e34bbd3fafbeb2b9b5bcf Mon Sep 17 00:00:00 2001
From: Ivan Vergiliev
Date: Tue, 21 May 2019 11:13:34 +0300
Subject: [PATCH 1/7] Initial implementation with filter-and-build in the same place

---
 .../datasources/orc/OrcFilters.scala | 211 +++++++++++++++++-
 1 file changed, 209 insertions(+), 2 deletions(-)

diff --git a/sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala b/sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
index f879d710b895..4a86f465a23e 100644
--- a/sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
+++ b/sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
@@ -24,8 +24,17 @@ import org.apache.orc.storage.ql.io.sarg.SearchArgument.TruthValue
 import org.apache.orc.storage.ql.io.sarg.SearchArgumentFactory.newBuilder
 import org.apache.orc.storage.serde2.io.HiveDecimalWritable
 
+import org.apache.spark.sql.sources.And
+import org.apache.spark.sql.sources.EqualNullSafe
+import org.apache.spark.sql.sources.EqualTo
 import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.sources.GreaterThan
+import org.apache.spark.sql.sources.LessThan
+import org.apache.spark.sql.sources.LessThanOrEqual
+import org.apache.spark.sql.sources.Not
+import org.apache.spark.sql.sources.Or
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.types.DecimalType.Expression
 
 /**
  * Helper object for building ORC `SearchArgument`s, which are used for ORC predicate push-down.
@@ -133,8 +142,206 @@ private[sql] object OrcFilters extends OrcFiltersBase {
       dataTypeMap: Map[String, DataType],
       expression: Filter,
       builder: Builder): Option[Builder] = {
-    trimNonConvertibleSubtrees(dataTypeMap, expression)
-      .map(createBuilder(dataTypeMap, _, builder))
+    filterAndBuild(dataTypeMap, expression, builder)
+//    trimNonConvertibleSubtrees(dataTypeMap, expression)
+//      .map(createBuilder(dataTypeMap, _, builder))
   }
 
+//  case class ExpressionFunctions(
+//    filterFn: (trimFn: (Filter => Option[Filter])) => Option[Filter],
+//    convertFn: (SearchArgument.Builder) => Unit
+//  )
+
+  sealed trait ActionType {
+    type T
+  }
+  case object FilterAction extends ActionType {
+    override type T = Option[Filter]
+  }
+  case object BuildAction extends ActionType {
+    override type T = Unit
+  }
+
+  private def filterAndBuild(
+      dataTypeMap: Map[String, DataType],
+      expression: Filter,
+      builder: Builder
+  ): Option[Builder] = {
+    def getType(attribute: String): PredicateLeaf.Type =
+      getPredicateLeafType(dataTypeMap(attribute))
+
+    import org.apache.spark.sql.sources._
+
+    def performAction(
+        actionType: ActionType,
+        expression: Filter,
+        canPartialPushDownConjuncts: Boolean): Either[Option[Filter], Unit] = {
+      expression match {
+        case And(left, right) =>
+          actionType match {
+            case FilterAction =>
+              // At here, it is not safe to just keep one side and remove the other side
+              // if we do not understand what the parent filters are.
+              //
+              // Here is an example used to explain the reason.
+              // Let's say we have NOT(a = 2 AND b in ('1')) and we do not understand how to
+              // convert b in ('1'). If we only convert a = 2, we will end up with a filter
+              // NOT(a = 2), which will generate wrong results.
+              //
+              // Pushing one side of AND down is only safe to do at the top level or in the child
+              // AND before hitting NOT or OR conditions, and in this case, the unsupported
+              // predicate can be safely removed.
+              val lhs =
+                performFilter(left, canPartialPushDownConjuncts = true)
+              val rhs =
+                performFilter(right, canPartialPushDownConjuncts = true)
+              (lhs, rhs) match {
+                case (Some(l), Some(r)) => Left(Some(And(l, r)))
+                case (Some(_), None) if canPartialPushDownConjuncts => Left(lhs)
+                case (None, Some(_)) if canPartialPushDownConjuncts => Left(rhs)
+                case _ => Left(None)
+              }
+            case BuildAction =>
+              builder.startAnd()
+              updateBuilder(left)
+              updateBuilder(right)
+              builder.end()
+              Right(Unit)
+          }
+
+        case Or(left, right) =>
+          actionType match {
+            case FilterAction =>
+              Left(for {
+                lhs: Filter <- performFilter(left, canPartialPushDownConjuncts = false)
+                rhs: Filter <- performFilter(right, canPartialPushDownConjuncts = false)
+              } yield Or(lhs, rhs))
+            case BuildAction =>
+              builder.startOr()
+              updateBuilder(left)
+              updateBuilder(right)
+              builder.end()
+              Right(Unit)
+          }
+
+        case Not(child) =>
+          actionType match {
+            case FilterAction =>
+              val filteredSubtree = performFilter(child, canPartialPushDownConjuncts = false)
+              Left(filteredSubtree.map(Not(_)))
+            case BuildAction =>
+              builder.startNot()
+              updateBuilder(child)
+              builder.end()
+              Right(Unit)
+          }
+
+        case EqualTo(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
+          actionType match {
+            case FilterAction => Left(Some(expression))
+            case BuildAction =>
+              val quotedName = quoteAttributeNameIfNeeded(attribute)
+              val castedValue = castLiteralValue(value, dataTypeMap(attribute))
+              builder.startAnd().equals(quotedName, getType(attribute), castedValue).end()
+              Right(Unit)
+          }
+        case EqualNullSafe(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
+          actionType match {
+            case FilterAction => Left(Some(expression))
+            case BuildAction =>
+              val quotedName = quoteAttributeNameIfNeeded(attribute)
+              val castedValue = castLiteralValue(value, dataTypeMap(attribute))
+              builder.startAnd().nullSafeEquals(quotedName, getType(attribute), castedValue).end()
+              Right(Unit)
+          }
+        case LessThan(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
+          actionType match {
+            case FilterAction => Left(Some(expression))
+            case BuildAction =>
+              val quotedName = quoteAttributeNameIfNeeded(attribute)
+              val castedValue = castLiteralValue(value, dataTypeMap(attribute))
+              builder.startAnd().lessThan(quotedName, getType(attribute), castedValue).end()
+              Right(Unit)
+          }
+        case LessThanOrEqual(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
+          actionType match {
+            case FilterAction => Left(Some(expression))
+            case BuildAction =>
+              val quotedName = quoteAttributeNameIfNeeded(attribute)
+              val castedValue = castLiteralValue(value, dataTypeMap(attribute))
+              builder.startAnd().lessThanEquals(quotedName, getType(attribute), castedValue).end()
+              Right(Unit)
+          }
+        case GreaterThan(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
+          actionType match {
+            case FilterAction => Left(Some(expression))
+            case BuildAction =>
+              val quotedName = quoteAttributeNameIfNeeded(attribute)
+              val castedValue = castLiteralValue(value, dataTypeMap(attribute))
+              builder.startNot().lessThanEquals(quotedName, getType(attribute), castedValue).end()
+              Right(Unit)
+          }
+        case GreaterThanOrEqual(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
+          actionType match {
+            case FilterAction => Left(Some(expression))
+            case BuildAction =>
+              val quotedName = quoteAttributeNameIfNeeded(attribute)
+              val castedValue = castLiteralValue(value, dataTypeMap(attribute))
+              builder.startNot().lessThan(quotedName, getType(attribute), castedValue).end()
+              Right(Unit)
+          }
+        case IsNull(attribute) if isSearchableType(dataTypeMap(attribute)) =>
+          actionType match {
+            case FilterAction => Left(Some(expression))
+            case BuildAction =>
+              val quotedName = quoteAttributeNameIfNeeded(attribute)
+              builder.startAnd().isNull(quotedName, getType(attribute)).end()
+              Right(Unit)
+          }
+        case IsNotNull(attribute) if isSearchableType(dataTypeMap(attribute)) =>
+          actionType match {
+            case FilterAction => Left(Some(expression))
+            case BuildAction =>
+              val quotedName = quoteAttributeNameIfNeeded(attribute)
+              builder.startNot().isNull(quotedName, getType(attribute)).end()
+              Right(Unit)
+          }
+        case In(attribute, values) if isSearchableType(dataTypeMap(attribute)) =>
+          actionType match {
+            case FilterAction => Left(Some(expression))
+            case BuildAction =>
+              val quotedName = quoteAttributeNameIfNeeded(attribute)
+              val castedValues = values.map(v => castLiteralValue(v, dataTypeMap(attribute)))
+              builder.startAnd().in(quotedName, getType(attribute),
+                castedValues.map(_.asInstanceOf[AnyRef]): _*).end()
+              Right(Unit)
+          }
+
+        case _ =>
+          actionType match {
+            case FilterAction => Left(None)
+            case BuildAction =>
+              throw new IllegalArgumentException(s"Can't build unsupported filter ${expression}")
+          }
+      }
+    }
+
+    def performFilter(expression: Filter, canPartialPushDownConjuncts: Boolean) =
+      performAction(FilterAction, expression, canPartialPushDownConjuncts).left.get
+
+    def updateBuilder(expression: Filter) =
+      performAction(BuildAction, expression, canPartialPushDownConjuncts = true).right.get
+
+
+    val filteredExpression = performFilter(expression, canPartialPushDownConjuncts = true)
+    filteredExpression.foreach(updateBuilder)
+    filteredExpression.map(_ => builder)
+//    if (filteredExpression.isDefined) {
+//      updateBuilder(filteredExpression.get)
+//      Some(builder)
+//    } else {
+//      None
+//    }
+  }
+
   /**

From ff0c397688af293d7ee54fe5776b8eb692f14736 Mon Sep 17 00:00:00 2001
From: Ivan Vergiliev
Date: Tue, 21 May 2019 11:15:58 +0300
Subject: [PATCH 2/7] Remove commented out code

---
 .../datasources/orc/OrcFilters.scala | 204 ------------------
 1 file changed, 204 deletions(-)

diff --git a/sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala b/sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
index 4a86f465a23e..8d0cbb88933c 100644
--- a/sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
+++ b/sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
@@ -143,15 +143,8 @@ private[sql] object OrcFilters extends OrcFiltersBase {
       expression: Filter,
       builder: Builder): Option[Builder] = {
     filterAndBuild(dataTypeMap, expression, builder)
-//    trimNonConvertibleSubtrees(dataTypeMap, expression)
-//      .map(createBuilder(dataTypeMap, _, builder))
   }
 
-//  case class ExpressionFunctions(
-//    filterFn: (trimFn: (Filter => Option[Filter])) => Option[Filter],
-//    convertFn: (SearchArgument.Builder) => Unit
-//  )
-
   sealed trait ActionType {
     type T
   }
@@ -336,203 +329,6 @@ private[sql] object OrcFilters extends OrcFiltersBase {
     val filteredExpression = performFilter(expression, canPartialPushDownConjuncts = true)
     filteredExpression.foreach(updateBuilder)
     filteredExpression.map(_ => builder)
-//    if (filteredExpression.isDefined) {
-//      updateBuilder(filteredExpression.get)
-//      Some(builder)
-//    } else {
-//      None
-//    }
   }
 
-  /**
-   * Transforms a `Filter` by removing all subtrees that are not convertible to an ORC
-   * SearchArgument.
-   *
-   * @param dataTypeMap a map from the attribute name to its data type.
-   * @param expression the input filter predicates.
-   * @return A TrimmedFilter that wraps the transformed `Filter`.
-   */
-  private def trimNonConvertibleSubtrees(
-      dataTypeMap: Map[String, DataType],
-      expression: Filter): Option[TrimmedFilter] = {
-    trimNonConvertibleSubtreesImpl(dataTypeMap, expression, canPartialPushDownConjuncts = true)
-      .map(TrimmedFilter)
-  }
-
-  /**
-   * Internal recursive implementation of the `trimNonConvertibleSubtrees` method. We use two
-   * separate methods here in order to avoid dealing with the wrapper `TrimmedFilter` classes
-   * in the recursive implementation here, and only wrap the final result in the outer function.
-   *
-   * NOTE: If you change the set of supported `Filter` types here, you need to modify
-   * `createBuilder` accordingly!
-   *
-   * @param dataTypeMap a map from the attribute name to its data type.
-   * @param expression the input filter predicates.
-   * @param canPartialPushDownConjuncts whether a subset of conjuncts of predicates can be pushed
-   *                                    down safely. Pushing ONLY one side of AND down is safe to
-   *                                    do at the top level or none of its ancestors is NOT and OR.
-   * @return the trimmed `Filter`.
-   */
-  private def trimNonConvertibleSubtreesImpl(
-      dataTypeMap: Map[String, DataType],
-      expression: Filter,
-      canPartialPushDownConjuncts: Boolean): Option[Filter] = {
-    def getType(attribute: String): PredicateLeaf.Type =
-      getPredicateLeafType(dataTypeMap(attribute))
-
-    import org.apache.spark.sql.sources._
-
-    expression match {
-      case And(left, right) =>
-        // At here, it is not safe to just keep one side and remove the other side
-        // if we do not understand what the parent filters are.
-        //
-        // Here is an example used to explain the reason.
-        // Let's say we have NOT(a = 2 AND b in ('1')) and we do not understand how to
-        // convert b in ('1'). If we only convert a = 2, we will end up with a filter
-        // NOT(a = 2), which will generate wrong results.
-        //
-        // Pushing one side of AND down is only safe to do at the top level or in the child
-        // AND before hitting NOT or OR conditions, and in this case, the unsupported predicate
-        // can be safely removed.
-        val lhs =
-          trimNonConvertibleSubtreesImpl(dataTypeMap, left, canPartialPushDownConjuncts = true)
-        val rhs =
-          trimNonConvertibleSubtreesImpl(dataTypeMap, right, canPartialPushDownConjuncts = true)
-        (lhs, rhs) match {
-          case (Some(l), Some(r)) => Some(And(l, r))
-          case (Some(_), None) if canPartialPushDownConjuncts => lhs
-          case (None, Some(_)) if canPartialPushDownConjuncts => rhs
-          case _ => None
-        }
-
-      case Or(left, right) =>
-        for {
-          lhs: Filter <-
-            trimNonConvertibleSubtreesImpl(dataTypeMap, left, canPartialPushDownConjuncts = false)
-          rhs: Filter <-
-            trimNonConvertibleSubtreesImpl(dataTypeMap, right, canPartialPushDownConjuncts = false)
-        } yield Or(lhs, rhs)
-
-      case Not(child) =>
-        val filteredSubtree =
-          trimNonConvertibleSubtreesImpl(dataTypeMap, child, canPartialPushDownConjuncts = false)
-        filteredSubtree.map(Not(_))
-
-      case EqualTo(attribute, value) if isSearchableType(dataTypeMap(attribute)) => Some(expression)
-      case EqualNullSafe(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
-        Some(expression)
-      case LessThan(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
-        Some(expression)
-      case LessThanOrEqual(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
-        Some(expression)
-      case GreaterThan(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
-        Some(expression)
-      case GreaterThanOrEqual(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
-        Some(expression)
-      case IsNull(attribute) if isSearchableType(dataTypeMap(attribute)) => Some(expression)
-      case IsNotNull(attribute) if isSearchableType(dataTypeMap(attribute)) => Some(expression)
-      case In(attribute, values) if isSearchableType(dataTypeMap(attribute)) => Some(expression)
-
-      case _ => None
-    }
-  }
-
-  /**
-   * Build a SearchArgument for a Filter that has already been trimmed so as to only contain
-   * expressions that are convertible to a `SearchArgument`. This allows for a more efficient and
-   * more readable implementation since there's no need to check every node before converting it.
-   *
-   * NOTE: If you change the set of supported `Filter` types here, you need to modify
-   * `trimNonConvertibleSubtreesImpl` accordingly!
-   *
-   * @param dataTypeMap a map from the attribute name to its data type.
-   * @param expression the trimmed input filter predicates.
-   * @param builder the builder so far.
-   * @return
-   */
-  private def createBuilder(
-      dataTypeMap: Map[String, DataType],
-      expression: TrimmedFilter,
-      builder: Builder): Builder = {
-    def getType(attribute: String): PredicateLeaf.Type =
-      getPredicateLeafType(dataTypeMap(attribute))
-
-    import org.apache.spark.sql.sources._
-    def updateBuilder(subexpression: Filter): Unit = subexpression match {
-      case And(left, right) =>
-        builder.startAnd()
-        updateBuilder(left)
-        updateBuilder(right)
-        builder.end()
-
-      case Or(left, right) =>
-        builder.startOr()
-        updateBuilder(left)
-        updateBuilder(right)
-        builder.end()
-
-      case Not(child) =>
-        builder.startNot()
-        updateBuilder(child)
-        builder.end()
-
-      // NOTE: For all case branches dealing with leaf predicates below, the additional `startAnd()`
-      // call is mandatory. ORC `SearchArgument` builder requires that all leaf predicates must be
-      // wrapped by a "parent" predicate (`And`, `Or`, or `Not`).
-
-      case EqualTo(attribute, value) =>
-        val quotedName = quoteAttributeNameIfNeeded(attribute)
-        val castedValue = castLiteralValue(value, dataTypeMap(attribute))
-        builder.startAnd().equals(quotedName, getType(attribute), castedValue).end()
-
-      case EqualNullSafe(attribute, value) =>
-        val quotedName = quoteAttributeNameIfNeeded(attribute)
-        val castedValue = castLiteralValue(value, dataTypeMap(attribute))
-        builder.startAnd().nullSafeEquals(quotedName, getType(attribute), castedValue).end()
-
-      case LessThan(attribute, value) =>
-        val quotedName = quoteAttributeNameIfNeeded(attribute)
-        val castedValue = castLiteralValue(value, dataTypeMap(attribute))
-        builder.startAnd().lessThan(quotedName, getType(attribute), castedValue).end()
-
-      case LessThanOrEqual(attribute, value) =>
-        val quotedName = quoteAttributeNameIfNeeded(attribute)
-        val castedValue = castLiteralValue(value, dataTypeMap(attribute))
-        builder.startAnd().lessThanEquals(quotedName, getType(attribute), castedValue).end()
-
-      case GreaterThan(attribute, value) =>
-        val quotedName = quoteAttributeNameIfNeeded(attribute)
-        val castedValue = castLiteralValue(value, dataTypeMap(attribute))
-        builder.startNot().lessThanEquals(quotedName, getType(attribute), castedValue).end()
-
-      case GreaterThanOrEqual(attribute, value) =>
-        val quotedName = quoteAttributeNameIfNeeded(attribute)
-        val castedValue = castLiteralValue(value, dataTypeMap(attribute))
-        builder.startNot().lessThan(quotedName, getType(attribute), castedValue).end()
-
-      case IsNull(attribute) =>
-        val quotedName = quoteAttributeNameIfNeeded(attribute)
-        builder.startAnd().isNull(quotedName, getType(attribute)).end()
-
-      case IsNotNull(attribute) =>
-        val quotedName = quoteAttributeNameIfNeeded(attribute)
-        builder.startNot().isNull(quotedName, getType(attribute)).end()
-
-      case In(attribute, values) =>
-        val quotedName = quoteAttributeNameIfNeeded(attribute)
-        val castedValues = values.map(v => castLiteralValue(v, dataTypeMap(attribute)))
-        builder.startAnd().in(quotedName, getType(attribute),
-          castedValues.map(_.asInstanceOf[AnyRef]): _*).end()
-
-      // This case should never happen since this function only expects fully convertible filters.
-      // However, we return true as a safety measure in case something goes wrong.
-      case _ => builder.startAnd().literal(TruthValue.YES).end()
-    }
-
-    updateBuilder(expression.filter)
-    builder
-  }
 }

From 37095b3b6f6f7ce8355da4dc4e01672dd7d971f2 Mon Sep 17 00:00:00 2001
From: Ivan Vergiliev
Date: Tue, 21 May 2019 11:35:54 +0300
Subject: [PATCH 3/7] Get rid of the type member in ActionType

---
 .../sql/execution/datasources/orc/OrcFilters.scala | 12 +++---------
 1 file changed, 3 insertions(+), 9 deletions(-)

diff --git a/sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala b/sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
index 8d0cbb88933c..a70af57d79cb 100644
--- a/sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
+++ b/sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
@@ -145,15 +145,9 @@ private[sql] object OrcFilters extends OrcFiltersBase {
     filterAndBuild(dataTypeMap, expression, builder)
   }
 
-  sealed trait ActionType {
-    type T
-  }
-  case object FilterAction extends ActionType {
-    override type T = Option[Filter]
-  }
-  case object BuildAction extends ActionType {
-    override type T = Unit
-  }
+  sealed trait ActionType
+  case object FilterAction extends ActionType
+  case object BuildAction extends ActionType
 
   private def filterAndBuild(
       dataTypeMap: Map[String, DataType],

From 4427c096486fecd658c1131028df02b53154bf1a Mon Sep 17 00:00:00 2001
From: Ivan Vergiliev
Date: Wed, 22 May 2019 17:50:59 +0300
Subject: [PATCH 4/7] Remove unused imports

---
 .../sql/execution/datasources/orc/OrcFilters.scala | 10 ----------
 1 file changed, 10 deletions(-)

diff --git a/sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala b/sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
index a70af57d79cb..ad4116979060 100644
--- a/sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
+++ b/sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
@@ -20,21 +20,11 @@ package org.apache.spark.sql.execution.datasources.orc
 import org.apache.orc.storage.common.`type`.HiveDecimal
 import org.apache.orc.storage.ql.io.sarg.{PredicateLeaf, SearchArgument}
 import org.apache.orc.storage.ql.io.sarg.SearchArgument.Builder
-import org.apache.orc.storage.ql.io.sarg.SearchArgument.TruthValue
 import org.apache.orc.storage.ql.io.sarg.SearchArgumentFactory.newBuilder
 import org.apache.orc.storage.serde2.io.HiveDecimalWritable
 
-import org.apache.spark.sql.sources.And
-import org.apache.spark.sql.sources.EqualNullSafe
-import org.apache.spark.sql.sources.EqualTo
 import org.apache.spark.sql.sources.Filter
-import org.apache.spark.sql.sources.GreaterThan
-import org.apache.spark.sql.sources.LessThan
-import org.apache.spark.sql.sources.LessThanOrEqual
-import org.apache.spark.sql.sources.Not
-import org.apache.spark.sql.sources.Or
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.types.DecimalType.Expression
 
 /**
  * Helper object for building ORC `SearchArgument`s, which are used for ORC predicate push-down.
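[Reviewer note on patches 1-3] The dual-action shape these patches introduce is easier to see outside a full diff. Below is a minimal, self-contained Scala sketch of the same idea - one recursive case-match that serves both a "trim" action and a "build" action, selected by an `ActionType` value and reporting through an `Either`. All names here (`Expr`, `Leaf`, `AndE`, `ToyConverter`) are illustrative stand-ins, not code from the patches, and the real implementation additionally threads a `canPartialPushDownConjuncts` flag so conjuncts are only dropped where that is semantically safe (see the NOT/OR comment in patch 1).

object ToyConverter {
  sealed trait Expr
  case class Leaf(supported: Boolean, name: String) extends Expr
  case class AndE(l: Expr, r: Expr) extends Expr

  sealed trait ActionType
  case object FilterAction extends ActionType
  case object BuildAction extends ActionType

  // FilterAction answers "what survives trimming?" (always a Left);
  // BuildAction emits output for an already-trimmed tree (always a Right).
  def performAction(action: ActionType, e: Expr, out: StringBuilder): Either[Option[Expr], Unit] =
    e match {
      case AndE(l, r) =>
        action match {
          case FilterAction =>
            val lhs = performAction(FilterAction, l, out).left.get
            val rhs = performAction(FilterAction, r, out).left.get
            Left((lhs, rhs) match {
              case (Some(a), Some(b)) => Some(AndE(a, b))
              case (a, b)             => a.orElse(b) // partial push-down of a conjunction
            })
          case BuildAction =>
            out.append("(and ")
            performAction(BuildAction, l, out)
            performAction(BuildAction, r, out)
            out.append(")")
            Right(())
        }
      case Leaf(supported, name) =>
        action match {
          case FilterAction => Left(if (supported) Some(e) else None)
          case BuildAction  => out.append(name); Right(())
        }
    }
}

Trimming `AndE(Leaf(true, "a"), Leaf(false, "b"))` and then building the survivor emits just "a": the unsupported leaf is dropped in the filter pass and never reaches the build pass. As for the type member that patch 3 removes: it could in principle have given `performAction` a precise dependent result type instead of `Either`, but in Scala 2 a match on the case objects does not refine the path-dependent type, so every branch would need an unchecked cast - a plausible reason to prefer the plain `Either`. A hypothetical sketch, not part of the patches:

sealed trait ActionTypeT { type T }
case object FilterActionT extends ActionTypeT { override type T = Option[String] }
case object BuildActionT extends ActionTypeT { override type T = Unit }

// Dependent method type: the result type follows the action value,
// but each branch still needs asInstanceOf to typecheck.
def performActionT(action: ActionTypeT): action.T = action match {
  case FilterActionT => Some("kept").asInstanceOf[action.T]
  case BuildActionT  => ().asInstanceOf[action.T]
}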
From 9e43765a3b3d2168542bb4864221c3b18c0df734 Mon Sep 17 00:00:00 2001
From: Ivan Vergiliev
Date: Mon, 27 May 2019 09:36:09 +0300
Subject: [PATCH 5/7] Add comments to new functions

---
 .../datasources/orc/OrcFilters.scala | 34 ++++++++++++++++++-
 1 file changed, 33 insertions(+), 1 deletion(-)

diff --git a/sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala b/sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
index ad4116979060..b7c979ef304c 100644
--- a/sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
+++ b/sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
@@ -139,6 +139,20 @@ private[sql] object OrcFilters extends OrcFiltersBase {
   case object FilterAction extends ActionType
   case object BuildAction extends ActionType
 
+  /**
+   * Builds a SearchArgument for a Filter by first trimming the non-convertible nodes, and then
+   * only building the remaining convertible nodes.
+   *
+   * Doing the conversion in this way avoids the computational complexity problems introduced by
+   * checking whether a node is convertible while building it. The approach implemented here has
+   * complexity that's linear in the size of the Filter tree - O(number of Filter nodes) - we run
+   * a single pass over the tree to trim it, and then another pass on the trimmed tree to convert
+   * the remaining nodes.
+   *
+   * The alternative approach of checking-while-building can (and did) result
+   * in exponential complexity in the height of the tree, causing perf problems with Filters with
+   * as few as ~35 nodes if they were skewed.
+   */
   private def filterAndBuild(
       dataTypeMap: Map[String, DataType],
       expression: Filter,
@@ -149,6 +163,25 @@ private[sql] object OrcFilters extends OrcFiltersBase {
 
     import org.apache.spark.sql.sources._
 
+    // The performAction method can run both the filtering and building operations for a given
+    // node - we signify which one we want with the `actionType` parameter.
+    //
+    // There are a couple of benefits to coupling the two operations like this:
+    // 1. All the logic for a given predicate is grouped logically in the same place. You don't
+    // have to scroll across the whole file to see what the filter action for an And is while
+    // you're looking at the build action.
+    // 2. It's much easier to keep the implementations of the two operations up-to-date with
+    // each other. If the `filter` and `build` operations are implemented as separate case-matches
+    // in different methods, it's very easy to change one without appropriately updating the
+    // other. For example, if we add a new supported node type to `filter`, it would be very
+    // easy to forget to update `build` to support it too, thus leading to conversion errors.
+    //
+    // Doing things this way does have some annoying side effects:
+    // - We need to return an `Either`, with one action type always returning a Left and the other
+    // always returning a Right.
+    // - We always need to pass the canPartialPushDownConjuncts parameter even though the build
+    // action doesn't need it (because by the time we run the `build` operation, we know all
+    // remaining nodes are convertible).
     def performAction(
         actionType: ActionType,
         expression: Filter,
@@ -309,7 +342,6 @@ private[sql] object OrcFilters extends OrcFiltersBase {
     def updateBuilder(expression: Filter) =
       performAction(BuildAction, expression, canPartialPushDownConjuncts = true).right.get
 
-
     val filteredExpression = performFilter(expression, canPartialPushDownConjuncts = true)
     filteredExpression.foreach(updateBuilder)
     filteredExpression.map(_ => builder)

From 52939514138691191e2588934588dc5cf435a8f5 Mon Sep 17 00:00:00 2001
From: Ivan Vergiliev
Date: Mon, 27 May 2019 09:39:08 +0300
Subject: [PATCH 6/7] Remove now unused TrimmedFilter class

---
 .../sql/execution/datasources/orc/OrcFilters.scala | 14 --------------
 1 file changed, 14 deletions(-)

diff --git a/sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala b/sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
index b7c979ef304c..8b81c56dbd3a 100644
--- a/sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
+++ b/sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
@@ -111,20 +111,6 @@ private[sql] object OrcFilters extends OrcFiltersBase {
     case _ => value
   }
 
-  /**
-   * A TrimmedFilter is a Filter that has been trimmed such that all the remaining nodes
-   * are convertible to ORC predicates.
-   *
-   * Since nothing in the underlying representation of the Filter is actually different from a
-   * regular Filter (the only difference is that we might remove some subtrees), this class is just
-   * a wrapper around a `Filter` value. The main benefits of using this class are readability
-   * and type safety (to signal that the respective functions only work with already trimmed
-   * filters).
-   *
-   * @param filter The underlying filter representation.
-   */
-  private case class TrimmedFilter(filter: Filter) extends AnyVal
-
   /**
    * Build a SearchArgument and return the builder so far.
    */

From 1d822b08aae94ecec09c172fe48be6c12a2fc89d Mon Sep 17 00:00:00 2001
From: Ivan Vergiliev
Date: Mon, 27 May 2019 09:44:02 +0300
Subject: [PATCH 7/7] Apply changes to 2.3.4 version of the file

---
 .../datasources/orc/OrcFilters.scala | 383 +++++++++---------
 1 file changed, 195 insertions(+), 188 deletions(-)

diff --git a/sql/core/v2.3.4/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala b/sql/core/v2.3.4/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
index 6dc991636f73..ab48f93ccb32 100644
--- a/sql/core/v2.3.4/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
+++ b/sql/core/v2.3.4/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
@@ -111,20 +111,6 @@ private[sql] object OrcFilters extends OrcFiltersBase {
     case _ => value
   }
 
-  /**
-   * A TrimmedFilter is a Filter that has been trimmed such that all the remaining nodes
-   * are convertible to ORC predicates.
-   *
-   * Since nothing in the underlying representation of the Filter is actually different from a
-   * regular Filter (the only difference is that we might remove some subtrees), this class is just
-   * a wrapper around a `Filter` value. The main benefits of using this class are readability
-   * and type safety (to signal that the respective functions only work with already trimmed
-   * filters).
-   *
-   * @param filter The underlying filter representation.
-   */
-  private case class TrimmedFilter(filter: Filter) extends AnyVal
-
 /**
  * Build a SearchArgument and return the builder so far.
  */
@@ -132,197 +118,218 @@ private[sql] object OrcFilters extends OrcFiltersBase {
       dataTypeMap: Map[String, DataType],
       expression: Filter,
       builder: Builder): Option[Builder] = {
-    trimNonConvertibleSubtrees(dataTypeMap, expression)
-      .map(createBuilder(dataTypeMap, _, builder))
+    filterAndBuild(dataTypeMap, expression, builder)
   }
 
-  /**
-   * Transforms a `Filter` by removing all subtrees that are not convertible to an ORC
-   * SearchArgument.
-   *
-   * @param dataTypeMap a map from the attribute name to its data type.
-   * @param expression the input filter predicates.
-   * @return A TrimmedFilter that wraps the transformed `Filter`.
-   */
-  private def trimNonConvertibleSubtrees(
-      dataTypeMap: Map[String, DataType],
-      expression: Filter): Option[TrimmedFilter] = {
-    trimNonConvertibleSubtreesImpl(dataTypeMap, expression, canPartialPushDownConjuncts = true)
-      .map(TrimmedFilter)
-  }
+  sealed trait ActionType
+  case object FilterAction extends ActionType
+  case object BuildAction extends ActionType
 
   /**
-   * Internal recursive implementation of the `trimNonConvertibleSubtrees` method. We use two
-   * separate methods here in order to avoid dealing with the wrapper `TrimmedFilter` classes
-   * in the recursive implementation here, and only wrap the final result in the outer function.
+   * Builds a SearchArgument for a Filter by first trimming the non-convertible nodes, and then
+   * only building the remaining convertible nodes.
    *
-   * NOTE: If you change the set of supported `Filter` types here, you need to modify
-   * `createBuilder` accordingly!
+   * Doing the conversion in this way avoids the computational complexity problems introduced by
+   * checking whether a node is convertible while building it. The approach implemented here has
+   * complexity that's linear in the size of the Filter tree - O(number of Filter nodes) - we run
+   * a single pass over the tree to trim it, and then another pass on the trimmed tree to convert
+   * the remaining nodes.
    *
-   * @param dataTypeMap a map from the attribute name to its data type.
-   * @param expression the input filter predicates.
-   * @param canPartialPushDownConjuncts whether a subset of conjuncts of predicates can be pushed
-   *                                    down safely. Pushing ONLY one side of AND down is safe to
-   *                                    do at the top level or none of its ancestors is NOT and OR.
-   * @return the trimmed `Filter`.
+   * The alternative approach of checking-while-building can (and did) result
+   * in exponential complexity in the height of the tree, causing perf problems with Filters with
+   * as few as ~35 nodes if they were skewed.
    */
-  private def trimNonConvertibleSubtreesImpl(
+  private def filterAndBuild(
       dataTypeMap: Map[String, DataType],
       expression: Filter,
-      canPartialPushDownConjuncts: Boolean): Option[Filter] = {
+      builder: Builder
+  ): Option[Builder] = {
     def getType(attribute: String): PredicateLeaf.Type =
       getPredicateLeafType(dataTypeMap(attribute))
 
     import org.apache.spark.sql.sources._
 
-    expression match {
-      case And(left, right) =>
-        // At here, it is not safe to just keep one side and remove the other side
-        // if we do not understand what the parent filters are.
-        //
-        // Here is an example used to explain the reason.
-        // Let's say we have NOT(a = 2 AND b in ('1')) and we do not understand how to
-        // convert b in ('1'). If we only convert a = 2, we will end up with a filter
-        // NOT(a = 2), which will generate wrong results.
-        //
-        // Pushing one side of AND down is only safe to do at the top level or in the child
-        // AND before hitting NOT or OR conditions, and in this case, the unsupported predicate
-        // can be safely removed.
-        val lhs =
-          trimNonConvertibleSubtreesImpl(dataTypeMap, left, canPartialPushDownConjuncts = true)
-        val rhs =
-          trimNonConvertibleSubtreesImpl(dataTypeMap, right, canPartialPushDownConjuncts = true)
-        (lhs, rhs) match {
-          case (Some(l), Some(r)) => Some(And(l, r))
-          case (Some(_), None) if canPartialPushDownConjuncts => lhs
-          case (None, Some(_)) if canPartialPushDownConjuncts => rhs
-          case _ => None
-        }
-
-      case Or(left, right) =>
-        for {
-          lhs <-
-            trimNonConvertibleSubtreesImpl(dataTypeMap, left, canPartialPushDownConjuncts = false)
-          rhs <-
-            trimNonConvertibleSubtreesImpl(dataTypeMap, right, canPartialPushDownConjuncts = false)
-        } yield Or(lhs, rhs)
-
-      case Not(child) =>
-        trimNonConvertibleSubtreesImpl(dataTypeMap, child, canPartialPushDownConjuncts = false)
-          .map(Not)
-
-      case EqualTo(attribute, value) if isSearchableType(dataTypeMap(attribute)) => Some(expression)
-      case EqualNullSafe(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
-        Some(expression)
-      case LessThan(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
-        Some(expression)
-      case LessThanOrEqual(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
-        Some(expression)
-      case GreaterThan(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
-        Some(expression)
-      case GreaterThanOrEqual(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
-        Some(expression)
-      case IsNull(attribute) if isSearchableType(dataTypeMap(attribute)) => Some(expression)
-      case IsNotNull(attribute) if isSearchableType(dataTypeMap(attribute)) => Some(expression)
-      case In(attribute, values) if isSearchableType(dataTypeMap(attribute)) => Some(expression)
-
-      case _ => None
+    // The performAction method can run both the filtering and building operations for a given
+    // node - we signify which one we want with the `actionType` parameter.
+    //
+    // There are a couple of benefits to coupling the two operations like this:
+    // 1. All the logic for a given predicate is grouped logically in the same place. You don't
+    // have to scroll across the whole file to see what the filter action for an And is while
+    // you're looking at the build action.
+    // 2. It's much easier to keep the implementations of the two operations up-to-date with
+    // each other. If the `filter` and `build` operations are implemented as separate case-matches
+    // in different methods, it's very easy to change one without appropriately updating the
+    // other. For example, if we add a new supported node type to `filter`, it would be very
+    // easy to forget to update `build` to support it too, thus leading to conversion errors.
+    //
+    // Doing things this way does have some annoying side effects:
+    // - We need to return an `Either`, with one action type always returning a Left and the other
+    // always returning a Right.
+    // - We always need to pass the canPartialPushDownConjuncts parameter even though the build
+    // action doesn't need it (because by the time we run the `build` operation, we know all
+    // remaining nodes are convertible).
+    def performAction(
+        actionType: ActionType,
+        expression: Filter,
+        canPartialPushDownConjuncts: Boolean): Either[Option[Filter], Unit] = {
+      expression match {
+        case And(left, right) =>
+          actionType match {
+            case FilterAction =>
+              // At here, it is not safe to just keep one side and remove the other side
+              // if we do not understand what the parent filters are.
+              //
+              // Here is an example used to explain the reason.
+              // Let's say we have NOT(a = 2 AND b in ('1')) and we do not understand how to
+              // convert b in ('1'). If we only convert a = 2, we will end up with a filter
+              // NOT(a = 2), which will generate wrong results.
+              //
+              // Pushing one side of AND down is only safe to do at the top level or in the child
+              // AND before hitting NOT or OR conditions, and in this case, the unsupported
+              // predicate can be safely removed.
+              val lhs =
+                performFilter(left, canPartialPushDownConjuncts = true)
+              val rhs =
+                performFilter(right, canPartialPushDownConjuncts = true)
+              (lhs, rhs) match {
+                case (Some(l), Some(r)) => Left(Some(And(l, r)))
+                case (Some(_), None) if canPartialPushDownConjuncts => Left(lhs)
+                case (None, Some(_)) if canPartialPushDownConjuncts => Left(rhs)
+                case _ => Left(None)
+              }
+            case BuildAction =>
+              builder.startAnd()
+              updateBuilder(left)
+              updateBuilder(right)
+              builder.end()
+              Right(Unit)
+          }
+
+        case Or(left, right) =>
+          actionType match {
+            case FilterAction =>
+              Left(for {
+                lhs: Filter <- performFilter(left, canPartialPushDownConjuncts = false)
+                rhs: Filter <- performFilter(right, canPartialPushDownConjuncts = false)
+              } yield Or(lhs, rhs))
+            case BuildAction =>
+              builder.startOr()
+              updateBuilder(left)
+              updateBuilder(right)
+              builder.end()
+              Right(Unit)
+          }
+
+        case Not(child) =>
+          actionType match {
+            case FilterAction =>
+              val filteredSubtree = performFilter(child, canPartialPushDownConjuncts = false)
+              Left(filteredSubtree.map(Not(_)))
+            case BuildAction =>
+              builder.startNot()
+              updateBuilder(child)
+              builder.end()
+              Right(Unit)
+          }
+
+        case EqualTo(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
+          actionType match {
+            case FilterAction => Left(Some(expression))
+            case BuildAction =>
+              val quotedName = quoteAttributeNameIfNeeded(attribute)
+              val castedValue = castLiteralValue(value, dataTypeMap(attribute))
+              builder.startAnd().equals(quotedName, getType(attribute), castedValue).end()
+              Right(Unit)
+          }
+        case EqualNullSafe(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
+          actionType match {
+            case FilterAction => Left(Some(expression))
+            case BuildAction =>
+              val quotedName = quoteAttributeNameIfNeeded(attribute)
+              val castedValue = castLiteralValue(value, dataTypeMap(attribute))
+              builder.startAnd().nullSafeEquals(quotedName, getType(attribute), castedValue).end()
+              Right(Unit)
+          }
+        case LessThan(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
+          actionType match {
+            case FilterAction => Left(Some(expression))
+            case BuildAction =>
+              val quotedName = quoteAttributeNameIfNeeded(attribute)
+              val castedValue = castLiteralValue(value, dataTypeMap(attribute))
+              builder.startAnd().lessThan(quotedName, getType(attribute), castedValue).end()
+              Right(Unit)
+          }
+        case LessThanOrEqual(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
+          actionType match {
+            case FilterAction => Left(Some(expression))
+            case BuildAction =>
+              val quotedName = quoteAttributeNameIfNeeded(attribute)
+              val castedValue = castLiteralValue(value, dataTypeMap(attribute))
+              builder.startAnd().lessThanEquals(quotedName, getType(attribute), castedValue).end()
+              Right(Unit)
+          }
+        case GreaterThan(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
+          actionType match {
+            case FilterAction => Left(Some(expression))
+            case BuildAction =>
+              val quotedName = quoteAttributeNameIfNeeded(attribute)
+              val castedValue = castLiteralValue(value, dataTypeMap(attribute))
+              builder.startNot().lessThanEquals(quotedName, getType(attribute), castedValue).end()
+              Right(Unit)
+          }
+        case GreaterThanOrEqual(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
+          actionType match {
+            case FilterAction => Left(Some(expression))
+            case BuildAction =>
+              val quotedName = quoteAttributeNameIfNeeded(attribute)
+              val castedValue = castLiteralValue(value, dataTypeMap(attribute))
+              builder.startNot().lessThan(quotedName, getType(attribute), castedValue).end()
+              Right(Unit)
+          }
+        case IsNull(attribute) if isSearchableType(dataTypeMap(attribute)) =>
+          actionType match {
+            case FilterAction => Left(Some(expression))
+            case BuildAction =>
+              val quotedName = quoteAttributeNameIfNeeded(attribute)
+              builder.startAnd().isNull(quotedName, getType(attribute)).end()
+              Right(Unit)
+          }
+        case IsNotNull(attribute) if isSearchableType(dataTypeMap(attribute)) =>
+          actionType match {
+            case FilterAction => Left(Some(expression))
+            case BuildAction =>
+              val quotedName = quoteAttributeNameIfNeeded(attribute)
+              builder.startNot().isNull(quotedName, getType(attribute)).end()
+              Right(Unit)
+          }
+        case In(attribute, values) if isSearchableType(dataTypeMap(attribute)) =>
+          actionType match {
+            case FilterAction => Left(Some(expression))
+            case BuildAction =>
+              val quotedName = quoteAttributeNameIfNeeded(attribute)
+              val castedValues = values.map(v => castLiteralValue(v, dataTypeMap(attribute)))
+              builder.startAnd().in(quotedName, getType(attribute),
+                castedValues.map(_.asInstanceOf[AnyRef]): _*).end()
+              Right(Unit)
+          }
+
+        case _ =>
+          actionType match {
+            case FilterAction => Left(None)
+            case BuildAction =>
+              throw new IllegalArgumentException(s"Can't build unsupported filter ${expression}")
+          }
+      }
     }
-  }
 
-  /**
-   * Build a SearchArgument for a Filter that has already been trimmed so as to only contain
-   * expressions that are convertible to a `SearchArgument`. This allows for a more efficient and
-   * more readable implementation since there's no need to check every node before converting it.
-   *
-   * NOTE: If you change the set of supported `Filter` types here, you need to modify
-   * `trimNonConvertibleSubtreesImpl` accordingly!
-   *
-   * @param dataTypeMap a map from the attribute name to its data type.
-   * @param expression the trimmed input filter predicates.
-   * @param builder the builder so far.
-   * @return
-   */
-  private def createBuilder(
-      dataTypeMap: Map[String, DataType],
-      expression: TrimmedFilter,
-      builder: Builder): Builder = {
-    def getType(attribute: String): PredicateLeaf.Type =
-      getPredicateLeafType(dataTypeMap(attribute))
-
-    import org.apache.spark.sql.sources._
-    def updateBuilder(subexpression: Filter): Unit = subexpression match {
-      case And(left, right) =>
-        builder.startAnd()
-        updateBuilder(left)
-        updateBuilder(right)
-        builder.end()
-
-      case Or(left, right) =>
-        builder.startOr()
-        updateBuilder(left)
-        updateBuilder(right)
-        builder.end()
-
-      case Not(child) =>
-        builder.startNot()
-        updateBuilder(child)
-        builder.end()
-
-      // NOTE: For all case branches dealing with leaf predicates below, the additional `startAnd()`
-      // call is mandatory. ORC `SearchArgument` builder requires that all leaf predicates must be
-      // wrapped by a "parent" predicate (`And`, `Or`, or `Not`).
-
-      case EqualTo(attribute, value) =>
-        val quotedName = quoteAttributeNameIfNeeded(attribute)
-        val castedValue = castLiteralValue(value, dataTypeMap(attribute))
-        builder.startAnd().equals(quotedName, getType(attribute), castedValue).end()
+    def performFilter(expression: Filter, canPartialPushDownConjuncts: Boolean) =
+      performAction(FilterAction, expression, canPartialPushDownConjuncts).left.get
 
-      case EqualNullSafe(attribute, value) =>
-        val quotedName = quoteAttributeNameIfNeeded(attribute)
-        val castedValue = castLiteralValue(value, dataTypeMap(attribute))
-        builder.startAnd().nullSafeEquals(quotedName, getType(attribute), castedValue).end()
+    def updateBuilder(expression: Filter) =
+      performAction(BuildAction, expression, canPartialPushDownConjuncts = true).right.get
 
-      case LessThan(attribute, value) =>
-        val quotedName = quoteAttributeNameIfNeeded(attribute)
-        val castedValue = castLiteralValue(value, dataTypeMap(attribute))
-        builder.startAnd().lessThan(quotedName, getType(attribute), castedValue).end()
-
-      case LessThanOrEqual(attribute, value) =>
-        val quotedName = quoteAttributeNameIfNeeded(attribute)
-        val castedValue = castLiteralValue(value, dataTypeMap(attribute))
-        builder.startAnd().lessThanEquals(quotedName, getType(attribute), castedValue).end()
-
-      case GreaterThan(attribute, value) =>
-        val quotedName = quoteAttributeNameIfNeeded(attribute)
-        val castedValue = castLiteralValue(value, dataTypeMap(attribute))
-        builder.startNot().lessThanEquals(quotedName, getType(attribute), castedValue).end()
-
-      case GreaterThanOrEqual(attribute, value) =>
-        val quotedName = quoteAttributeNameIfNeeded(attribute)
-        val castedValue = castLiteralValue(value, dataTypeMap(attribute))
-        builder.startNot().lessThan(quotedName, getType(attribute), castedValue).end()
-
-      case IsNull(attribute) =>
-        val quotedName = quoteAttributeNameIfNeeded(attribute)
-        builder.startAnd().isNull(quotedName, getType(attribute)).end()
-
-      case IsNotNull(attribute) =>
-        val quotedName = quoteAttributeNameIfNeeded(attribute)
-        builder.startNot().isNull(quotedName, getType(attribute)).end()
-
-      case In(attribute, values) =>
-        val quotedName = quoteAttributeNameIfNeeded(attribute)
-        val castedValues = values.map(v => castLiteralValue(v, dataTypeMap(attribute)))
-        builder.startAnd().in(quotedName, getType(attribute),
-          castedValues.map(_.asInstanceOf[AnyRef]): _*).end()
-
-      // This case should never happen since this function only expects fully convertible filters.
-      // However, we return true as a safety measure in case something goes wrong.
-      case _ => builder.startAnd().literal(TruthValue.YES).end()
-    }
+    val filteredExpression = performFilter(expression, canPartialPushDownConjuncts = true)
+    filteredExpression.foreach(updateBuilder)
+    filteredExpression.map(_ => builder)
 
-    updateBuilder(expression.filter)
-    builder
   }
 }
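[Reviewer note on the series] The complexity claim in the patch 5 doc comment ("exponential ... as few as ~35 nodes") can be reproduced with a toy model; everything below is an illustrative stand-in rather than Spark code. `oldStyleBuild` mimics the previous check-while-building strategy, which traverses each child once to check convertibility and once more to build it, so every level doubles the work; `trim` mimics the new single-pass trimming:

object ComplexityDemo {
  sealed trait Node
  case class Pred(convertible: Boolean) extends Node
  case class Conj(l: Node, r: Node) extends Node

  var visits = 0

  // Old shape: a "check" traversal plus a "build" traversal per child,
  // giving O(2^height) visits on a skewed tree.
  def oldStyleBuild(n: Node): Boolean = {
    visits += 1
    n match {
      case Pred(ok) => ok
      case Conj(l, r) =>
        val convertible = oldStyleBuild(l) && oldStyleBuild(r) // check pass
        convertible && oldStyleBuild(l) && oldStyleBuild(r)    // build pass
    }
  }

  // New shape: one pass that visits every node exactly once - O(n).
  def trim(n: Node): Option[Node] = {
    visits += 1
    n match {
      case Pred(ok)   => if (ok) Some(n) else None
      case Conj(l, r) => for (a <- trim(l); b <- trim(r)) yield Conj(a, b)
    }
  }

  def main(args: Array[String]): Unit = {
    // A left-skewed AND chain - the shape the patch 5 comment calls out.
    val skewed = (1 to 20).foldLeft[Node](Pred(true))((t, _) => Conj(t, Pred(true)))
    visits = 0; trim(skewed); println(s"trim visits: $visits")               // 41
    visits = 0; oldStyleBuild(skewed); println(s"old-style visits: $visits") // 4194301
  }
}

With only 20 conjunctions the old strategy already needs about 4.2 million node visits against 41 for the trim pass. Finally, a sketch of how the rewritten conversion can be exercised end to end. It assumes the `OrcFilters.createFilter(schema, filters)` entry point of the Spark 2.4-era sources, and - since `OrcFilters` is `private[sql]` - that the snippet lives under the `org.apache.spark.sql` package, for example in a test:

package org.apache.spark.sql.execution.datasources.orc

import org.apache.spark.sql.sources.{And, EqualTo, Filter}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

object SkewedFilterCheck {
  def main(args: Array[String]): Unit = {
    val schema = StructType((0 until 40).map(i => StructField(s"col$i", IntegerType)))

    // A left-skewed AND chain of 40 comparisons - the pathological shape above.
    val skewed: Filter = (1 until 40).foldLeft[Filter](EqualTo("col0", 0)) {
      (acc, i) => And(acc, EqualTo(s"col$i", i))
    }

    // With filterAndBuild this returns promptly; with the old
    // check-while-building conversion it would effectively hang.
    println(OrcFilters.createFilter(schema, Seq(skewed)))
  }
}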