diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFiltersBase.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFiltersBase.scala index 8d4898a6b85c..0b5658715377 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFiltersBase.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFiltersBase.scala @@ -38,7 +38,7 @@ trait OrcFiltersBase { // Since ORC 1.5.0 (ORC-323), we need to quote for column names with `.` characters // in order to distinguish predicate pushdown for nested columns. - protected def quoteAttributeNameIfNeeded(name: String) : String = { + protected[sql] def quoteAttributeNameIfNeeded(name: String) : String = { if (!name.contains("`") && name.contains(".")) { s"`$name`" } else { @@ -50,7 +50,7 @@ trait OrcFiltersBase { * Return true if this is a searchable type in ORC. * Both CharType and VarcharType are cleaned at AstBuilder. */ - protected def isSearchableType(dataType: DataType) = dataType match { + protected[sql] def isSearchableType(dataType: DataType) = dataType match { case BinaryType => false case _: AtomicType => true case _ => false diff --git a/sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala b/sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala index 9e4bf22ff7e3..e87f7d8caf4f 100644 --- a/sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala +++ b/sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala @@ -29,8 +29,9 @@ import org.apache.spark.sql.types._ /** * Helper object for building ORC `SearchArgument`s, which are used for ORC predicate push-down. * - * Due to limitation of ORC `SearchArgument` builder, we had to end up with a pretty weird double- - * checking pattern when converting `And`/`Or`/`Not` filters. + * Due to limitation of ORC `SearchArgument` builder, we had to implement separate checking and + * conversion passes through the Filter to make sure we only convert predicates that are known + * to be convertible. * * An ORC `SearchArgument` must be built in one pass using a single builder. For example, you can't * build `a = 1` and `b = 2` first, and then combine them into `a = 1 AND b = 2`. This is quite @@ -39,18 +40,18 @@ import org.apache.spark.sql.types._ * * The annoying part is that, `SearchArgument` builder methods like `startAnd()`, `startOr()`, and * `startNot()` mutate internal state of the builder instance. This forces us to translate all - * convertible filters with a single builder instance. However, before actually converting a filter, - * we've no idea whether it can be recognized by ORC or not. Thus, when an inconvertible filter is - * found, we may already end up with a builder whose internal state is inconsistent. + * convertible filters with a single builder instance. However, if we try to translate a filter + * before checking whether it can be converted or not, we may end up with a builder whose internal + * state is inconsistent in the case of an inconvertible filter. * * For example, to convert an `And` filter with builder `b`, we call `b.startAnd()` first, and then * try to convert its children. Say we convert `left` child successfully, but find that `right` * child is inconvertible. Alas, `b.startAnd()` call can't be rolled back, and `b` is inconsistent * now. 
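To make the hazard described in the paragraph above concrete, here is a self-contained toy sketch of the one-pass constraint. It is illustrative only: `ToyBuilder` is a hypothetical stand-in, far simpler than ORC's real `SearchArgument.Builder`.

import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for ORC's builder: every call appends to internal state.
final class ToyBuilder {
  private val ops = ArrayBuffer.empty[String]
  def startAnd(): ToyBuilder = { ops += "startAnd"; this }
  def leaf(predicate: String): ToyBuilder = { ops += s"leaf($predicate)"; this }
  def end(): ToyBuilder = { ops += "end"; this }
  override def toString: String = ops.mkString(" -> ")
}

object OnePassHazard extends App {
  val b = new ToyBuilder
  b.startAnd()      // mutates internal state; there is no rollback
  b.leaf("a = 2")   // left child converts fine
  // Suppose the right child, `b in ('1')`, turns out to be inconvertible here:
  // we must abandon the conversion, but `b` is already stuck inside an open AND.
  println(b)        // startAnd -> leaf(a = 2), i.e. an unclosed, inconsistent state
}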
* - * The workaround employed here is that, for `And`/`Or`/`Not`, we first try to convert their - * children with brand new builders, and only do the actual conversion with the right builder - * instance when the children are proven to be convertible. + * The workaround employed here is to trim the Spark filters before trying to convert them. This + * way, we can only do the actual conversion on the part of the Filter that is known to be + * convertible. * * P.S.: Hive seems to use `SearchArgument` together with `ExprNodeGenericFuncDesc` only. Usage of * builder methods mentioned above can only be found in test code, where all tested filters are @@ -63,11 +64,12 @@ private[sql] object OrcFilters extends OrcFiltersBase { */ def createFilter(schema: StructType, filters: Seq[Filter]): Option[SearchArgument] = { val dataTypeMap = schema.map(f => f.name -> f.dataType).toMap + val orcFilterConverter = new OrcFilterConverter(dataTypeMap) for { - // Combines all convertible filters using `And` to produce a single conjunction - conjunction <- buildTree(convertibleFilters(schema, dataTypeMap, filters)) + // Combines all filters using `And` to produce a single conjunction + conjunction <- buildTree(filters) // Then tries to build a single ORC `SearchArgument` for the conjunction predicate - builder <- buildSearchArgument(dataTypeMap, conjunction, newBuilder) + builder <- orcFilterConverter.buildSearchArgument(conjunction, newBuilder) } yield builder.build() } @@ -75,43 +77,13 @@ private[sql] object OrcFilters extends OrcFiltersBase { schema: StructType, dataTypeMap: Map[String, DataType], filters: Seq[Filter]): Seq[Filter] = { - import org.apache.spark.sql.sources._ + val orcFilterConverter = new OrcFilterConverter(dataTypeMap) + filters.flatMap(orcFilterConverter.trimUnconvertibleFilters) + } - def convertibleFiltersHelper( - filter: Filter, - canPartialPushDown: Boolean): Option[Filter] = filter match { - case And(left, right) => - val leftResultOptional = convertibleFiltersHelper(left, canPartialPushDown) - val rightResultOptional = convertibleFiltersHelper(right, canPartialPushDown) - (leftResultOptional, rightResultOptional) match { - case (Some(leftResult), Some(rightResult)) => Some(And(leftResult, rightResult)) - case (Some(leftResult), None) if canPartialPushDown => Some(leftResult) - case (None, Some(rightResult)) if canPartialPushDown => Some(rightResult) - case _ => None - } +} - case Or(left, right) => - val leftResultOptional = convertibleFiltersHelper(left, canPartialPushDown) - val rightResultOptional = convertibleFiltersHelper(right, canPartialPushDown) - if (leftResultOptional.isEmpty || rightResultOptional.isEmpty) { - None - } else { - Some(Or(leftResultOptional.get, rightResultOptional.get)) - } - case Not(pred) => - val resultOptional = convertibleFiltersHelper(pred, canPartialPushDown = false) - resultOptional.map(Not) - case other => - if (buildSearchArgument(dataTypeMap, other, newBuilder()).isDefined) { - Some(other) - } else { - None - } - } - filters.flatMap { filter => - convertibleFiltersHelper(filter, true) - } - } +private class OrcFilterConverter(val dataTypeMap: Map[String, DataType]) { /** * Get PredicateLeafType which is corresponding to the given DataType. @@ -143,145 +115,229 @@ private[sql] object OrcFilters extends OrcFiltersBase { case _ => value } + import org.apache.spark.sql.sources._ + import OrcFilters._ + /** - * Build a SearchArgument and return the builder so far. 
+ * Builds a SearchArgument for a Filter by first trimming the non-convertible nodes, and then + * only building the remaining convertible nodes. + * + * Doing the conversion in this way avoids the computational complexity problems introduced by + * checking whether a node is convertible while building it. The approach implemented here has + * complexity that's linear in the size of the Filter tree - O(number of Filter nodes) - we run + * a single pass over the tree to trim it, and then another pass on the trimmed tree to convert + * the remaining nodes. + * + * The alternative approach of checking-while-building can (and did) result + * in exponential complexity in the height of the tree, causing perf problems with Filters with + * as few as ~35 nodes if they were skewed. */ - private def buildSearchArgument( - dataTypeMap: Map[String, DataType], + private[sql] def buildSearchArgument( expression: Filter, builder: Builder): Option[Builder] = { - createBuilder(dataTypeMap, expression, builder, canPartialPushDownConjuncts = true) + trimUnconvertibleFilters(expression).map { filter => + updateBuilder(filter, builder) + builder + } } /** - * @param dataTypeMap a map from the attribute name to its data type. - * @param expression the input filter predicates. - * @param builder the input SearchArgument.Builder. - * @param canPartialPushDownConjuncts whether a subset of conjuncts of predicates can be pushed - * down safely. Pushing ONLY one side of AND down is safe to - * do at the top level or none of its ancestors is NOT and OR. - * @return the builder so far. + * Removes all sub-Filters from a given Filter that are not convertible to an ORC SearchArgument. */ - private def createBuilder( - dataTypeMap: Map[String, DataType], - expression: Filter, - builder: Builder, - canPartialPushDownConjuncts: Boolean): Option[Builder] = { + private[sql] def trimUnconvertibleFilters(expression: Filter): Option[Filter] = { + performAction(TrimUnconvertibleFilters(canPartialPushDownConjuncts = true), expression) + } + + /** + * Builds a SearchArgument for the given Filter. This method should only be called on Filters + * that have previously been trimmed to remove unsupported sub-Filters! + */ + private def updateBuilder(expression: Filter, builder: Builder): Unit = + performAction(BuildSearchArgument(builder), expression) + + sealed trait ActionType[ReturnType] + case class TrimUnconvertibleFilters(canPartialPushDownConjuncts: Boolean) + extends ActionType[Option[Filter]] + case class BuildSearchArgument(builder: Builder) extends ActionType[Unit] + + // The performAction method can run both the filtering and building operations for a given + // node - we signify which one we want with the `actionType` parameter. + // + // There are a couple of benefits to coupling the two operations like this: + // 1. All the logic for a given predicate is grouped logically in the same place. You don't + // have to scroll across the whole file to see what the filter action for an And is while + // you're looking at the build action. + // 2. It's much easier to keep the implementations of the two operations up-to-date with + // each other. If the `filter` and `build` operations are implemented as separate case-matches + // in different methods, it's very easy to change one without appropriately updating the + // other. For example, if we add a new supported node type to `filter`, it would be very + // easy to forget to update `build` to support it too, thus leading to conversion errors. 
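The complexity claim above is easy to model. The following sketch is not part of the patch; it uses only the public `org.apache.spark.sql.sources` filter classes to count node visits under the removed check-then-convert strategy, in which every `And` converts each child once into a throwaway builder and once into the real one.

import org.apache.spark.sql.sources.{And, EqualTo, Filter}

object OldVisitCount extends App {
  // Visits under the removed strategy: each And checks both children with a
  // fresh builder, then converts them again for real, doubling the work.
  def oldVisits(f: Filter): Long = f match {
    case And(l, r) => 2L * (oldVisits(l) + oldVisits(r)) + 1L
    case _ => 1L
  }
  // A left-skewed chain of n conjuncts, the shape that triggered the problem.
  def chain(n: Int): Filter =
    (1 to n).map(i => EqualTo(s"c$i", i): Filter).reduceLeft(And(_, _))
  println(oldVisits(chain(35))) // on the order of 2^35 visits for ~35 skewed nodes
}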
+ private def performAction[ReturnType]( + actionType: ActionType[ReturnType], + expression: Filter): ReturnType = { def getType(attribute: String): PredicateLeaf.Type = getPredicateLeafType(dataTypeMap(attribute)) - import org.apache.spark.sql.sources._ - expression match { case And(left, right) => - // At here, it is not safe to just convert one side and remove the other side - // if we do not understand what the parent filters are. - // - // Here is an example used to explain the reason. - // Let's say we have NOT(a = 2 AND b in ('1')) and we do not understand how to - // convert b in ('1'). If we only convert a = 2, we will end up with a filter - // NOT(a = 2), which will generate wrong results. - // - // Pushing one side of AND down is only safe to do at the top level or in the child - // AND before hitting NOT or OR conditions, and in this case, the unsupported predicate - // can be safely removed. - val leftBuilderOption = - createBuilder(dataTypeMap, left, newBuilder, canPartialPushDownConjuncts) - val rightBuilderOption = - createBuilder(dataTypeMap, right, newBuilder, canPartialPushDownConjuncts) - (leftBuilderOption, rightBuilderOption) match { - case (Some(_), Some(_)) => - for { - lhs <- createBuilder(dataTypeMap, left, - builder.startAnd(), canPartialPushDownConjuncts) - rhs <- createBuilder(dataTypeMap, right, lhs, canPartialPushDownConjuncts) - } yield rhs.end() - - case (Some(_), None) if canPartialPushDownConjuncts => - createBuilder(dataTypeMap, left, builder, canPartialPushDownConjuncts) - - case (None, Some(_)) if canPartialPushDownConjuncts => - createBuilder(dataTypeMap, right, builder, canPartialPushDownConjuncts) - - case _ => None + actionType match { + case t @ TrimUnconvertibleFilters(canPartialPushDownConjuncts) => + // Here, it is not safe to just keep one side and remove the other side + // if we do not understand what the parent filters are. + // + // Here is an example used to explain the reason. + // Let's say we have NOT(a = 2 AND b in ('1')) and we do not understand how to + // convert b in ('1'). If we only convert a = 2, we will end up with a filter + // NOT(a = 2), which will generate wrong results. + // + // Pushing one side of AND down is only safe to do at the top level or in the child + // AND before hitting NOT or OR conditions, and in this case, the unsupported + // predicate can be safely removed. + val lhs = performAction(t, left) + val rhs = performAction(t, right) + (lhs, rhs) match { + case (Some(l), Some(r)) => Some(And(l, r)) + case (Some(_), None) if canPartialPushDownConjuncts => lhs + case (None, Some(_)) if canPartialPushDownConjuncts => rhs + case _ => None + } + case b @ BuildSearchArgument(builder) => + builder.startAnd() + performAction(b, left) + performAction(b, right) + builder.end() + () } case Or(left, right) => - // The Or predicate is convertible when both of its children can be pushed down. - // That is to say, if one/both of the children can be partially pushed down, the Or - // predicate can be partially pushed down as well. - // - // Here is an example used to explain the reason. - // Let's say we have - // (a1 AND a2) OR (b1 AND b2), - // a1 and b1 is convertible, while a2 and b2 is not. - // The predicate can be converted as - // (a1 OR b1) AND (a1 OR b2) AND (a2 OR b1) AND (a2 OR b2) - // As per the logical in And predicate, we can push down (a1 OR b1). 
- for { - _ <- createBuilder(dataTypeMap, left, newBuilder, canPartialPushDownConjuncts) - _ <- createBuilder(dataTypeMap, right, newBuilder, canPartialPushDownConjuncts) - lhs <- createBuilder(dataTypeMap, left, - builder.startOr(), canPartialPushDownConjuncts) - rhs <- createBuilder(dataTypeMap, right, lhs, canPartialPushDownConjuncts) - } yield rhs.end() + actionType match { + case t: TrimUnconvertibleFilters => + // The Or predicate is convertible when both of its children can be pushed down. + // That is to say, if one/both of the children can be partially pushed down, the Or + // predicate can be partially pushed down as well. + // + // Here is an example used to explain the reason. + // Let's say we have + // (a1 AND a2) OR (b1 AND b2), + // a1 and b1 are convertible, while a2 and b2 are not. + // The predicate can be converted as + // (a1 OR b1) AND (a1 OR b2) AND (a2 OR b1) AND (a2 OR b2) + // As per the logic in the And predicate, we can push down (a1 OR b1). + for { + lhs: Filter <- performAction(t, left) + rhs: Filter <- performAction(t, right) + } yield Or(lhs, rhs) + case b @ BuildSearchArgument(builder) => + builder.startOr() + performAction(b, left) + performAction(b, right) + builder.end() + () + } case Not(child) => - for { - _ <- createBuilder(dataTypeMap, child, newBuilder, canPartialPushDownConjuncts = false) - negate <- createBuilder(dataTypeMap, - child, builder.startNot(), canPartialPushDownConjuncts = false) - } yield negate.end() + actionType match { + case t: TrimUnconvertibleFilters => + performAction(t.copy(canPartialPushDownConjuncts = false), child).map(Not) + case b @ BuildSearchArgument(builder) => + builder.startNot() + performAction(b, child) + builder.end() + () + } - // NOTE: For all case branches dealing with leaf predicates below, the additional `startAnd()` - // call is mandatory. ORC `SearchArgument` builder requires that all leaf predicates must be - // wrapped by a "parent" predicate (`And`, `Or`, or `Not`). + // NOTE: For all case branches dealing with leaf predicates below, the additional + // `startAnd()` call is mandatory. ORC `SearchArgument` builder requires that all leaf + // predicates must be wrapped by a "parent" predicate (`And`, `Or`, or `Not`). 
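The two comment blocks above pin down the trimming semantics. Here is a standalone model of those rules (an illustration, not the patch's code; it assumes only `EqualTo` is convertible, whereas the real code checks for ORC-searchable types):

import org.apache.spark.sql.sources._

object TrimDemo extends App {
  def trim(f: Filter, canPartialPushDown: Boolean): Option[Filter] = f match {
    case And(l, r) =>
      (trim(l, canPartialPushDown), trim(r, canPartialPushDown)) match {
        case (Some(a), Some(b)) => Some(And(a, b))
        case (some @ Some(_), None) if canPartialPushDown => some
        case (None, some @ Some(_)) if canPartialPushDown => some
        case _ => None
      }
    case Or(l, r) =>
      // Both sides must survive, possibly partially.
      for {
        a <- trim(l, canPartialPushDown)
        b <- trim(r, canPartialPushDown)
      } yield Or(a, b)
    case Not(c) => trim(c, canPartialPushDown = false).map(Not)
    case e: EqualTo => Some(e)
    case _ => None // e.g. In, treated as unsupported in this toy model
  }

  // NOT(a = 2 AND b in ('1')) must not degrade to NOT(a = 2):
  println(trim(Not(And(EqualTo("a", 2), In("b", Array[Any]("1")))), true)) // None
  // (a1 AND a2) OR (b1 AND b2), with a2 and b2 unsupported, trims to (a1 OR b1):
  println(trim(Or(
    And(EqualTo("a1", 1), In("a2", Array[Any](1))),
    And(EqualTo("b1", 1), In("b2", Array[Any](1)))), true))
}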
case EqualTo(attribute, value) if isSearchableType(dataTypeMap(attribute)) => - val quotedName = quoteAttributeNameIfNeeded(attribute) - val castedValue = castLiteralValue(value, dataTypeMap(attribute)) - Some(builder.startAnd().equals(quotedName, getType(attribute), castedValue).end()) - + actionType match { + case _: TrimUnconvertibleFilters => Some(expression) + case BuildSearchArgument(builder) => + val quotedName = quoteAttributeNameIfNeeded(attribute) + val castedValue = castLiteralValue(value, dataTypeMap(attribute)) + builder.startAnd().equals(quotedName, getType(attribute), castedValue).end() + () + } case EqualNullSafe(attribute, value) if isSearchableType(dataTypeMap(attribute)) => - val quotedName = quoteAttributeNameIfNeeded(attribute) - val castedValue = castLiteralValue(value, dataTypeMap(attribute)) - Some(builder.startAnd().nullSafeEquals(quotedName, getType(attribute), castedValue).end()) - + actionType match { + case _: TrimUnconvertibleFilters => Some(expression) + case BuildSearchArgument(builder) => + val quotedName = quoteAttributeNameIfNeeded(attribute) + val castedValue = castLiteralValue(value, dataTypeMap(attribute)) + builder.startAnd().nullSafeEquals(quotedName, getType(attribute), castedValue).end() + () + } case LessThan(attribute, value) if isSearchableType(dataTypeMap(attribute)) => - val quotedName = quoteAttributeNameIfNeeded(attribute) - val castedValue = castLiteralValue(value, dataTypeMap(attribute)) - Some(builder.startAnd().lessThan(quotedName, getType(attribute), castedValue).end()) - + actionType match { + case _: TrimUnconvertibleFilters => Some(expression) + case BuildSearchArgument(builder) => + val quotedName = quoteAttributeNameIfNeeded(attribute) + val castedValue = castLiteralValue(value, dataTypeMap(attribute)) + builder.startAnd().lessThan(quotedName, getType(attribute), castedValue).end() + () + } case LessThanOrEqual(attribute, value) if isSearchableType(dataTypeMap(attribute)) => - val quotedName = quoteAttributeNameIfNeeded(attribute) - val castedValue = castLiteralValue(value, dataTypeMap(attribute)) - Some(builder.startAnd().lessThanEquals(quotedName, getType(attribute), castedValue).end()) - + actionType match { + case _: TrimUnconvertibleFilters => Some(expression) + case BuildSearchArgument(builder) => + val quotedName = quoteAttributeNameIfNeeded(attribute) + val castedValue = castLiteralValue(value, dataTypeMap(attribute)) + builder.startAnd().lessThanEquals(quotedName, getType(attribute), castedValue).end() + () + } case GreaterThan(attribute, value) if isSearchableType(dataTypeMap(attribute)) => - val quotedName = quoteAttributeNameIfNeeded(attribute) - val castedValue = castLiteralValue(value, dataTypeMap(attribute)) - Some(builder.startNot().lessThanEquals(quotedName, getType(attribute), castedValue).end()) - + actionType match { + case _: TrimUnconvertibleFilters => Some(expression) + case BuildSearchArgument(builder) => + val quotedName = quoteAttributeNameIfNeeded(attribute) + val castedValue = castLiteralValue(value, dataTypeMap(attribute)) + builder.startNot().lessThanEquals(quotedName, getType(attribute), castedValue).end() + () + } case GreaterThanOrEqual(attribute, value) if isSearchableType(dataTypeMap(attribute)) => - val quotedName = quoteAttributeNameIfNeeded(attribute) - val castedValue = castLiteralValue(value, dataTypeMap(attribute)) - Some(builder.startNot().lessThan(quotedName, getType(attribute), castedValue).end()) - + actionType match { + case _: TrimUnconvertibleFilters => Some(expression) + case 
BuildSearchArgument(builder) => + val quotedName = quoteAttributeNameIfNeeded(attribute) + val castedValue = castLiteralValue(value, dataTypeMap(attribute)) + builder.startNot().lessThan(quotedName, getType(attribute), castedValue).end() + () + } case IsNull(attribute) if isSearchableType(dataTypeMap(attribute)) => - val quotedName = quoteAttributeNameIfNeeded(attribute) - Some(builder.startAnd().isNull(quotedName, getType(attribute)).end()) - + actionType match { + case _: TrimUnconvertibleFilters => Some(expression) + case BuildSearchArgument(builder) => + val quotedName = quoteAttributeNameIfNeeded(attribute) + builder.startAnd().isNull(quotedName, getType(attribute)).end() + () + } case IsNotNull(attribute) if isSearchableType(dataTypeMap(attribute)) => - val quotedName = quoteAttributeNameIfNeeded(attribute) - Some(builder.startNot().isNull(quotedName, getType(attribute)).end()) - + actionType match { + case _: TrimUnconvertibleFilters => Some(expression) + case BuildSearchArgument(builder) => + val quotedName = quoteAttributeNameIfNeeded(attribute) + builder.startNot().isNull(quotedName, getType(attribute)).end() + () + } case In(attribute, values) if isSearchableType(dataTypeMap(attribute)) => - val quotedName = quoteAttributeNameIfNeeded(attribute) - val castedValues = values.map(v => castLiteralValue(v, dataTypeMap(attribute))) - Some(builder.startAnd().in(quotedName, getType(attribute), - castedValues.map(_.asInstanceOf[AnyRef]): _*).end()) + actionType match { + case _: TrimUnconvertibleFilters => Some(expression) + case BuildSearchArgument(builder) => + val quotedName = quoteAttributeNameIfNeeded(attribute) + val castedValues = values.map(v => castLiteralValue(v, dataTypeMap(attribute))) + builder.startAnd().in(quotedName, getType(attribute), + castedValues.map(_.asInstanceOf[AnyRef]): _*).end() + () + } - case _ => None + case _ => + actionType match { + case _: TrimUnconvertibleFilters => None + case BuildSearchArgument(builder) => + throw new IllegalArgumentException(s"Can't build unsupported filter ${expression}") + } } } } + diff --git a/sql/core/v2.3.5/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala b/sql/core/v2.3.5/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala index 632a72a32abd..ebcd3567d34c 100644 --- a/sql/core/v2.3.5/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala +++ b/sql/core/v2.3.5/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala @@ -29,8 +29,9 @@ import org.apache.spark.sql.types._ /** * Helper object for building ORC `SearchArgument`s, which are used for ORC predicate push-down. * - * Due to limitation of ORC `SearchArgument` builder, we had to end up with a pretty weird double- - * checking pattern when converting `And`/`Or`/`Not` filters. + * Due to limitation of ORC `SearchArgument` builder, we had to implement separate checking and + * conversion passes through the Filter to make sure we only convert predicates that are known + * to be convertible. * * An ORC `SearchArgument` must be built in one pass using a single builder. For example, you can't * build `a = 1` and `b = 2` first, and then combine them into `a = 1 AND b = 2`. This is quite @@ -39,18 +40,18 @@ import org.apache.spark.sql.types._ * * The annoying part is that, `SearchArgument` builder methods like `startAnd()`, `startOr()`, and * `startNot()` mutate internal state of the builder instance. 
This forces us to translate all - * convertible filters with a single builder instance. However, before actually converting a filter, - * we've no idea whether it can be recognized by ORC or not. Thus, when an inconvertible filter is - * found, we may already end up with a builder whose internal state is inconsistent. + * convertible filters with a single builder instance. However, if we try to translate a filter + * before checking whether it can be converted or not, we may end up with a builder whose internal + * state is inconsistent in the case of an inconvertible filter. * * For example, to convert an `And` filter with builder `b`, we call `b.startAnd()` first, and then * try to convert its children. Say we convert `left` child successfully, but find that `right` * child is inconvertible. Alas, `b.startAnd()` call can't be rolled back, and `b` is inconsistent * now. * - * The workaround employed here is that, for `And`/`Or`/`Not`, we first try to convert their - * children with brand new builders, and only do the actual conversion with the right builder - * instance when the children are proven to be convertible. + * The workaround employed here is to trim the Spark filters before trying to convert them. This + * way, we can only do the actual conversion on the part of the Filter that is known to be + * convertible. * * P.S.: Hive seems to use `SearchArgument` together with `ExprNodeGenericFuncDesc` only. Usage of * builder methods mentioned above can only be found in test code, where all tested filters are @@ -63,11 +64,12 @@ private[sql] object OrcFilters extends OrcFiltersBase { */ def createFilter(schema: StructType, filters: Seq[Filter]): Option[SearchArgument] = { val dataTypeMap = schema.map(f => f.name -> f.dataType).toMap + val orcFilterConverter = new OrcFilterConverter(dataTypeMap) for { - // Combines all convertible filters using `And` to produce a single conjunction - conjunction <- buildTree(convertibleFilters(schema, dataTypeMap, filters)) + // Combines all filters using `And` to produce a single conjunction + conjunction <- buildTree(filters) // Then tries to build a single ORC `SearchArgument` for the conjunction predicate - builder <- buildSearchArgument(dataTypeMap, conjunction, newBuilder) + builder <- orcFilterConverter.buildSearchArgument(conjunction, newBuilder) } yield builder.build() } @@ -75,43 +77,13 @@ private[sql] object OrcFilters extends OrcFiltersBase { schema: StructType, dataTypeMap: Map[String, DataType], filters: Seq[Filter]): Seq[Filter] = { - import org.apache.spark.sql.sources._ + val orcFilterConverter = new OrcFilterConverter(dataTypeMap) + filters.flatMap(orcFilterConverter.trimUnconvertibleFilters) + } - def convertibleFiltersHelper( - filter: Filter, - canPartialPushDown: Boolean): Option[Filter] = filter match { - case And(left, right) => - val leftResultOptional = convertibleFiltersHelper(left, canPartialPushDown) - val rightResultOptional = convertibleFiltersHelper(right, canPartialPushDown) - (leftResultOptional, rightResultOptional) match { - case (Some(leftResult), Some(rightResult)) => Some(And(leftResult, rightResult)) - case (Some(leftResult), None) if canPartialPushDown => Some(leftResult) - case (None, Some(rightResult)) if canPartialPushDown => Some(rightResult) - case _ => None - } +} - case Or(left, right) => - val leftResultOptional = convertibleFiltersHelper(left, canPartialPushDown) - val rightResultOptional = convertibleFiltersHelper(right, canPartialPushDown) - if (leftResultOptional.isEmpty || rightResultOptional.isEmpty) 
{ - None - } else { - Some(Or(leftResultOptional.get, rightResultOptional.get)) - } - case Not(pred) => - val resultOptional = convertibleFiltersHelper(pred, canPartialPushDown = false) - resultOptional.map(Not) - case other => - if (buildSearchArgument(dataTypeMap, other, newBuilder()).isDefined) { - Some(other) - } else { - None - } - } - filters.flatMap { filter => - convertibleFiltersHelper(filter, true) - } - } +private class OrcFilterConverter(val dataTypeMap: Map[String, DataType]) { /** * Get PredicateLeafType which is corresponding to the given DataType. @@ -143,144 +115,228 @@ private[sql] object OrcFilters extends OrcFiltersBase { case _ => value } + import org.apache.spark.sql.sources._ + import OrcFilters._ + /** - * Build a SearchArgument and return the builder so far. + * Builds a SearchArgument for a Filter by first trimming the non-convertible nodes, and then + * only building the remaining convertible nodes. + * + * Doing the conversion in this way avoids the computational complexity problems introduced by + * checking whether a node is convertible while building it. The approach implemented here has + * complexity that's linear in the size of the Filter tree - O(number of Filter nodes) - we run + * a single pass over the tree to trim it, and then another pass on the trimmed tree to convert + * the remaining nodes. + * + * The alternative approach of checking-while-building can (and did) result + * in exponential complexity in the height of the tree, causing perf problems with Filters with + * as few as ~35 nodes if they were skewed. */ - private def buildSearchArgument( - dataTypeMap: Map[String, DataType], + private[sql] def buildSearchArgument( expression: Filter, builder: Builder): Option[Builder] = { - createBuilder(dataTypeMap, expression, builder, canPartialPushDownConjuncts = true) + trimUnconvertibleFilters(expression).map { filter => + updateBuilder(filter, builder) + builder + } } /** - * @param dataTypeMap a map from the attribute name to its data type. - * @param expression the input filter predicates. - * @param builder the input SearchArgument.Builder. - * @param canPartialPushDownConjuncts whether a subset of conjuncts of predicates can be pushed - * down safely. Pushing ONLY one side of AND down is safe to - * do at the top level or none of its ancestors is NOT and OR. - * @return the builder so far. + * Removes all sub-Filters from a given Filter that are not convertible to an ORC SearchArgument. */ - private def createBuilder( - dataTypeMap: Map[String, DataType], - expression: Filter, - builder: Builder, - canPartialPushDownConjuncts: Boolean): Option[Builder] = { + private[sql] def trimUnconvertibleFilters(expression: Filter): Option[Filter] = { + performAction(TrimUnconvertibleFilters(canPartialPushDownConjuncts = true), expression) + } + + /** + * Builds a SearchArgument for the given Filter. This method should only be called on Filters + * that have previously been trimmed to remove unsupported sub-Filters! + */ + private def updateBuilder(expression: Filter, builder: Builder): Unit = + performAction(BuildSearchArgument(builder), expression) + + sealed trait ActionType[ReturnType] + case class TrimUnconvertibleFilters(canPartialPushDownConjuncts: Boolean) + extends ActionType[Option[Filter]] + case class BuildSearchArgument(builder: Builder) extends ActionType[Unit] + + // The performAction method can run both the filtering and building operations for a given + // node - we signify which one we want with the `actionType` parameter. 
+ // + // There are a couple of benefits to coupling the two operations like this: + // 1. All the logic for a given predicate is grouped logically in the same place. You don't + // have to scroll across the whole file to see what the filter action for an And is while + // you're looking at the build action. + // 2. It's much easier to keep the implementations of the two operations up-to-date with + // each other. If the `filter` and `build` operations are implemented as separate case-matches + // in different methods, it's very easy to change one without appropriately updating the + // other. For example, if we add a new supported node type to `filter`, it would be very + // easy to forget to update `build` to support it too, thus leading to conversion errors. + private def performAction[ReturnType]( + actionType: ActionType[ReturnType], + expression: Filter): ReturnType = { def getType(attribute: String): PredicateLeaf.Type = getPredicateLeafType(dataTypeMap(attribute)) - import org.apache.spark.sql.sources._ - expression match { case And(left, right) => - // At here, it is not safe to just convert one side and remove the other side - // if we do not understand what the parent filters are. - // - // Here is an example used to explain the reason. - // Let's say we have NOT(a = 2 AND b in ('1')) and we do not understand how to - // convert b in ('1'). If we only convert a = 2, we will end up with a filter - // NOT(a = 2), which will generate wrong results. - // - // Pushing one side of AND down is only safe to do at the top level or in the child - // AND before hitting NOT or OR conditions, and in this case, the unsupported predicate - // can be safely removed. - val leftBuilderOption = - createBuilder(dataTypeMap, left, newBuilder, canPartialPushDownConjuncts) - val rightBuilderOption = - createBuilder(dataTypeMap, right, newBuilder, canPartialPushDownConjuncts) - (leftBuilderOption, rightBuilderOption) match { - case (Some(_), Some(_)) => - for { - lhs <- createBuilder(dataTypeMap, left, - builder.startAnd(), canPartialPushDownConjuncts) - rhs <- createBuilder(dataTypeMap, right, lhs, canPartialPushDownConjuncts) - } yield rhs.end() - - case (Some(_), None) if canPartialPushDownConjuncts => - createBuilder(dataTypeMap, left, builder, canPartialPushDownConjuncts) - - case (None, Some(_)) if canPartialPushDownConjuncts => - createBuilder(dataTypeMap, right, builder, canPartialPushDownConjuncts) - - case _ => None + actionType match { + case t @ TrimUnconvertibleFilters(canPartialPushDownConjuncts) => + // Here, it is not safe to just keep one side and remove the other side + // if we do not understand what the parent filters are. + // + // Here is an example used to explain the reason. + // Let's say we have NOT(a = 2 AND b in ('1')) and we do not understand how to + // convert b in ('1'). If we only convert a = 2, we will end up with a filter + // NOT(a = 2), which will generate wrong results. + // + // Pushing one side of AND down is only safe to do at the top level or in the child + // AND before hitting NOT or OR conditions, and in this case, the unsupported + // predicate can be safely removed. 
+ val lhs = performAction(t, left) + val rhs = performAction(t, right) + (lhs, rhs) match { + case (Some(l), Some(r)) => Some(And(l, r)) + case (Some(_), None) if canPartialPushDownConjuncts => lhs + case (None, Some(_)) if canPartialPushDownConjuncts => rhs + case _ => None + } + case b @ BuildSearchArgument(builder) => + builder.startAnd() + performAction(b, left) + performAction(b, right) + builder.end() + () } case Or(left, right) => - // The Or predicate is convertible when both of its children can be pushed down. - // That is to say, if one/both of the children can be partially pushed down, the Or - // predicate can be partially pushed down as well. - // - // Here is an example used to explain the reason. - // Let's say we have - // (a1 AND a2) OR (b1 AND b2), - // a1 and b1 is convertible, while a2 and b2 is not. - // The predicate can be converted as - // (a1 OR b1) AND (a1 OR b2) AND (a2 OR b1) AND (a2 OR b2) - // As per the logical in And predicate, we can push down (a1 OR b1). - for { - _ <- createBuilder(dataTypeMap, left, newBuilder, canPartialPushDownConjuncts) - _ <- createBuilder(dataTypeMap, right, newBuilder, canPartialPushDownConjuncts) - lhs <- createBuilder(dataTypeMap, left, builder.startOr(), canPartialPushDownConjuncts) - rhs <- createBuilder(dataTypeMap, right, lhs, canPartialPushDownConjuncts) - } yield rhs.end() + actionType match { + case t: TrimUnconvertibleFilters => + // The Or predicate is convertible when both of its children can be pushed down. + // That is to say, if one/both of the children can be partially pushed down, the Or + // predicate can be partially pushed down as well. + // + // Here is an example used to explain the reason. + // Let's say we have + // (a1 AND a2) OR (b1 AND b2), + // a1 and b1 are convertible, while a2 and b2 are not. + // The predicate can be converted as + // (a1 OR b1) AND (a1 OR b2) AND (a2 OR b1) AND (a2 OR b2) + // As per the logic in the And predicate, we can push down (a1 OR b1). + for { + lhs: Filter <- performAction(t, left) + rhs: Filter <- performAction(t, right) + } yield Or(lhs, rhs) + case b @ BuildSearchArgument(builder) => + builder.startOr() + performAction(b, left) + performAction(b, right) + builder.end() + () + } case Not(child) => - for { - _ <- createBuilder(dataTypeMap, child, newBuilder, canPartialPushDownConjuncts = false) - negate <- createBuilder(dataTypeMap, - child, builder.startNot(), canPartialPushDownConjuncts = false) - } yield negate.end() + actionType match { + case t: TrimUnconvertibleFilters => + performAction(t.copy(canPartialPushDownConjuncts = false), child).map(Not) + case b @ BuildSearchArgument(builder) => + builder.startNot() + performAction(b, child) + builder.end() + () + } - // NOTE: For all case branches dealing with leaf predicates below, the additional `startAnd()` - // call is mandatory. ORC `SearchArgument` builder requires that all leaf predicates must be - // wrapped by a "parent" predicate (`And`, `Or`, or `Not`). + // NOTE: For all case branches dealing with leaf predicates below, the additional + // `startAnd()` call is mandatory. ORC `SearchArgument` builder requires that all leaf + // predicates must be wrapped by a "parent" predicate (`And`, `Or`, or `Not`). 
case EqualTo(attribute, value) if isSearchableType(dataTypeMap(attribute)) => - val quotedName = quoteAttributeNameIfNeeded(attribute) - val castedValue = castLiteralValue(value, dataTypeMap(attribute)) - Some(builder.startAnd().equals(quotedName, getType(attribute), castedValue).end()) - + actionType match { + case _: TrimUnconvertibleFilters => Some(expression) + case BuildSearchArgument(builder) => + val quotedName = quoteAttributeNameIfNeeded(attribute) + val castedValue = castLiteralValue(value, dataTypeMap(attribute)) + builder.startAnd().equals(quotedName, getType(attribute), castedValue).end() + () + } case EqualNullSafe(attribute, value) if isSearchableType(dataTypeMap(attribute)) => - val quotedName = quoteAttributeNameIfNeeded(attribute) - val castedValue = castLiteralValue(value, dataTypeMap(attribute)) - Some(builder.startAnd().nullSafeEquals(quotedName, getType(attribute), castedValue).end()) - + actionType match { + case _: TrimUnconvertibleFilters => Some(expression) + case BuildSearchArgument(builder) => + val quotedName = quoteAttributeNameIfNeeded(attribute) + val castedValue = castLiteralValue(value, dataTypeMap(attribute)) + builder.startAnd().nullSafeEquals(quotedName, getType(attribute), castedValue).end() + () + } case LessThan(attribute, value) if isSearchableType(dataTypeMap(attribute)) => - val quotedName = quoteAttributeNameIfNeeded(attribute) - val castedValue = castLiteralValue(value, dataTypeMap(attribute)) - Some(builder.startAnd().lessThan(quotedName, getType(attribute), castedValue).end()) - + actionType match { + case _: TrimUnconvertibleFilters => Some(expression) + case BuildSearchArgument(builder) => + val quotedName = quoteAttributeNameIfNeeded(attribute) + val castedValue = castLiteralValue(value, dataTypeMap(attribute)) + builder.startAnd().lessThan(quotedName, getType(attribute), castedValue).end() + () + } case LessThanOrEqual(attribute, value) if isSearchableType(dataTypeMap(attribute)) => - val quotedName = quoteAttributeNameIfNeeded(attribute) - val castedValue = castLiteralValue(value, dataTypeMap(attribute)) - Some(builder.startAnd().lessThanEquals(quotedName, getType(attribute), castedValue).end()) - + actionType match { + case _: TrimUnconvertibleFilters => Some(expression) + case BuildSearchArgument(builder) => + val quotedName = quoteAttributeNameIfNeeded(attribute) + val castedValue = castLiteralValue(value, dataTypeMap(attribute)) + builder.startAnd().lessThanEquals(quotedName, getType(attribute), castedValue).end() + () + } case GreaterThan(attribute, value) if isSearchableType(dataTypeMap(attribute)) => - val quotedName = quoteAttributeNameIfNeeded(attribute) - val castedValue = castLiteralValue(value, dataTypeMap(attribute)) - Some(builder.startNot().lessThanEquals(quotedName, getType(attribute), castedValue).end()) - + actionType match { + case _: TrimUnconvertibleFilters => Some(expression) + case BuildSearchArgument(builder) => + val quotedName = quoteAttributeNameIfNeeded(attribute) + val castedValue = castLiteralValue(value, dataTypeMap(attribute)) + builder.startNot().lessThanEquals(quotedName, getType(attribute), castedValue).end() + () + } case GreaterThanOrEqual(attribute, value) if isSearchableType(dataTypeMap(attribute)) => - val quotedName = quoteAttributeNameIfNeeded(attribute) - val castedValue = castLiteralValue(value, dataTypeMap(attribute)) - Some(builder.startNot().lessThan(quotedName, getType(attribute), castedValue).end()) - + actionType match { + case _: TrimUnconvertibleFilters => Some(expression) + case 
BuildSearchArgument(builder) => + val quotedName = quoteAttributeNameIfNeeded(attribute) + val castedValue = castLiteralValue(value, dataTypeMap(attribute)) + builder.startNot().lessThan(quotedName, getType(attribute), castedValue).end() + () + } case IsNull(attribute) if isSearchableType(dataTypeMap(attribute)) => - val quotedName = quoteAttributeNameIfNeeded(attribute) - Some(builder.startAnd().isNull(quotedName, getType(attribute)).end()) - + actionType match { + case _: TrimUnconvertibleFilters => Some(expression) + case BuildSearchArgument(builder) => + val quotedName = quoteAttributeNameIfNeeded(attribute) + builder.startAnd().isNull(quotedName, getType(attribute)).end() + () + } case IsNotNull(attribute) if isSearchableType(dataTypeMap(attribute)) => - val quotedName = quoteAttributeNameIfNeeded(attribute) - Some(builder.startNot().isNull(quotedName, getType(attribute)).end()) - + actionType match { + case _: TrimUnconvertibleFilters => Some(expression) + case BuildSearchArgument(builder) => + val quotedName = quoteAttributeNameIfNeeded(attribute) + builder.startNot().isNull(quotedName, getType(attribute)).end() + () + } case In(attribute, values) if isSearchableType(dataTypeMap(attribute)) => - val quotedName = quoteAttributeNameIfNeeded(attribute) - val castedValues = values.map(v => castLiteralValue(v, dataTypeMap(attribute))) - Some(builder.startAnd().in(quotedName, getType(attribute), - castedValues.map(_.asInstanceOf[AnyRef]): _*).end()) + actionType match { + case _: TrimUnconvertibleFilters => Some(expression) + case BuildSearchArgument(builder) => + val quotedName = quoteAttributeNameIfNeeded(attribute) + val castedValues = values.map(v => castLiteralValue(v, dataTypeMap(attribute))) + builder.startAnd().in(quotedName, getType(attribute), + castedValues.map(_.asInstanceOf[AnyRef]): _*).end() + () + } - case _ => None + case _ => + actionType match { + case _: TrimUnconvertibleFilters => None + case BuildSearchArgument(builder) => + throw new IllegalArgumentException(s"Can't build unsupported filter ${expression}") + } } } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala index 3bfe157f5fe1..cee8fdbe43a1 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala @@ -62,176 +62,260 @@ import org.apache.spark.sql.types._ */ private[orc] object OrcFilters extends Logging { - private def findMethod(klass: Class[_], name: String, args: Class[_]*): Method = { - val method = klass.getMethod(name, args: _*) - method.setAccessible(true) - method - } - def createFilter(schema: StructType, filters: Array[Filter]): Option[SearchArgument] = { if (HiveUtils.isHive23) { DatasourceOrcFilters.createFilter(schema, filters).asInstanceOf[Option[SearchArgument]] } else { val dataTypeMap = schema.map(f => f.name -> f.dataType).toMap - - // First, tries to convert each filter individually to see whether it's convertible, and then - // collect all convertible ones to build the final `SearchArgument`. 
- val convertibleFilters = for { - filter <- filters - _ <- buildSearchArgument(dataTypeMap, filter, newBuilder) - } yield filter - + val orcFilterConverter = new OrcFilterConverter(dataTypeMap) for { - // Combines all convertible filters using `And` to produce a single conjunction - conjunction <- buildTree(convertibleFilters) + // Combines all filters using `And` to produce a single conjunction + conjunction <- buildTree(filters) // Then tries to build a single ORC `SearchArgument` for the conjunction predicate - builder <- buildSearchArgument(dataTypeMap, conjunction, newBuilder) + builder <- orcFilterConverter.buildSearchArgument(conjunction, newBuilder) } yield builder.build() } } +} - private def buildSearchArgument( - dataTypeMap: Map[String, DataType], - expression: Filter, - builder: Builder): Option[Builder] = { - createBuilder(dataTypeMap, expression, builder, canPartialPushDownConjuncts = true) +private class OrcFilterConverter(val dataTypeMap: Map[String, DataType]) { + + def isSearchableType(dataType: DataType): Boolean = dataType match { + // Only the values in the Spark types below can be recognized by + // the `SearchArgumentImpl.BuilderImpl.boxLiteral()` method. + case ByteType | ShortType | FloatType | DoubleType => true + case IntegerType | LongType | StringType | BooleanType => true + case TimestampType | _: DecimalType => true + case _ => false } + private def findMethod(klass: Class[_], name: String, args: Class[_]*): Method = { + val method = klass.getMethod(name, args: _*) + method.setAccessible(true) + method + } + + import org.apache.spark.sql.sources._ + /** - * @param dataTypeMap a map from the attribute name to its data type. - * @param expression the input filter predicates. - * @param builder the input SearchArgument.Builder. - * @param canPartialPushDownConjuncts whether a subset of conjuncts of predicates can be pushed - * down safely. Pushing ONLY one side of AND down is safe to - * do at the top level or none of its ancestors is NOT and OR. - * @return the builder so far. + * Builds a SearchArgument for a Filter by first trimming the non-convertible nodes, and then + * only building the remaining convertible nodes. + * + * Doing the conversion in this way avoids the computational complexity problems introduced by + * checking whether a node is convertible while building it. The approach implemented here has + * complexity that's linear in the size of the Filter tree - O(number of Filter nodes) - we run + * a single pass over the tree to trim it, and then another pass on the trimmed tree to convert + * the remaining nodes. + * + * The alternative approach of checking-while-building can (and did) result + * in exponential complexity in the height of the tree, causing perf problems with Filters with + * as few as ~35 nodes if they were skewed. */ - private def createBuilder( - dataTypeMap: Map[String, DataType], + private[sql] def buildSearchArgument( expression: Filter, - builder: Builder, - canPartialPushDownConjuncts: Boolean): Option[Builder] = { - def isSearchableType(dataType: DataType): Boolean = dataType match { - // Only the values in the Spark types below can be recognized by - // the `SearchArgumentImpl.BuilderImpl.boxLiteral()` method. 
- case ByteType | ShortType | FloatType | DoubleType => true - case IntegerType | LongType | StringType | BooleanType => true - case TimestampType | _: DecimalType => true - case _ => false + builder: Builder): Option[Builder] = { + trimUnconvertibleFilters(expression).map { filter => + updateBuilder(filter, builder) + builder } + } - expression match { - case And(left, right) => - // At here, it is not safe to just convert one side and remove the other side - // if we do not understand what the parent filters are. - // - // Here is an example used to explain the reason. - // Let's say we have NOT(a = 2 AND b in ('1')) and we do not understand how to - // convert b in ('1'). If we only convert a = 2, we will end up with a filter - // NOT(a = 2), which will generate wrong results. - // - // Pushing one side of AND down is only safe to do at the top level or in the child - // AND before hitting NOT or OR conditions, and in this case, the unsupported predicate - // can be safely removed. - val leftBuilderOption = - createBuilder(dataTypeMap, left, newBuilder, canPartialPushDownConjuncts) - val rightBuilderOption = - createBuilder(dataTypeMap, right, newBuilder, canPartialPushDownConjuncts) - (leftBuilderOption, rightBuilderOption) match { - case (Some(_), Some(_)) => - for { - lhs <- createBuilder(dataTypeMap, left, - builder.startAnd(), canPartialPushDownConjuncts) - rhs <- createBuilder(dataTypeMap, right, lhs, canPartialPushDownConjuncts) - } yield rhs.end() + /** + * Removes all sub-Filters from a given Filter that are not convertible to an ORC SearchArgument. + */ + private[sql] def trimUnconvertibleFilters(expression: Filter): Option[Filter] = { + performAction(TrimUnconvertibleFilters(canPartialPushDownConjuncts = true), expression) + } - case (Some(_), None) if canPartialPushDownConjuncts => - createBuilder(dataTypeMap, left, builder, canPartialPushDownConjuncts) + /** + * Builds a SearchArgument for the given Filter. This method should only be called on Filters + * that have previously been trimmed to remove unsupported sub-Filters! + */ + private def updateBuilder(expression: Filter, builder: Builder): Unit = + performAction(BuildSearchArgument(builder), expression) - case (None, Some(_)) if canPartialPushDownConjuncts => - createBuilder(dataTypeMap, right, builder, canPartialPushDownConjuncts) + sealed trait ActionType[ReturnType] + case class TrimUnconvertibleFilters(canPartialPushDownConjuncts: Boolean) + extends ActionType[Option[Filter]] + case class BuildSearchArgument(builder: Builder) extends ActionType[Unit] - case _ => None + // The performAction method can run both the filtering and building operations for a given + // node - we signify which one we want with the `actionType` parameter. + // + // There are a couple of benefits to coupling the two operations like this: + // 1. All the logic for a given predicate is grouped logically in the same place. You don't + // have to scroll across the whole file to see what the filter action for an And is while + // you're looking at the build action. + // 2. It's much easier to keep the implementations of the two operations up-to-date with + // each other. If the `filter` and `build` operations are implemented as separate case-matches + // in different methods, it's very easy to change one without appropriately updating the + // other. For example, if we add a new supported node type to `filter`, it would be very + // easy to forget to update `build` to support it too, thus leading to conversion errors. 
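As a standalone illustration of the `ActionType` pattern described above (simplified, hypothetical types rather than the ones in this patch): a sealed trait indexed by its result type lets a single pattern match drive two operations that return different types.

sealed trait Action[Result]
case class CountNodes() extends Action[Int]
case class Describe(sep: String) extends Action[String]

object ActionDemo extends App {
  // One match handles both actions; the compiler refines Result per case.
  def perform[Result](action: Action[Result], xs: List[Int]): Result = action match {
    case CountNodes() => xs.length          // Result refined to Int
    case Describe(sep) => xs.mkString(sep)  // Result refined to String
  }
  println(perform(CountNodes(), List(1, 2, 3)))    // 3
  println(perform(Describe(", "), List(1, 2, 3)))  // 1, 2, 3
}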
+ private def performAction[ReturnType]( + actionType: ActionType[ReturnType], + expression: Filter): ReturnType = { + + expression match { + case And(left, right) => + actionType match { + case t @ TrimUnconvertibleFilters(canPartialPushDownConjuncts) => + // Here, it is not safe to just keep one side and remove the other side + // if we do not understand what the parent filters are. + // + // Here is an example used to explain the reason. + // Let's say we have NOT(a = 2 AND b in ('1')) and we do not understand how to + // convert b in ('1'). If we only convert a = 2, we will end up with a filter + // NOT(a = 2), which will generate wrong results. + // + // Pushing one side of AND down is only safe to do at the top level or in the child + // AND before hitting NOT or OR conditions, and in this case, the unsupported + // predicate can be safely removed. + val lhs = performAction(t, left) + val rhs = performAction(t, right) + (lhs, rhs) match { + case (Some(l), Some(r)) => Some(And(l, r)) + case (Some(_), None) if canPartialPushDownConjuncts => lhs + case (None, Some(_)) if canPartialPushDownConjuncts => rhs + case _ => None + } + case b @ BuildSearchArgument(builder) => + builder.startAnd() + performAction(b, left) + performAction(b, right) + builder.end() + () } case Or(left, right) => - // The Or predicate is convertible when both of its children can be pushed down. - // That is to say, if one/both of the children can be partially pushed down, the Or - // predicate can be partially pushed down as well. - // - // Here is an example used to explain the reason. - // Let's say we have - // (a1 AND a2) OR (b1 AND b2), - // a1 and b1 is convertible, while a2 and b2 is not. - // The predicate can be converted as - // (a1 OR b1) AND (a1 OR b2) AND (a2 OR b1) AND (a2 OR b2) - // As per the logical in And predicate, we can push down (a1 OR b1). - for { - _ <- createBuilder(dataTypeMap, left, newBuilder, canPartialPushDownConjuncts) - _ <- createBuilder(dataTypeMap, right, newBuilder, canPartialPushDownConjuncts) - lhs <- createBuilder(dataTypeMap, left, - builder.startOr(), canPartialPushDownConjuncts) - rhs <- createBuilder(dataTypeMap, right, lhs, canPartialPushDownConjuncts) - } yield rhs.end() + actionType match { + case t: TrimUnconvertibleFilters => + // The Or predicate is convertible when both of its children can be pushed down. + // That is to say, if one/both of the children can be partially pushed down, the Or + // predicate can be partially pushed down as well. + // + // Here is an example used to explain the reason. + // Let's say we have + // (a1 AND a2) OR (b1 AND b2), + // a1 and b1 are convertible, while a2 and b2 are not. + // The predicate can be converted as + // (a1 OR b1) AND (a1 OR b2) AND (a2 OR b1) AND (a2 OR b2) + // As per the logic in the And predicate, we can push down (a1 OR b1). 
+ for { + lhs: Filter <- performAction(t, left) + rhs: Filter <- performAction(t, right) + } yield Or(lhs, rhs) + case b @ BuildSearchArgument(builder) => + builder.startOr() + performAction(b, left) + performAction(b, right) + builder.end() + () + } case Not(child) => - for { - _ <- createBuilder(dataTypeMap, child, newBuilder, canPartialPushDownConjuncts = false) - negate <- createBuilder(dataTypeMap, - child, builder.startNot(), canPartialPushDownConjuncts = false) - } yield negate.end() + actionType match { + case t: TrimUnconvertibleFilters => + performAction(t.copy(canPartialPushDownConjuncts = false), child).map(Not) + case b @ BuildSearchArgument(builder) => + builder.startNot() + performAction(b, child) + builder.end() + () + } - // NOTE: For all case branches dealing with leaf predicates below, the additional `startAnd()` - // call is mandatory. ORC `SearchArgument` builder requires that all leaf predicates must be - // wrapped by a "parent" predicate (`And`, `Or`, or `Not`). + // NOTE: For all case branches dealing with leaf predicates below, the additional + // `startAnd()` call is mandatory. ORC `SearchArgument` builder requires that all leaf + // predicates must be wrapped by a "parent" predicate (`And`, `Or`, or `Not`). case EqualTo(attribute, value) if isSearchableType(dataTypeMap(attribute)) => - val bd = builder.startAnd() - val method = findMethod(bd.getClass, "equals", classOf[String], classOf[Object]) - Some(method.invoke(bd, attribute, value.asInstanceOf[AnyRef]).asInstanceOf[Builder].end()) - + actionType match { + case _: TrimUnconvertibleFilters => Some(expression) + case BuildSearchArgument(builder) => + val bd = builder.startAnd() + val method = findMethod(bd.getClass, "equals", classOf[String], classOf[Object]) + method.invoke(bd, attribute, value.asInstanceOf[AnyRef]).asInstanceOf[Builder].end() + () + } case EqualNullSafe(attribute, value) if isSearchableType(dataTypeMap(attribute)) => - val bd = builder.startAnd() - val method = findMethod(bd.getClass, "nullSafeEquals", classOf[String], classOf[Object]) - Some(method.invoke(bd, attribute, value.asInstanceOf[AnyRef]).asInstanceOf[Builder].end()) - + actionType match { + case _: TrimUnconvertibleFilters => Some(expression) + case BuildSearchArgument(builder) => + val bd = builder.startAnd() + val method = findMethod(bd.getClass, "nullSafeEquals", classOf[String], classOf[Object]) + method.invoke(bd, attribute, value.asInstanceOf[AnyRef]).asInstanceOf[Builder].end() + () + } case LessThan(attribute, value) if isSearchableType(dataTypeMap(attribute)) => - val bd = builder.startAnd() - val method = findMethod(bd.getClass, "lessThan", classOf[String], classOf[Object]) - Some(method.invoke(bd, attribute, value.asInstanceOf[AnyRef]).asInstanceOf[Builder].end()) - + actionType match { + case _: TrimUnconvertibleFilters => Some(expression) + case BuildSearchArgument(builder) => + val bd = builder.startAnd() + val method = findMethod(bd.getClass, "lessThan", classOf[String], classOf[Object]) + method.invoke(bd, attribute, value.asInstanceOf[AnyRef]).asInstanceOf[Builder].end() + () + } case LessThanOrEqual(attribute, value) if isSearchableType(dataTypeMap(attribute)) => - val bd = builder.startAnd() - val method = findMethod(bd.getClass, "lessThanEquals", classOf[String], classOf[Object]) - Some(method.invoke(bd, attribute, value.asInstanceOf[AnyRef]).asInstanceOf[Builder].end()) - + actionType match { + case _: TrimUnconvertibleFilters => Some(expression) + case BuildSearchArgument(builder) => + val bd = 
builder.startAnd() + val method = findMethod(bd.getClass, "lessThanEquals", classOf[String], classOf[Object]) + method.invoke(bd, attribute, value.asInstanceOf[AnyRef]).asInstanceOf[Builder].end() + () + } case GreaterThan(attribute, value) if isSearchableType(dataTypeMap(attribute)) => - val bd = builder.startNot() - val method = findMethod(bd.getClass, "lessThanEquals", classOf[String], classOf[Object]) - Some(method.invoke(bd, attribute, value.asInstanceOf[AnyRef]).asInstanceOf[Builder].end()) - + actionType match { + case _: TrimUnconvertibleFilters => Some(expression) + case BuildSearchArgument(builder) => + val bd = builder.startNot() + val method = findMethod(bd.getClass, "lessThanEquals", classOf[String], classOf[Object]) + method.invoke(bd, attribute, value.asInstanceOf[AnyRef]).asInstanceOf[Builder].end() + () + } case GreaterThanOrEqual(attribute, value) if isSearchableType(dataTypeMap(attribute)) => - val bd = builder.startNot() - val method = findMethod(bd.getClass, "lessThan", classOf[String], classOf[Object]) - Some(method.invoke(bd, attribute, value.asInstanceOf[AnyRef]).asInstanceOf[Builder].end()) - + actionType match { + case _: TrimUnconvertibleFilters => Some(expression) + case BuildSearchArgument(builder) => + val bd = builder.startNot() + val method = findMethod(bd.getClass, "lessThan", classOf[String], classOf[Object]) + method.invoke(bd, attribute, value.asInstanceOf[AnyRef]).asInstanceOf[Builder].end() + () + } case IsNull(attribute) if isSearchableType(dataTypeMap(attribute)) => - val bd = builder.startAnd() - val method = findMethod(bd.getClass, "isNull", classOf[String]) - Some(method.invoke(bd, attribute).asInstanceOf[Builder].end()) - + actionType match { + case _: TrimUnconvertibleFilters => Some(expression) + case BuildSearchArgument(builder) => + val bd = builder.startAnd() + val method = findMethod(bd.getClass, "isNull", classOf[String]) + method.invoke(bd, attribute).asInstanceOf[Builder].end() + () + } case IsNotNull(attribute) if isSearchableType(dataTypeMap(attribute)) => - val bd = builder.startNot() - val method = findMethod(bd.getClass, "isNull", classOf[String]) - Some(method.invoke(bd, attribute).asInstanceOf[Builder].end()) - + actionType match { + case _: TrimUnconvertibleFilters => Some(expression) + case BuildSearchArgument(builder) => + val bd = builder.startNot() + val method = findMethod(bd.getClass, "isNull", classOf[String]) + method.invoke(bd, attribute).asInstanceOf[Builder].end() + () + } case In(attribute, values) if isSearchableType(dataTypeMap(attribute)) => - val bd = builder.startAnd() - val method = findMethod(bd.getClass, "in", classOf[String], classOf[Array[Object]]) - Some(method.invoke(bd, attribute, values.map(_.asInstanceOf[AnyRef])) - .asInstanceOf[Builder].end()) + actionType match { + case _: TrimUnconvertibleFilters => Some(expression) + case BuildSearchArgument(builder) => + val bd = builder.startAnd() + val method = findMethod(bd.getClass, "in", classOf[String], classOf[Array[Object]]) + method.invoke(bd, attribute, values.map(_.asInstanceOf[AnyRef])) + .asInstanceOf[Builder].end() + () + } - case _ => None + case _ => + actionType match { + case _: TrimUnconvertibleFilters => None + case BuildSearchArgument(builder) => + throw new IllegalArgumentException(s"Can't build unsupported filter ${expression}") + } } } }
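For the Hive 1.2 path above, which resolves builder methods reflectively because their signatures vary across Hive versions, the following standalone sketch demonstrates the same `findMethod`/`invoke` mechanism against a plain JDK class (`java.lang.StringBuilder` is just a stand-in for the real `Builder`).

import java.lang.reflect.Method

object ReflectiveCallDemo extends App {
  // Same shape as findMethod above: look the method up by name and argument types.
  def findMethod(klass: Class[_], name: String, args: Class[_]*): Method = {
    val method = klass.getMethod(name, args: _*)
    method.setAccessible(true)
    method
  }

  val sb = new java.lang.StringBuilder
  val append = findMethod(classOf[java.lang.StringBuilder], "append", classOf[String])
  append.invoke(sb, "a = 2") // dispatched at runtime, like bd's `equals`/`lessThan`
  println(sb)                // a = 2
}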