From 1f641d8184e2e8d2b355f93be56fdba0aa3e3aaa Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Wed, 19 Jun 2019 21:21:13 +0800 Subject: [PATCH 1/6] refactor save --- .../datasources/orc/OrcFilters.scala | 329 +++++++--------- .../datasources/orc/OrcFilters.scala | 329 +++++++--------- .../spark/sql/hive/orc/OrcFilters.scala | 357 +++++++----------- 3 files changed, 394 insertions(+), 621 deletions(-) diff --git a/sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala b/sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala index e87f7d8caf4f8..f9288d4f9bad2 100644 --- a/sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala +++ b/sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala @@ -64,12 +64,13 @@ private[sql] object OrcFilters extends OrcFiltersBase { */ def createFilter(schema: StructType, filters: Seq[Filter]): Option[SearchArgument] = { val dataTypeMap = schema.map(f => f.name -> f.dataType).toMap - val orcFilterConverter = new OrcFilterConverter(dataTypeMap) for { - // Combines all filters using `And` to produce a single conjunction - conjunction <- buildTree(filters) - // Then tries to build a single ORC `SearchArgument` for the conjunction predicate - builder <- orcFilterConverter.buildSearchArgument(conjunction, newBuilder) + // Combines all convertible filters using `And` to produce a single conjunction + conjunction <- buildTree(convertibleFilters(schema, dataTypeMap, filters)) + // Then tries to build a single ORC `SearchArgument` for the conjunction predicate. + // The input predicate is fully convertible. There should not be any empty result in the + // following recursive method call `buildSearchArgument`. + builder <- buildSearchArgument(dataTypeMap, conjunction, newBuilder) } yield builder.build() } @@ -77,13 +78,65 @@ private[sql] object OrcFilters extends OrcFiltersBase { schema: StructType, dataTypeMap: Map[String, DataType], filters: Seq[Filter]): Seq[Filter] = { - val orcFilterConverter = new OrcFilterConverter(dataTypeMap) - filters.flatMap(orcFilterConverter.trimUnconvertibleFilters) - } + import org.apache.spark.sql.sources._ -} + def convertibleFiltersHelper( + filter: Filter, + canPartialPushDown: Boolean): Option[Filter] = filter match { + // At here, it is not safe to just convert one side and remove the other side + // if we do not understand what the parent filters are. + // + // Here is an example used to explain the reason. + // Let's say we have NOT(a = 2 AND b in ('1')) and we do not understand how to + // convert b in ('1'). If we only convert a = 2, we will end up with a filter + // NOT(a = 2), which will generate wrong results. + // + // Pushing one side of AND down is only safe to do at the top level or in the child + // AND before hitting NOT or OR conditions, and in this case, the unsupported predicate + // can be safely removed. 
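A quick aside to make the hazard described in the comment above concrete. The sketch below is illustrative only — the `Pred` model and `eval` are stand-ins invented for this note, not part of the patch — but it shows how dropping the unconvertible conjunct `b IN ('1')` underneath a NOT changes query results:

object PartialPushdownHazard extends App {
  // Minimal stand-in predicate model (hypothetical, for illustration only).
  sealed trait Pred
  case class Eq(col: String, v: Any) extends Pred
  case class In(col: String, vs: Set[Any]) extends Pred
  case class AndP(l: Pred, r: Pred) extends Pred
  case class NotP(p: Pred) extends Pred

  def eval(p: Pred, row: Map[String, Any]): Boolean = p match {
    case Eq(c, v)   => row(c) == v
    case In(c, vs)  => vs.contains(row(c))
    case AndP(l, r) => eval(l, row) && eval(r, row)
    case NotP(q)    => !eval(q, row)
  }

  val row = Map[String, Any]("a" -> 2, "b" -> "2")
  // NOT(a = 2 AND b IN ('1')) -- the full predicate
  val full = NotP(AndP(Eq("a", 2), In("b", Set("1"))))
  // NOT(a = 2) -- what remains if the unsupported conjunct is unsafely dropped
  val trimmed = NotP(Eq("a", 2))

  assert(eval(full, row))     // the row matches the full predicate...
  assert(!eval(trimmed, row)) // ...but the trimmed one rejects it: wrong results
}
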
+ case And(left, right) => + val leftResultOptional = convertibleFiltersHelper(left, canPartialPushDown) + val rightResultOptional = convertibleFiltersHelper(right, canPartialPushDown) + (leftResultOptional, rightResultOptional) match { + case (Some(leftResult), Some(rightResult)) => Some(And(leftResult, rightResult)) + case (Some(leftResult), None) if canPartialPushDown => Some(leftResult) + case (None, Some(rightResult)) if canPartialPushDown => Some(rightResult) + case _ => None + } -private class OrcFilterConverter(val dataTypeMap: Map[String, DataType]) { + // The Or predicate is convertible when both of its children can be pushed down. + // That is to say, if one/both of the children can be partially pushed down, the Or + // predicate can be partially pushed down as well. + // + // Here is an example used to explain the reason. + // Let's say we have + // (a1 AND a2) OR (b1 AND b2), + // a1 and b1 is convertible, while a2 and b2 is not. + // The predicate can be converted as + // (a1 OR b1) AND (a1 OR b2) AND (a2 OR b1) AND (a2 OR b2) + // As per the logical in And predicate, we can push down (a1 OR b1). + case Or(left, right) => + val leftResultOptional = convertibleFiltersHelper(left, canPartialPushDown) + val rightResultOptional = convertibleFiltersHelper(right, canPartialPushDown) + if (leftResultOptional.isEmpty || rightResultOptional.isEmpty) { + None + } else { + Some(Or(leftResultOptional.get, rightResultOptional.get)) + } + case Not(pred) => + val resultOptional = convertibleFiltersHelper(pred, canPartialPushDown = false) + resultOptional.map(Not) + case other => + if (buildSearchArgument(dataTypeMap, other, newBuilder()).isDefined) { + Some(other) + } else { + None + } + } + filters.flatMap { filter => + convertibleFiltersHelper(filter, true) + } + } /** * Get PredicateLeafType which is corresponding to the given DataType. @@ -115,228 +168,98 @@ private class OrcFilterConverter(val dataTypeMap: Map[String, DataType]) { case _ => value } - import org.apache.spark.sql.sources._ - import OrcFilters._ - /** - * Builds a SearchArgument for a Filter by first trimming the non-convertible nodes, and then - * only building the remaining convertible nodes. - * - * Doing the conversion in this way avoids the computational complexity problems introduced by - * checking whether a node is convertible while building it. The approach implemented here has - * complexity that's linear in the size of the Filter tree - O(number of Filter nodes) - we run - * a single pass over the tree to trim it, and then another pass on the trimmed tree to convert - * the remaining nodes. - * - * The alternative approach of checking-while-building can (and did) result - * in exponential complexity in the height of the tree, causing perf problems with Filters with - * as few as ~35 nodes if they were skewed. + * Build a SearchArgument and return the builder so far. */ - private[sql] def buildSearchArgument( + private def buildSearchArgument( + dataTypeMap: Map[String, DataType], expression: Filter, builder: Builder): Option[Builder] = { - trimUnconvertibleFilters(expression).map { filter => - updateBuilder(filter, builder) - builder - } + createBuilder(dataTypeMap, expression, builder) } /** - * Removes all sub-Filters from a given Filter that are not convertible to an ORC SearchArgument. + * @param dataTypeMap a map from the attribute name to its data type. + * @param expression the input filter predicates. + * @param builder the input SearchArgument.Builder. + * @return the builder so far. 
*/ - private[sql] def trimUnconvertibleFilters(expression: Filter): Option[Filter] = { - performAction(TrimUnconvertibleFilters(canPartialPushDownConjuncts = true), expression) - } - - /** - * Builds a SearchArgument for the given Filter. This method should only be called on Filters - * that have previously been trimmed to remove unsupported sub-Filters! - */ - private def updateBuilder(expression: Filter, builder: Builder): Unit = - performAction(BuildSearchArgument(builder), expression) - - sealed trait ActionType[ReturnType] - case class TrimUnconvertibleFilters(canPartialPushDownConjuncts: Boolean) - extends ActionType[Option[Filter]] - case class BuildSearchArgument(builder: Builder) extends ActionType[Unit] - - // The performAction method can run both the filtering and building operations for a given - // node - we signify which one we want with the `actionType` parameter. - // - // There are a couple of benefits to coupling the two operations like this: - // 1. All the logic for a given predicate is grouped logically in the same place. You don't - // have to scroll across the whole file to see what the filter action for an And is while - // you're looking at the build action. - // 2. It's much easier to keep the implementations of the two operations up-to-date with - // each other. If the `filter` and `build` operations are implemented as separate case-matches - // in different methods, it's very easy to change one without appropriately updating the - // other. For example, if we add a new supported node type to `filter`, it would be very - // easy to forget to update `build` to support it too, thus leading to conversion errors. - private def performAction[ReturnType]( - actionType: ActionType[ReturnType], - expression: Filter): ReturnType = { + private def createBuilder( + dataTypeMap: Map[String, DataType], + expression: Filter, + builder: Builder): Option[Builder] = { def getType(attribute: String): PredicateLeaf.Type = getPredicateLeafType(dataTypeMap(attribute)) + import org.apache.spark.sql.sources._ + expression match { case And(left, right) => - actionType match { - case t @ TrimUnconvertibleFilters(canPartialPushDownConjuncts) => - // At here, it is not safe to just keep one side and remove the other side - // if we do not understand what the parent filters are. - // - // Here is an example used to explain the reason. - // Let's say we have NOT(a = 2 AND b in ('1')) and we do not understand how to - // convert b in ('1'). If we only convert a = 2, we will end up with a filter - // NOT(a = 2), which will generate wrong results. - // - // Pushing one side of AND down is only safe to do at the top level or in the child - // AND before hitting NOT or OR conditions, and in this case, the unsupported - // predicate can be safely removed. - val lhs = performAction(t, left) - val rhs = performAction(t, right) - (lhs, rhs) match { - case (Some(l), Some(r)) => Some(And(l, r)) - case (Some(_), None) if canPartialPushDownConjuncts => lhs - case (None, Some(_)) if canPartialPushDownConjuncts => rhs - case _ => None - } - case b @ BuildSearchArgument(builder) => - builder.startAnd() - performAction(b, left) - performAction(b, right) - builder.end() - () - } + for { + lhs <- createBuilder(dataTypeMap, left, builder.startAnd()) + rhs <- createBuilder(dataTypeMap, right, lhs) + } yield rhs.end() case Or(left, right) => - actionType match { - case t: TrimUnconvertibleFilters => - // The Or predicate is convertible when both of its children can be pushed down. 
- // That is to say, if one/both of the children can be partially pushed down, the Or - // predicate can be partially pushed down as well. - // - // Here is an example used to explain the reason. - // Let's say we have - // (a1 AND a2) OR (b1 AND b2), - // a1 and b1 is convertible, while a2 and b2 is not. - // The predicate can be converted as - // (a1 OR b1) AND (a1 OR b2) AND (a2 OR b1) AND (a2 OR b2) - // As per the logical in And predicate, we can push down (a1 OR b1). - for { - lhs: Filter <- performAction(t, left) - rhs: Filter <- performAction(t, right) - } yield Or(lhs, rhs) - case b @ BuildSearchArgument(builder) => - builder.startOr() - performAction(b, left) - performAction(b, right) - builder.end() - () - } + for { + lhs <- createBuilder(dataTypeMap, left, builder.startOr()) + rhs <- createBuilder(dataTypeMap, right, lhs) + } yield rhs.end() case Not(child) => - actionType match { - case t: TrimUnconvertibleFilters => - performAction(t.copy(canPartialPushDownConjuncts = false), child).map(Not) - case b @ BuildSearchArgument(builder) => - builder.startNot() - performAction(b, child) - builder.end() - () - } + for { + negate <- createBuilder(dataTypeMap, child, builder.startNot()) + } yield negate.end() - // NOTE: For all case branches dealing with leaf predicates below, the additional - // `startAnd()` call is mandatory. ORC `SearchArgument` builder requires that all leaf - // predicates must be wrapped by a "parent" predicate (`And`, `Or`, or `Not`). + // NOTE: For all case branches dealing with leaf predicates below, the additional `startAnd()` + // call is mandatory. ORC `SearchArgument` builder requires that all leaf predicates must be + // wrapped by a "parent" predicate (`And`, `Or`, or `Not`). case EqualTo(attribute, value) if isSearchableType(dataTypeMap(attribute)) => - actionType match { - case _: TrimUnconvertibleFilters => Some(expression) - case BuildSearchArgument(builder) => - val quotedName = quoteAttributeNameIfNeeded(attribute) - val castedValue = castLiteralValue(value, dataTypeMap(attribute)) - builder.startAnd().equals(quotedName, getType(attribute), castedValue).end() - () - } + val quotedName = quoteAttributeNameIfNeeded(attribute) + val castedValue = castLiteralValue(value, dataTypeMap(attribute)) + Some(builder.startAnd().equals(quotedName, getType(attribute), castedValue).end()) + case EqualNullSafe(attribute, value) if isSearchableType(dataTypeMap(attribute)) => - actionType match { - case _: TrimUnconvertibleFilters => Some(expression) - case BuildSearchArgument(builder) => - val quotedName = quoteAttributeNameIfNeeded(attribute) - val castedValue = castLiteralValue(value, dataTypeMap(attribute)) - builder.startAnd().nullSafeEquals(quotedName, getType(attribute), castedValue).end() - () - } + val quotedName = quoteAttributeNameIfNeeded(attribute) + val castedValue = castLiteralValue(value, dataTypeMap(attribute)) + Some(builder.startAnd().nullSafeEquals(quotedName, getType(attribute), castedValue).end()) + case LessThan(attribute, value) if isSearchableType(dataTypeMap(attribute)) => - actionType match { - case _: TrimUnconvertibleFilters => Some(expression) - case BuildSearchArgument(builder) => - val quotedName = quoteAttributeNameIfNeeded(attribute) - val castedValue = castLiteralValue(value, dataTypeMap(attribute)) - builder.startAnd().lessThan(quotedName, getType(attribute), castedValue).end() - () - } + val quotedName = quoteAttributeNameIfNeeded(attribute) + val castedValue = castLiteralValue(value, dataTypeMap(attribute)) + 
Some(builder.startAnd().lessThan(quotedName, getType(attribute), castedValue).end()) + case LessThanOrEqual(attribute, value) if isSearchableType(dataTypeMap(attribute)) => - actionType match { - case _: TrimUnconvertibleFilters => Some(expression) - case BuildSearchArgument(builder) => - val quotedName = quoteAttributeNameIfNeeded(attribute) - val castedValue = castLiteralValue(value, dataTypeMap(attribute)) - builder.startAnd().lessThanEquals(quotedName, getType(attribute), castedValue).end() - () - } + val quotedName = quoteAttributeNameIfNeeded(attribute) + val castedValue = castLiteralValue(value, dataTypeMap(attribute)) + Some(builder.startAnd().lessThanEquals(quotedName, getType(attribute), castedValue).end()) + case GreaterThan(attribute, value) if isSearchableType(dataTypeMap(attribute)) => - actionType match { - case _: TrimUnconvertibleFilters => Some(expression) - case BuildSearchArgument(builder) => - val quotedName = quoteAttributeNameIfNeeded(attribute) - val castedValue = castLiteralValue(value, dataTypeMap(attribute)) - builder.startNot().lessThanEquals(quotedName, getType(attribute), castedValue).end() - () - } + val quotedName = quoteAttributeNameIfNeeded(attribute) + val castedValue = castLiteralValue(value, dataTypeMap(attribute)) + Some(builder.startNot().lessThanEquals(quotedName, getType(attribute), castedValue).end()) + case GreaterThanOrEqual(attribute, value) if isSearchableType(dataTypeMap(attribute)) => - actionType match { - case _: TrimUnconvertibleFilters => Some(expression) - case BuildSearchArgument(builder) => - val quotedName = quoteAttributeNameIfNeeded(attribute) - val castedValue = castLiteralValue(value, dataTypeMap(attribute)) - builder.startNot().lessThan(quotedName, getType(attribute), castedValue).end() - () - } + val quotedName = quoteAttributeNameIfNeeded(attribute) + val castedValue = castLiteralValue(value, dataTypeMap(attribute)) + Some(builder.startNot().lessThan(quotedName, getType(attribute), castedValue).end()) + case IsNull(attribute) if isSearchableType(dataTypeMap(attribute)) => - actionType match { - case _: TrimUnconvertibleFilters => Some(expression) - case BuildSearchArgument(builder) => - val quotedName = quoteAttributeNameIfNeeded(attribute) - builder.startAnd().isNull(quotedName, getType(attribute)).end() - () - } + val quotedName = quoteAttributeNameIfNeeded(attribute) + Some(builder.startAnd().isNull(quotedName, getType(attribute)).end()) + case IsNotNull(attribute) if isSearchableType(dataTypeMap(attribute)) => - actionType match { - case _: TrimUnconvertibleFilters => Some(expression) - case BuildSearchArgument(builder) => - val quotedName = quoteAttributeNameIfNeeded(attribute) - builder.startNot().isNull(quotedName, getType(attribute)).end() - () - } + val quotedName = quoteAttributeNameIfNeeded(attribute) + Some(builder.startNot().isNull(quotedName, getType(attribute)).end()) + case In(attribute, values) if isSearchableType(dataTypeMap(attribute)) => - actionType match { - case _: TrimUnconvertibleFilters => Some(expression) - case BuildSearchArgument(builder) => - val quotedName = quoteAttributeNameIfNeeded(attribute) - val castedValues = values.map(v => castLiteralValue(v, dataTypeMap(attribute))) - builder.startAnd().in(quotedName, getType(attribute), - castedValues.map(_.asInstanceOf[AnyRef]): _*).end() - () - } + val quotedName = quoteAttributeNameIfNeeded(attribute) + val castedValues = values.map(v => castLiteralValue(v, dataTypeMap(attribute))) + Some(builder.startAnd().in(quotedName, getType(attribute), + 
castedValues.map(_.asInstanceOf[AnyRef]): _*).end()) - case _ => - actionType match { - case _: TrimUnconvertibleFilters => None - case BuildSearchArgument(builder) => - throw new IllegalArgumentException(s"Can't build unsupported filter ${expression}") - } + case _ => None } } } diff --git a/sql/core/v2.3.5/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala b/sql/core/v2.3.5/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala index ebcd3567d34cf..7e652c13210bc 100644 --- a/sql/core/v2.3.5/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala +++ b/sql/core/v2.3.5/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala @@ -64,12 +64,13 @@ private[sql] object OrcFilters extends OrcFiltersBase { */ def createFilter(schema: StructType, filters: Seq[Filter]): Option[SearchArgument] = { val dataTypeMap = schema.map(f => f.name -> f.dataType).toMap - val orcFilterConverter = new OrcFilterConverter(dataTypeMap) for { - // Combines all filters using `And` to produce a single conjunction - conjunction <- buildTree(filters) - // Then tries to build a single ORC `SearchArgument` for the conjunction predicate - builder <- orcFilterConverter.buildSearchArgument(conjunction, newBuilder) + // Combines all convertible filters using `And` to produce a single conjunction + conjunction <- buildTree(convertibleFilters(schema, dataTypeMap, filters)) + // Then tries to build a single ORC `SearchArgument` for the conjunction predicate. + // The input predicate is fully convertible. There should not be any empty result in the + // following recursive method call `buildSearchArgument`. + builder <- buildSearchArgument(dataTypeMap, conjunction, newBuilder) } yield builder.build() } @@ -77,13 +78,65 @@ private[sql] object OrcFilters extends OrcFiltersBase { schema: StructType, dataTypeMap: Map[String, DataType], filters: Seq[Filter]): Seq[Filter] = { - val orcFilterConverter = new OrcFilterConverter(dataTypeMap) - filters.flatMap(orcFilterConverter.trimUnconvertibleFilters) - } + import org.apache.spark.sql.sources._ -} + def convertibleFiltersHelper( + filter: Filter, + canPartialPushDown: Boolean): Option[Filter] = filter match { + // At here, it is not safe to just convert one side and remove the other side + // if we do not understand what the parent filters are. + // + // Here is an example used to explain the reason. + // Let's say we have NOT(a = 2 AND b in ('1')) and we do not understand how to + // convert b in ('1'). If we only convert a = 2, we will end up with a filter + // NOT(a = 2), which will generate wrong results. + // + // Pushing one side of AND down is only safe to do at the top level or in the child + // AND before hitting NOT or OR conditions, and in this case, the unsupported predicate + // can be safely removed. + case And(left, right) => + val leftResultOptional = convertibleFiltersHelper(left, canPartialPushDown) + val rightResultOptional = convertibleFiltersHelper(right, canPartialPushDown) + (leftResultOptional, rightResultOptional) match { + case (Some(leftResult), Some(rightResult)) => Some(And(leftResult, rightResult)) + case (Some(leftResult), None) if canPartialPushDown => Some(leftResult) + case (None, Some(rightResult)) if canPartialPushDown => Some(rightResult) + case _ => None + } -private class OrcFilterConverter(val dataTypeMap: Map[String, DataType]) { + // The Or predicate is convertible when both of its children can be pushed down. 
+ // That is to say, if one/both of the children can be partially pushed down, the Or + // predicate can be partially pushed down as well. + // + // Here is an example used to explain the reason. + // Let's say we have + // (a1 AND a2) OR (b1 AND b2), + // a1 and b1 is convertible, while a2 and b2 is not. + // The predicate can be converted as + // (a1 OR b1) AND (a1 OR b2) AND (a2 OR b1) AND (a2 OR b2) + // As per the logical in And predicate, we can push down (a1 OR b1). + case Or(left, right) => + val leftResultOptional = convertibleFiltersHelper(left, canPartialPushDown) + val rightResultOptional = convertibleFiltersHelper(right, canPartialPushDown) + if (leftResultOptional.isEmpty || rightResultOptional.isEmpty) { + None + } else { + Some(Or(leftResultOptional.get, rightResultOptional.get)) + } + case Not(pred) => + val resultOptional = convertibleFiltersHelper(pred, canPartialPushDown = false) + resultOptional.map(Not) + case other => + if (buildSearchArgument(dataTypeMap, other, newBuilder()).isDefined) { + Some(other) + } else { + None + } + } + filters.flatMap { filter => + convertibleFiltersHelper(filter, true) + } + } /** * Get PredicateLeafType which is corresponding to the given DataType. @@ -115,228 +168,98 @@ private class OrcFilterConverter(val dataTypeMap: Map[String, DataType]) { case _ => value } - import org.apache.spark.sql.sources._ - import OrcFilters._ - /** - * Builds a SearchArgument for a Filter by first trimming the non-convertible nodes, and then - * only building the remaining convertible nodes. - * - * Doing the conversion in this way avoids the computational complexity problems introduced by - * checking whether a node is convertible while building it. The approach implemented here has - * complexity that's linear in the size of the Filter tree - O(number of Filter nodes) - we run - * a single pass over the tree to trim it, and then another pass on the trimmed tree to convert - * the remaining nodes. - * - * The alternative approach of checking-while-building can (and did) result - * in exponential complexity in the height of the tree, causing perf problems with Filters with - * as few as ~35 nodes if they were skewed. + * Build a SearchArgument and return the builder so far. */ - private[sql] def buildSearchArgument( + private def buildSearchArgument( + dataTypeMap: Map[String, DataType], expression: Filter, builder: Builder): Option[Builder] = { - trimUnconvertibleFilters(expression).map { filter => - updateBuilder(filter, builder) - builder - } + createBuilder(dataTypeMap, expression, builder) } /** - * Removes all sub-Filters from a given Filter that are not convertible to an ORC SearchArgument. + * @param dataTypeMap a map from the attribute name to its data type. + * @param expression the input filter predicates. + * @param builder the input SearchArgument.Builder. + * @return the builder so far. */ - private[sql] def trimUnconvertibleFilters(expression: Filter): Option[Filter] = { - performAction(TrimUnconvertibleFilters(canPartialPushDownConjuncts = true), expression) - } - - /** - * Builds a SearchArgument for the given Filter. This method should only be called on Filters - * that have previously been trimmed to remove unsupported sub-Filters! 
- */ - private def updateBuilder(expression: Filter, builder: Builder): Unit = - performAction(BuildSearchArgument(builder), expression) - - sealed trait ActionType[ReturnType] - case class TrimUnconvertibleFilters(canPartialPushDownConjuncts: Boolean) - extends ActionType[Option[Filter]] - case class BuildSearchArgument(builder: Builder) extends ActionType[Unit] - - // The performAction method can run both the filtering and building operations for a given - // node - we signify which one we want with the `actionType` parameter. - // - // There are a couple of benefits to coupling the two operations like this: - // 1. All the logic for a given predicate is grouped logically in the same place. You don't - // have to scroll across the whole file to see what the filter action for an And is while - // you're looking at the build action. - // 2. It's much easier to keep the implementations of the two operations up-to-date with - // each other. If the `filter` and `build` operations are implemented as separate case-matches - // in different methods, it's very easy to change one without appropriately updating the - // other. For example, if we add a new supported node type to `filter`, it would be very - // easy to forget to update `build` to support it too, thus leading to conversion errors. - private def performAction[ReturnType]( - actionType: ActionType[ReturnType], - expression: Filter): ReturnType = { + private def createBuilder( + dataTypeMap: Map[String, DataType], + expression: Filter, + builder: Builder): Option[Builder] = { def getType(attribute: String): PredicateLeaf.Type = getPredicateLeafType(dataTypeMap(attribute)) + import org.apache.spark.sql.sources._ + expression match { case And(left, right) => - actionType match { - case t @ TrimUnconvertibleFilters(canPartialPushDownConjuncts) => - // At here, it is not safe to just keep one side and remove the other side - // if we do not understand what the parent filters are. - // - // Here is an example used to explain the reason. - // Let's say we have NOT(a = 2 AND b in ('1')) and we do not understand how to - // convert b in ('1'). If we only convert a = 2, we will end up with a filter - // NOT(a = 2), which will generate wrong results. - // - // Pushing one side of AND down is only safe to do at the top level or in the child - // AND before hitting NOT or OR conditions, and in this case, the unsupported - // predicate can be safely removed. - val lhs = performAction(t, left) - val rhs = performAction(t, right) - (lhs, rhs) match { - case (Some(l), Some(r)) => Some(And(l, r)) - case (Some(_), None) if canPartialPushDownConjuncts => lhs - case (None, Some(_)) if canPartialPushDownConjuncts => rhs - case _ => None - } - case b @ BuildSearchArgument(builder) => - builder.startAnd() - performAction(b, left) - performAction(b, right) - builder.end() - () - } + for { + lhs <- createBuilder(dataTypeMap, left, builder.startAnd()) + rhs <- createBuilder(dataTypeMap, right, lhs) + } yield rhs.end() case Or(left, right) => - actionType match { - case t: TrimUnconvertibleFilters => - // The Or predicate is convertible when both of its children can be pushed down. - // That is to say, if one/both of the children can be partially pushed down, the Or - // predicate can be partially pushed down as well. - // - // Here is an example used to explain the reason. - // Let's say we have - // (a1 AND a2) OR (b1 AND b2), - // a1 and b1 is convertible, while a2 and b2 is not. 
- // The predicate can be converted as - // (a1 OR b1) AND (a1 OR b2) AND (a2 OR b1) AND (a2 OR b2) - // As per the logical in And predicate, we can push down (a1 OR b1). - for { - lhs: Filter <- performAction(t, left) - rhs: Filter <- performAction(t, right) - } yield Or(lhs, rhs) - case b @ BuildSearchArgument(builder) => - builder.startOr() - performAction(b, left) - performAction(b, right) - builder.end() - () - } + for { + lhs <- createBuilder(dataTypeMap, left, builder.startOr()) + rhs <- createBuilder(dataTypeMap, right, lhs) + } yield rhs.end() case Not(child) => - actionType match { - case t: TrimUnconvertibleFilters => - performAction(t.copy(canPartialPushDownConjuncts = false), child).map(Not) - case b @ BuildSearchArgument(builder) => - builder.startNot() - performAction(b, child) - builder.end() - () - } + for { + negate <- createBuilder(dataTypeMap, child, builder.startNot()) + } yield negate.end() - // NOTE: For all case branches dealing with leaf predicates below, the additional - // `startAnd()` call is mandatory. ORC `SearchArgument` builder requires that all leaf - // predicates must be wrapped by a "parent" predicate (`And`, `Or`, or `Not`). + // NOTE: For all case branches dealing with leaf predicates below, the additional `startAnd()` + // call is mandatory. ORC `SearchArgument` builder requires that all leaf predicates must be + // wrapped by a "parent" predicate (`And`, `Or`, or `Not`). case EqualTo(attribute, value) if isSearchableType(dataTypeMap(attribute)) => - actionType match { - case _: TrimUnconvertibleFilters => Some(expression) - case BuildSearchArgument(builder) => - val quotedName = quoteAttributeNameIfNeeded(attribute) - val castedValue = castLiteralValue(value, dataTypeMap(attribute)) - builder.startAnd().equals(quotedName, getType(attribute), castedValue).end() - () - } + val quotedName = quoteAttributeNameIfNeeded(attribute) + val castedValue = castLiteralValue(value, dataTypeMap(attribute)) + Some(builder.startAnd().equals(quotedName, getType(attribute), castedValue).end()) + case EqualNullSafe(attribute, value) if isSearchableType(dataTypeMap(attribute)) => - actionType match { - case _: TrimUnconvertibleFilters => Some(expression) - case BuildSearchArgument(builder) => - val quotedName = quoteAttributeNameIfNeeded(attribute) - val castedValue = castLiteralValue(value, dataTypeMap(attribute)) - builder.startAnd().nullSafeEquals(quotedName, getType(attribute), castedValue).end() - () - } + val quotedName = quoteAttributeNameIfNeeded(attribute) + val castedValue = castLiteralValue(value, dataTypeMap(attribute)) + Some(builder.startAnd().nullSafeEquals(quotedName, getType(attribute), castedValue).end()) + case LessThan(attribute, value) if isSearchableType(dataTypeMap(attribute)) => - actionType match { - case _: TrimUnconvertibleFilters => Some(expression) - case BuildSearchArgument(builder) => - val quotedName = quoteAttributeNameIfNeeded(attribute) - val castedValue = castLiteralValue(value, dataTypeMap(attribute)) - builder.startAnd().lessThan(quotedName, getType(attribute), castedValue).end() - () - } + val quotedName = quoteAttributeNameIfNeeded(attribute) + val castedValue = castLiteralValue(value, dataTypeMap(attribute)) + Some(builder.startAnd().lessThan(quotedName, getType(attribute), castedValue).end()) + case LessThanOrEqual(attribute, value) if isSearchableType(dataTypeMap(attribute)) => - actionType match { - case _: TrimUnconvertibleFilters => Some(expression) - case BuildSearchArgument(builder) => - val quotedName = 
quoteAttributeNameIfNeeded(attribute) - val castedValue = castLiteralValue(value, dataTypeMap(attribute)) - builder.startAnd().lessThanEquals(quotedName, getType(attribute), castedValue).end() - () - } + val quotedName = quoteAttributeNameIfNeeded(attribute) + val castedValue = castLiteralValue(value, dataTypeMap(attribute)) + Some(builder.startAnd().lessThanEquals(quotedName, getType(attribute), castedValue).end()) + case GreaterThan(attribute, value) if isSearchableType(dataTypeMap(attribute)) => - actionType match { - case _: TrimUnconvertibleFilters => Some(expression) - case BuildSearchArgument(builder) => - val quotedName = quoteAttributeNameIfNeeded(attribute) - val castedValue = castLiteralValue(value, dataTypeMap(attribute)) - builder.startNot().lessThanEquals(quotedName, getType(attribute), castedValue).end() - () - } + val quotedName = quoteAttributeNameIfNeeded(attribute) + val castedValue = castLiteralValue(value, dataTypeMap(attribute)) + Some(builder.startNot().lessThanEquals(quotedName, getType(attribute), castedValue).end()) + case GreaterThanOrEqual(attribute, value) if isSearchableType(dataTypeMap(attribute)) => - actionType match { - case _: TrimUnconvertibleFilters => Some(expression) - case BuildSearchArgument(builder) => - val quotedName = quoteAttributeNameIfNeeded(attribute) - val castedValue = castLiteralValue(value, dataTypeMap(attribute)) - builder.startNot().lessThan(quotedName, getType(attribute), castedValue).end() - () - } + val quotedName = quoteAttributeNameIfNeeded(attribute) + val castedValue = castLiteralValue(value, dataTypeMap(attribute)) + Some(builder.startNot().lessThan(quotedName, getType(attribute), castedValue).end()) + case IsNull(attribute) if isSearchableType(dataTypeMap(attribute)) => - actionType match { - case _: TrimUnconvertibleFilters => Some(expression) - case BuildSearchArgument(builder) => - val quotedName = quoteAttributeNameIfNeeded(attribute) - builder.startAnd().isNull(quotedName, getType(attribute)).end() - () - } + val quotedName = quoteAttributeNameIfNeeded(attribute) + Some(builder.startAnd().isNull(quotedName, getType(attribute)).end()) + case IsNotNull(attribute) if isSearchableType(dataTypeMap(attribute)) => - actionType match { - case _: TrimUnconvertibleFilters => Some(expression) - case BuildSearchArgument(builder) => - val quotedName = quoteAttributeNameIfNeeded(attribute) - builder.startNot().isNull(quotedName, getType(attribute)).end() - () - } + val quotedName = quoteAttributeNameIfNeeded(attribute) + Some(builder.startNot().isNull(quotedName, getType(attribute)).end()) + case In(attribute, values) if isSearchableType(dataTypeMap(attribute)) => - actionType match { - case _: TrimUnconvertibleFilters => Some(expression) - case BuildSearchArgument(builder) => - val quotedName = quoteAttributeNameIfNeeded(attribute) - val castedValues = values.map(v => castLiteralValue(v, dataTypeMap(attribute))) - builder.startAnd().in(quotedName, getType(attribute), - castedValues.map(_.asInstanceOf[AnyRef]): _*).end() - () - } + val quotedName = quoteAttributeNameIfNeeded(attribute) + val castedValues = values.map(v => castLiteralValue(v, dataTypeMap(attribute))) + Some(builder.startAnd().in(quotedName, getType(attribute), + castedValues.map(_.asInstanceOf[AnyRef]): _*).end()) - case _ => - actionType match { - case _: TrimUnconvertibleFilters => None - case BuildSearchArgument(builder) => - throw new IllegalArgumentException(s"Can't build unsupported filter ${expression}") - } + case _ => None } } } diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala index cee8fdbe43a11..f0aebb44478b4 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala @@ -62,260 +62,187 @@ import org.apache.spark.sql.types._ */ private[orc] object OrcFilters extends Logging { + private def findMethod(klass: Class[_], name: String, args: Class[_]*): Method = { + val method = klass.getMethod(name, args: _*) + method.setAccessible(true) + method + } + def createFilter(schema: StructType, filters: Array[Filter]): Option[SearchArgument] = { if (HiveUtils.isHive23) { DatasourceOrcFilters.createFilter(schema, filters).asInstanceOf[Option[SearchArgument]] } else { val dataTypeMap = schema.map(f => f.name -> f.dataType).toMap - val orcFilterConverter = new OrcFilterConverter(dataTypeMap) for { - // Combines all filters using `And` to produce a single conjunction - conjunction <- buildTree(filters) - // Then tries to build a single ORC `SearchArgument` for the conjunction predicate - builder <- orcFilterConverter.buildSearchArgument(conjunction, newBuilder) + // Combines all convertible filters using `And` to produce a single conjunction + conjunction <- buildTree(convertibleFilters(schema, dataTypeMap, filters)) + // Then tries to build a single ORC `SearchArgument` for the conjunction predicate. + // The input predicate is fully convertible. There should not be any empty result in the + // following recursive method call `buildSearchArgument`. + builder <- buildSearchArgument(dataTypeMap, conjunction, newBuilder) } yield builder.build() } } -} -private class OrcFilterConverter(val dataTypeMap: Map[String, DataType]) { + def convertibleFilters( + schema: StructType, + dataTypeMap: Map[String, DataType], + filters: Seq[Filter]): Seq[Filter] = { + import org.apache.spark.sql.sources._ + + def convertibleFiltersHelper( + filter: Filter, + canPartialPushDown: Boolean): Option[Filter] = filter match { + // At here, it is not safe to just convert one side and remove the other side + // if we do not understand what the parent filters are. + // + // Here is an example used to explain the reason. + // Let's say we have NOT(a = 2 AND b in ('1')) and we do not understand how to + // convert b in ('1'). If we only convert a = 2, we will end up with a filter + // NOT(a = 2), which will generate wrong results. + // + // Pushing one side of AND down is only safe to do at the top level or in the child + // AND before hitting NOT or OR conditions, and in this case, the unsupported predicate + // can be safely removed. + case And(left, right) => + val leftResultOptional = convertibleFiltersHelper(left, canPartialPushDown) + val rightResultOptional = convertibleFiltersHelper(right, canPartialPushDown) + (leftResultOptional, rightResultOptional) match { + case (Some(leftResult), Some(rightResult)) => Some(And(leftResult, rightResult)) + case (Some(leftResult), None) if canPartialPushDown => Some(leftResult) + case (None, Some(rightResult)) if canPartialPushDown => Some(rightResult) + case _ => None + } - def isSearchableType(dataType: DataType): Boolean = dataType match { - // Only the values in the Spark types below can be recognized by - // the `SearchArgumentImpl.BuilderImpl.boxLiteral()` method. 
- case ByteType | ShortType | FloatType | DoubleType => true - case IntegerType | LongType | StringType | BooleanType => true - case TimestampType | _: DecimalType => true - case _ => false + // The Or predicate is convertible when both of its children can be pushed down. + // That is to say, if one/both of the children can be partially pushed down, the Or + // predicate can be partially pushed down as well. + // + // Here is an example used to explain the reason. + // Let's say we have + // (a1 AND a2) OR (b1 AND b2), + // a1 and b1 is convertible, while a2 and b2 is not. + // The predicate can be converted as + // (a1 OR b1) AND (a1 OR b2) AND (a2 OR b1) AND (a2 OR b2) + // As per the logical in And predicate, we can push down (a1 OR b1). + case Or(left, right) => + val leftResultOptional = convertibleFiltersHelper(left, canPartialPushDown) + val rightResultOptional = convertibleFiltersHelper(right, canPartialPushDown) + if (leftResultOptional.isEmpty || rightResultOptional.isEmpty) { + None + } else { + Some(Or(leftResultOptional.get, rightResultOptional.get)) + } + case Not(pred) => + val resultOptional = convertibleFiltersHelper(pred, canPartialPushDown = false) + resultOptional.map(Not) + case other => + if (buildSearchArgument(dataTypeMap, other, newBuilder()).isDefined) { + Some(other) + } else { + None + } + } + filters.flatMap { filter => + convertibleFiltersHelper(filter, true) + } } - private def findMethod(klass: Class[_], name: String, args: Class[_]*): Method = { - val method = klass.getMethod(name, args: _*) - method.setAccessible(true) - method + private def buildSearchArgument( + dataTypeMap: Map[String, DataType], + expression: Filter, + builder: Builder): Option[Builder] = { + createBuilder(dataTypeMap, expression, builder) } - import org.apache.spark.sql.sources._ - /** - * Builds a SearchArgument for a Filter by first trimming the non-convertible nodes, and then - * only building the remaining convertible nodes. - * - * Doing the conversion in this way avoids the computational complexity problems introduced by - * checking whether a node is convertible while building it. The approach implemented here has - * complexity that's linear in the size of the Filter tree - O(number of Filter nodes) - we run - * a single pass over the tree to trim it, and then another pass on the trimmed tree to convert - * the remaining nodes. - * - * The alternative approach of checking-while-building can (and did) result - * in exponential complexity in the height of the tree, causing perf problems with Filters with - * as few as ~35 nodes if they were skewed. + * @param dataTypeMap a map from the attribute name to its data type. + * @param expression the input filter predicates. + * @param builder the input SearchArgument.Builder. + * @return the builder so far. */ - private[sql] def buildSearchArgument( + private def createBuilder( + dataTypeMap: Map[String, DataType], expression: Filter, builder: Builder): Option[Builder] = { - trimUnconvertibleFilters(expression).map { filter => - updateBuilder(filter, builder) - builder + def isSearchableType(dataType: DataType): Boolean = dataType match { + // Only the values in the Spark types below can be recognized by + // the `SearchArgumentImpl.BuilderImpl.boxLiteral()` method. 
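As a side note on the type gate just above: only types whose values ORC's `SearchArgumentImpl.BuilderImpl.boxLiteral()` can box are allowed through, and everything else falls to the `case _ => false` arm. A small sanity check — a local, illustrative copy of the match from the diff, not the patched code itself; it only needs `spark-catalyst` on the classpath for the type objects:

import org.apache.spark.sql.types._

object SearchableTypeCheck extends App {
  // Illustrative mirror of the gate shown in the diff above.
  def isSearchableType(dataType: DataType): Boolean = dataType match {
    case ByteType | ShortType | FloatType | DoubleType => true
    case IntegerType | LongType | StringType | BooleanType => true
    case TimestampType | _: DecimalType => true
    case _ => false
  }

  assert(isSearchableType(StringType))
  assert(isSearchableType(DecimalType(10, 2)))
  assert(!isSearchableType(DateType))   // not boxLiteral-compatible in this code path
  assert(!isSearchableType(BinaryType)) // so filters on such columns stay unconverted
}
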
+ case ByteType | ShortType | FloatType | DoubleType => true + case IntegerType | LongType | StringType | BooleanType => true + case TimestampType | _: DecimalType => true + case _ => false } - } - - /** - * Removes all sub-Filters from a given Filter that are not convertible to an ORC SearchArgument. - */ - private[sql] def trimUnconvertibleFilters(expression: Filter): Option[Filter] = { - performAction(TrimUnconvertibleFilters(canPartialPushDownConjuncts = true), expression) - } - - /** - * Builds a SearchArgument for the given Filter. This method should only be called on Filters - * that have previously been trimmed to remove unsupported sub-Filters! - */ - private def updateBuilder(expression: Filter, builder: Builder): Unit = - performAction(BuildSearchArgument(builder), expression) - - sealed trait ActionType[ReturnType] - case class TrimUnconvertibleFilters(canPartialPushDownConjuncts: Boolean) - extends ActionType[Option[Filter]] - case class BuildSearchArgument(builder: Builder) extends ActionType[Unit] - - // The performAction method can run both the filtering and building operations for a given - // node - we signify which one we want with the `actionType` parameter. - // - // There are a couple of benefits to coupling the two operations like this: - // 1. All the logic for a given predicate is grouped logically in the same place. You don't - // have to scroll across the whole file to see what the filter action for an And is while - // you're looking at the build action. - // 2. It's much easier to keep the implementations of the two operations up-to-date with - // each other. If the `filter` and `build` operations are implemented as separate case-matches - // in different methods, it's very easy to change one without appropriately updating the - // other. For example, if we add a new supported node type to `filter`, it would be very - // easy to forget to update `build` to support it too, thus leading to conversion errors. - private def performAction[ReturnType]( - actionType: ActionType[ReturnType], - expression: Filter): ReturnType = { expression match { case And(left, right) => - actionType match { - case t @ TrimUnconvertibleFilters(canPartialPushDownConjuncts) => - // At here, it is not safe to just keep one side and remove the other side - // if we do not understand what the parent filters are. - // - // Here is an example used to explain the reason. - // Let's say we have NOT(a = 2 AND b in ('1')) and we do not understand how to - // convert b in ('1'). If we only convert a = 2, we will end up with a filter - // NOT(a = 2), which will generate wrong results. - // - // Pushing one side of AND down is only safe to do at the top level or in the child - // AND before hitting NOT or OR conditions, and in this case, the unsupported - // predicate can be safely removed. 
- val lhs = performAction(t, left) - val rhs = performAction(t, right) - (lhs, rhs) match { - case (Some(l), Some(r)) => Some(And(l, r)) - case (Some(_), None) if canPartialPushDownConjuncts => lhs - case (None, Some(_)) if canPartialPushDownConjuncts => rhs - case _ => None - } - case b @ BuildSearchArgument(builder) => - builder.startAnd() - performAction(b, left) - performAction(b, right) - builder.end() - () - } + for { + lhs <- createBuilder(dataTypeMap, left, builder.startAnd()) + rhs <- createBuilder(dataTypeMap, right, lhs) + } yield rhs.end() case Or(left, right) => - actionType match { - case t: TrimUnconvertibleFilters => - // The Or predicate is convertible when both of its children can be pushed down. - // That is to say, if one/both of the children can be partially pushed down, the Or - // predicate can be partially pushed down as well. - // - // Here is an example used to explain the reason. - // Let's say we have - // (a1 AND a2) OR (b1 AND b2), - // a1 and b1 is convertible, while a2 and b2 is not. - // The predicate can be converted as - // (a1 OR b1) AND (a1 OR b2) AND (a2 OR b1) AND (a2 OR b2) - // As per the logical in And predicate, we can push down (a1 OR b1). - for { - lhs: Filter <- performAction(t, left) - rhs: Filter <- performAction(t, right) - } yield Or(lhs, rhs) - case b @ BuildSearchArgument(builder) => - builder.startOr() - performAction(b, left) - performAction(b, right) - builder.end() - () - } + for { + lhs <- createBuilder(dataTypeMap, left, builder.startOr()) + rhs <- createBuilder(dataTypeMap, right, lhs) + } yield rhs.end() case Not(child) => - actionType match { - case t: TrimUnconvertibleFilters => - performAction(t.copy(canPartialPushDownConjuncts = false), child).map(Not) - case b @ BuildSearchArgument(builder) => - builder.startNot() - performAction(b, child) - builder.end() - () - } + for { + negate <- createBuilder(dataTypeMap, child, builder.startNot()) + } yield negate.end() - // NOTE: For all case branches dealing with leaf predicates below, the additional - // `startAnd()` call is mandatory. ORC `SearchArgument` builder requires that all leaf - // predicates must be wrapped by a "parent" predicate (`And`, `Or`, or `Not`). + // NOTE: For all case branches dealing with leaf predicates below, the additional `startAnd()` + // call is mandatory. ORC `SearchArgument` builder requires that all leaf predicates must be + // wrapped by a "parent" predicate (`And`, `Or`, or `Not`). 
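The leaf cases below go through `findMethod` and `Method.invoke` rather than calling the builder directly, because the `SearchArgument.Builder` method signatures differ across the Hive versions this file must compile against. A self-contained model of that pattern — the `ToyBuilder` class is invented for this sketch; only `java.lang.reflect` is real:

import java.lang.reflect.Method

object ReflectiveBuilderSketch extends App {
  // Stand-in for SearchArgument.Builder, invented for illustration.
  class ToyBuilder {
    private val ops = new StringBuilder
    def lessThan(attr: String, value: Object): ToyBuilder = {
      ops.append(s"lessThan($attr, $value)")
      this
    }
    override def toString: String = ops.toString
  }

  // Same shape as the patch's findMethod helper: resolve the method by name
  // and parameter types at runtime instead of linking against one signature.
  def findMethod(klass: Class[_], name: String, args: Class[_]*): Method = {
    val method = klass.getMethod(name, args: _*)
    method.setAccessible(true)
    method
  }

  val bd = new ToyBuilder
  val lessThan = findMethod(bd.getClass, "lessThan", classOf[String], classOf[Object])
  lessThan.invoke(bd, "a", Int.box(10)).asInstanceOf[ToyBuilder]
  println(bd) // prints: lessThan(a, 10)
}
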
case EqualTo(attribute, value) if isSearchableType(dataTypeMap(attribute)) => - actionType match { - case _: TrimUnconvertibleFilters => Some(expression) - case BuildSearchArgument(builder) => - val bd = builder.startAnd() - val method = findMethod(bd.getClass, "equals", classOf[String], classOf[Object]) - method.invoke(bd, attribute, value.asInstanceOf[AnyRef]).asInstanceOf[Builder].end() - () - } + val bd = builder.startAnd() + val method = findMethod(bd.getClass, "equals", classOf[String], classOf[Object]) + Some(method.invoke(bd, attribute, value.asInstanceOf[AnyRef]).asInstanceOf[Builder].end()) + case EqualNullSafe(attribute, value) if isSearchableType(dataTypeMap(attribute)) => - actionType match { - case _: TrimUnconvertibleFilters => Some(expression) - case BuildSearchArgument(builder) => - val bd = builder.startAnd() - val method = findMethod(bd.getClass, "nullSafeEquals", classOf[String], classOf[Object]) - method.invoke(bd, attribute, value.asInstanceOf[AnyRef]).asInstanceOf[Builder].end() - () - } + val bd = builder.startAnd() + val method = findMethod(bd.getClass, "nullSafeEquals", classOf[String], classOf[Object]) + Some(method.invoke(bd, attribute, value.asInstanceOf[AnyRef]).asInstanceOf[Builder].end()) + case LessThan(attribute, value) if isSearchableType(dataTypeMap(attribute)) => - actionType match { - case _: TrimUnconvertibleFilters => Some(expression) - case BuildSearchArgument(builder) => - val bd = builder.startAnd() - val method = findMethod(bd.getClass, "lessThan", classOf[String], classOf[Object]) - method.invoke(bd, attribute, value.asInstanceOf[AnyRef]).asInstanceOf[Builder].end() - () - } + val bd = builder.startAnd() + val method = findMethod(bd.getClass, "lessThan", classOf[String], classOf[Object]) + Some(method.invoke(bd, attribute, value.asInstanceOf[AnyRef]).asInstanceOf[Builder].end()) + case LessThanOrEqual(attribute, value) if isSearchableType(dataTypeMap(attribute)) => - actionType match { - case _: TrimUnconvertibleFilters => Some(expression) - case BuildSearchArgument(builder) => - val bd = builder.startAnd() - val method = findMethod(bd.getClass, "lessThanEquals", classOf[String], classOf[Object]) - method.invoke(bd, attribute, value.asInstanceOf[AnyRef]).asInstanceOf[Builder].end() - () - } + val bd = builder.startAnd() + val method = findMethod(bd.getClass, "lessThanEquals", classOf[String], classOf[Object]) + Some(method.invoke(bd, attribute, value.asInstanceOf[AnyRef]).asInstanceOf[Builder].end()) + case GreaterThan(attribute, value) if isSearchableType(dataTypeMap(attribute)) => - actionType match { - case _: TrimUnconvertibleFilters => Some(expression) - case BuildSearchArgument(builder) => - val bd = builder.startNot() - val method = findMethod(bd.getClass, "lessThanEquals", classOf[String], classOf[Object]) - method.invoke(bd, attribute, value.asInstanceOf[AnyRef]).asInstanceOf[Builder].end() - () - } + val bd = builder.startNot() + val method = findMethod(bd.getClass, "lessThanEquals", classOf[String], classOf[Object]) + Some(method.invoke(bd, attribute, value.asInstanceOf[AnyRef]).asInstanceOf[Builder].end()) + case GreaterThanOrEqual(attribute, value) if isSearchableType(dataTypeMap(attribute)) => - actionType match { - case _: TrimUnconvertibleFilters => Some(expression) - case BuildSearchArgument(builder) => - val bd = builder.startNot() - val method = findMethod(bd.getClass, "lessThan", classOf[String], classOf[Object]) - method.invoke(bd, attribute, value.asInstanceOf[AnyRef]).asInstanceOf[Builder].end() - () - } + val bd = 
builder.startNot() + val method = findMethod(bd.getClass, "lessThan", classOf[String], classOf[Object]) + Some(method.invoke(bd, attribute, value.asInstanceOf[AnyRef]).asInstanceOf[Builder].end()) + case IsNull(attribute) if isSearchableType(dataTypeMap(attribute)) => - actionType match { - case _: TrimUnconvertibleFilters => Some(expression) - case BuildSearchArgument(builder) => - val bd = builder.startAnd() - val method = findMethod(bd.getClass, "isNull", classOf[String]) - method.invoke(bd, attribute).asInstanceOf[Builder].end() - () - } + val bd = builder.startAnd() + val method = findMethod(bd.getClass, "isNull", classOf[String]) + Some(method.invoke(bd, attribute).asInstanceOf[Builder].end()) + case IsNotNull(attribute) if isSearchableType(dataTypeMap(attribute)) => - actionType match { - case _: TrimUnconvertibleFilters => Some(expression) - case BuildSearchArgument(builder) => - val bd = builder.startNot() - val method = findMethod(bd.getClass, "isNull", classOf[String]) - method.invoke(bd, attribute).asInstanceOf[Builder].end() - () - } + val bd = builder.startNot() + val method = findMethod(bd.getClass, "isNull", classOf[String]) + Some(method.invoke(bd, attribute).asInstanceOf[Builder].end()) + case In(attribute, values) if isSearchableType(dataTypeMap(attribute)) => - actionType match { - case _: TrimUnconvertibleFilters => Some(expression) - case BuildSearchArgument(builder) => - val bd = builder.startAnd() - val method = findMethod(bd.getClass, "in", classOf[String], classOf[Array[Object]]) - method.invoke(bd, attribute, values.map(_.asInstanceOf[AnyRef])) - .asInstanceOf[Builder].end() - () - } + val bd = builder.startAnd() + val method = findMethod(bd.getClass, "in", classOf[String], classOf[Array[Object]]) + Some(method.invoke(bd, attribute, values.map(_.asInstanceOf[AnyRef])) + .asInstanceOf[Builder].end()) - case _ => - actionType match { - case _: TrimUnconvertibleFilters => None - case BuildSearchArgument(builder) => - throw new IllegalArgumentException(s"Can't build unsupported filter ${expression}") - } + case _ => None } } } From c0cd6f3876766624948f0b0cc16793e3bced2754 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Wed, 19 Jun 2019 22:47:27 +0800 Subject: [PATCH 2/6] rename --- .../datasources/orc/OrcFilters.scala | 22 ++++++------------- .../datasources/orc/OrcFilters.scala | 22 ++++++------------- .../spark/sql/hive/orc/OrcFilters.scala | 19 +++++----------- 3 files changed, 20 insertions(+), 43 deletions(-) diff --git a/sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala b/sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala index f9288d4f9bad2..17eb93de643c0 100644 --- a/sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala +++ b/sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala @@ -170,21 +170,13 @@ private[sql] object OrcFilters extends OrcFiltersBase { /** * Build a SearchArgument and return the builder so far. - */ - private def buildSearchArgument( - dataTypeMap: Map[String, DataType], - expression: Filter, - builder: Builder): Option[Builder] = { - createBuilder(dataTypeMap, expression, builder) - } - - /** + * * @param dataTypeMap a map from the attribute name to its data type. * @param expression the input filter predicates. * @param builder the input SearchArgument.Builder. * @return the builder so far. 
*/ - private def createBuilder( + private def buildSearchArgument( dataTypeMap: Map[String, DataType], expression: Filter, builder: Builder): Option[Builder] = { @@ -196,19 +188,19 @@ private[sql] object OrcFilters extends OrcFiltersBase { expression match { case And(left, right) => for { - lhs <- createBuilder(dataTypeMap, left, builder.startAnd()) - rhs <- createBuilder(dataTypeMap, right, lhs) + lhs <- buildSearchArgument(dataTypeMap, left, builder.startAnd()) + rhs <- buildSearchArgument(dataTypeMap, right, lhs) } yield rhs.end() case Or(left, right) => for { - lhs <- createBuilder(dataTypeMap, left, builder.startOr()) - rhs <- createBuilder(dataTypeMap, right, lhs) + lhs <- buildSearchArgument(dataTypeMap, left, builder.startOr()) + rhs <- buildSearchArgument(dataTypeMap, right, lhs) } yield rhs.end() case Not(child) => for { - negate <- createBuilder(dataTypeMap, child, builder.startNot()) + negate <- buildSearchArgument(dataTypeMap, child, builder.startNot()) } yield negate.end() // NOTE: For all case branches dealing with leaf predicates below, the additional `startAnd()` diff --git a/sql/core/v2.3.5/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala b/sql/core/v2.3.5/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala index 7e652c13210bc..090433f1d7d50 100644 --- a/sql/core/v2.3.5/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala +++ b/sql/core/v2.3.5/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala @@ -170,21 +170,13 @@ private[sql] object OrcFilters extends OrcFiltersBase { /** * Build a SearchArgument and return the builder so far. - */ - private def buildSearchArgument( - dataTypeMap: Map[String, DataType], - expression: Filter, - builder: Builder): Option[Builder] = { - createBuilder(dataTypeMap, expression, builder) - } - - /** + * * @param dataTypeMap a map from the attribute name to its data type. * @param expression the input filter predicates. * @param builder the input SearchArgument.Builder. * @return the builder so far. 
*/ - private def createBuilder( + private def buildSearchArgument( dataTypeMap: Map[String, DataType], expression: Filter, builder: Builder): Option[Builder] = { @@ -196,19 +188,19 @@ private[sql] object OrcFilters extends OrcFiltersBase { expression match { case And(left, right) => for { - lhs <- createBuilder(dataTypeMap, left, builder.startAnd()) - rhs <- createBuilder(dataTypeMap, right, lhs) + lhs <- buildSearchArgument(dataTypeMap, left, builder.startAnd()) + rhs <- buildSearchArgument(dataTypeMap, right, lhs) } yield rhs.end() case Or(left, right) => for { - lhs <- createBuilder(dataTypeMap, left, builder.startOr()) - rhs <- createBuilder(dataTypeMap, right, lhs) + lhs <- buildSearchArgument(dataTypeMap, left, builder.startOr()) + rhs <- buildSearchArgument(dataTypeMap, right, lhs) } yield rhs.end() case Not(child) => for { - negate <- createBuilder(dataTypeMap, child, builder.startNot()) + negate <- buildSearchArgument(dataTypeMap, child, builder.startNot()) } yield negate.end() // NOTE: For all case branches dealing with leaf predicates below, the additional `startAnd()` diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala index f0aebb44478b4..33ebb5d62e8aa 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala @@ -148,20 +148,13 @@ private[orc] object OrcFilters extends Logging { } } - private def buildSearchArgument( - dataTypeMap: Map[String, DataType], - expression: Filter, - builder: Builder): Option[Builder] = { - createBuilder(dataTypeMap, expression, builder) - } - /** * @param dataTypeMap a map from the attribute name to its data type. * @param expression the input filter predicates. * @param builder the input SearchArgument.Builder. * @return the builder so far. 
   */
-  private def createBuilder(
+  private def buildSearchArgument(
       dataTypeMap: Map[String, DataType],
       expression: Filter,
       builder: Builder): Option[Builder] = {
@@ -177,19 +170,19 @@ private[orc] object OrcFilters extends Logging {
     expression match {
       case And(left, right) =>
         for {
-          lhs <- createBuilder(dataTypeMap, left, builder.startAnd())
-          rhs <- createBuilder(dataTypeMap, right, lhs)
+          lhs <- buildSearchArgument(dataTypeMap, left, builder.startAnd())
+          rhs <- buildSearchArgument(dataTypeMap, right, lhs)
         } yield rhs.end()
 
       case Or(left, right) =>
         for {
-          lhs <- createBuilder(dataTypeMap, left, builder.startOr())
-          rhs <- createBuilder(dataTypeMap, right, lhs)
+          lhs <- buildSearchArgument(dataTypeMap, left, builder.startOr())
+          rhs <- buildSearchArgument(dataTypeMap, right, lhs)
         } yield rhs.end()
 
       case Not(child) =>
         for {
-          negate <- createBuilder(dataTypeMap, child, builder.startNot())
+          negate <- buildSearchArgument(dataTypeMap, child, builder.startNot())
         } yield negate.end()
 
       // NOTE: For all case branches dealing with leaf predicates below, the additional `startAnd()`

From ce08f04a2bbf29b7b39c5e42e827b7e7591778db Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Thu, 20 Jun 2019 02:21:38 +0800
Subject: [PATCH 3/6] address comments

---
 .../datasources/orc/OrcFilters.scala    | 21 +++++++------------
 .../datasources/orc/OrcFilters.scala    | 21 +++++++------------
 .../spark/sql/hive/orc/OrcFilters.scala | 21 +++++++------------
 3 files changed, 21 insertions(+), 42 deletions(-)
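
Note: this patch replaces the hand-written Option plumbing with for-comprehensions.
The two forms are equivalent: a for-comprehension over `Option` desugars to
`flatMap`/`map`, so a `None` on either side short-circuits the whole expression.
A quick standalone illustration (not part of the diff):

    def both[A, B](l: Option[A], r: Option[B]): Option[(A, B)] =
      for { lv <- l; rv <- r } yield (lv, rv)  // l.flatMap(lv => r.map(rv => (lv, rv)))

    both(Some(1), Some(2))  // Some((1, 2))
    both(Some(1), None)     // None -- mirrors dropping an Or with an unconvertible side
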
diff --git a/sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala b/sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
index 17eb93de643c0..f8eae6f640ad7 100644
--- a/sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
+++ b/sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
@@ -116,22 +116,15 @@ private[sql] object OrcFilters extends OrcFiltersBase {
       //   (a1 OR b1) AND (a1 OR b2) AND (a2 OR b1) AND (a2 OR b2)
       // As per the logical in And predicate, we can push down (a1 OR b1).
       case Or(left, right) =>
-        val leftResultOptional = convertibleFiltersHelper(left, canPartialPushDown)
-        val rightResultOptional = convertibleFiltersHelper(right, canPartialPushDown)
-        if (leftResultOptional.isEmpty || rightResultOptional.isEmpty) {
-          None
-        } else {
-          Some(Or(leftResultOptional.get, rightResultOptional.get))
-        }
+        for {
+          lhs <- convertibleFiltersHelper(left, canPartialPushDown)
+          rhs <- convertibleFiltersHelper(right, canPartialPushDown)
+        } yield Or(lhs, rhs)
       case Not(pred) =>
-        val resultOptional = convertibleFiltersHelper(pred, canPartialPushDown = false)
-        resultOptional.map(Not)
+        val childResultOptional = convertibleFiltersHelper(pred, canPartialPushDown = false)
+        childResultOptional.map(Not)
       case other =>
-        if (buildSearchArgument(dataTypeMap, other, newBuilder()).isDefined) {
-          Some(other)
-        } else {
-          None
-        }
+        for (_ <- buildSearchArgument(dataTypeMap, other, newBuilder())) yield other
     }
     filters.flatMap { filter =>
       convertibleFiltersHelper(filter, true)
diff --git a/sql/core/v2.3.5/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala b/sql/core/v2.3.5/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
index 090433f1d7d50..3adc2367fd452 100644
--- a/sql/core/v2.3.5/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
+++ b/sql/core/v2.3.5/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
@@ -116,22 +116,15 @@ private[sql] object OrcFilters extends OrcFiltersBase {
       //   (a1 OR b1) AND (a1 OR b2) AND (a2 OR b1) AND (a2 OR b2)
       // As per the logical in And predicate, we can push down (a1 OR b1).
       case Or(left, right) =>
-        val leftResultOptional = convertibleFiltersHelper(left, canPartialPushDown)
-        val rightResultOptional = convertibleFiltersHelper(right, canPartialPushDown)
-        if (leftResultOptional.isEmpty || rightResultOptional.isEmpty) {
-          None
-        } else {
-          Some(Or(leftResultOptional.get, rightResultOptional.get))
-        }
+        for {
+          lhs <- convertibleFiltersHelper(left, canPartialPushDown)
+          rhs <- convertibleFiltersHelper(right, canPartialPushDown)
+        } yield Or(lhs, rhs)
       case Not(pred) =>
-        val resultOptional = convertibleFiltersHelper(pred, canPartialPushDown = false)
-        resultOptional.map(Not)
+        val childResultOptional = convertibleFiltersHelper(pred, canPartialPushDown = false)
+        childResultOptional.map(Not)
       case other =>
-        if (buildSearchArgument(dataTypeMap, other, newBuilder()).isDefined) {
-          Some(other)
-        } else {
-          None
-        }
+        for (_ <- buildSearchArgument(dataTypeMap, other, newBuilder())) yield other
     }
     filters.flatMap { filter =>
       convertibleFiltersHelper(filter, true)
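
Note: as the comment in the hunks above explains, an `Or` can be pushed down only
when both of its children convert, whereas an `And` may be pushed down partially.
With the for-comprehension form this falls out directly; a hedged sketch using the
same `sources` filter classes (the attribute `a` and the `None` side are invented
for illustration):

    import org.apache.spark.sql.sources.{EqualTo, Filter, Or}

    val convertibleSide: Option[Filter] = Some(EqualTo("a", 1))
    val unconvertibleSide: Option[Filter] = None
    // The whole Or is discarded as soon as one side fails to convert:
    val pushed: Option[Filter] =
      for { l <- convertibleSide; r <- unconvertibleSide } yield Or(l, r)  // None
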
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
index 33ebb5d62e8aa..cd18353a9a623 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
@@ -126,22 +126,15 @@ private[orc] object OrcFilters extends Logging {
       //   (a1 OR b1) AND (a1 OR b2) AND (a2 OR b1) AND (a2 OR b2)
       // As per the logical in And predicate, we can push down (a1 OR b1).
       case Or(left, right) =>
-        val leftResultOptional = convertibleFiltersHelper(left, canPartialPushDown)
-        val rightResultOptional = convertibleFiltersHelper(right, canPartialPushDown)
-        if (leftResultOptional.isEmpty || rightResultOptional.isEmpty) {
-          None
-        } else {
-          Some(Or(leftResultOptional.get, rightResultOptional.get))
-        }
+        for {
+          lhs <- convertibleFiltersHelper(left, canPartialPushDown)
+          rhs <- convertibleFiltersHelper(right, canPartialPushDown)
+        } yield Or(lhs, rhs)
       case Not(pred) =>
-        val resultOptional = convertibleFiltersHelper(pred, canPartialPushDown = false)
-        resultOptional.map(Not)
+        val childResultOptional = convertibleFiltersHelper(pred, canPartialPushDown = false)
+        childResultOptional.map(Not)
       case other =>
-        if (buildSearchArgument(dataTypeMap, other, newBuilder()).isDefined) {
-          Some(other)
-        } else {
-          None
-        }
+        for (_ <- buildSearchArgument(dataTypeMap, other, newBuilder())) yield other
     }
     filters.flatMap { filter =>
       convertibleFiltersHelper(filter, true)

From 77eaff365d6afd7826f474a2ad4f10d4b5e7ab86 Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Thu, 20 Jun 2019 13:59:07 +0800
Subject: [PATCH 4/6] address comments

---
 .../datasources/orc/OrcFilters.scala    | 31 ++++++++++----
 .../datasources/orc/OrcFilters.scala    | 31 ++++++++++----
 .../spark/sql/hive/orc/OrcFilters.scala | 42 ++++++++++++++-----
 3 files changed, 80 insertions(+), 24 deletions(-)
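
Note: this patch splits leaf handling out of the recursive builder into a dedicated
`buildLeafSearchArgument`, so the connective cases (`And`/`Or`/`Not`) no longer
share one oversized match with the leaf cases. A toy model of the resulting shape
(simplified types, not the real signatures):

    sealed trait Pred
    case class AndP(l: Pred, r: Pred) extends Pred
    case class Leaf(name: String) extends Pred

    def build(p: Pred): String = p match {
      case AndP(l, r) => s"and(${build(l)}, ${build(r)})"  // connectives recurse
      case leaf: Leaf => buildLeaf(leaf)                    // leaves are delegated
    }
    def buildLeaf(l: Leaf): String = s"leaf(${l.name})"
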
diff --git a/sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala b/sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
index f8eae6f640ad7..6b3895bc3a5c8 100644
--- a/sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
+++ b/sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
@@ -124,7 +124,7 @@ private[sql] object OrcFilters extends OrcFiltersBase {
         val childResultOptional = convertibleFiltersHelper(pred, canPartialPushDown = false)
         childResultOptional.map(Not)
       case other =>
-        for (_ <- buildSearchArgument(dataTypeMap, other, newBuilder())) yield other
+        for (_ <- buildLeafSearchArgument(dataTypeMap, other, newBuilder())) yield other
     }
     filters.flatMap { filter =>
       convertibleFiltersHelper(filter, true)
@@ -173,9 +173,6 @@ private[sql] object OrcFilters extends OrcFiltersBase {
       dataTypeMap: Map[String, DataType],
       expression: Filter,
       builder: Builder): Option[Builder] = {
-    def getType(attribute: String): PredicateLeaf.Type =
-      getPredicateLeafType(dataTypeMap(attribute))
-
     import org.apache.spark.sql.sources._
 
     expression match {
@@ -196,10 +193,30 @@ private[sql] object OrcFilters extends OrcFiltersBase {
         negate <- buildSearchArgument(dataTypeMap, child, builder.startNot())
       } yield negate.end()
 
-      // NOTE: For all case branches dealing with leaf predicates below, the additional `startAnd()`
-      // call is mandatory. ORC `SearchArgument` builder requires that all leaf predicates must be
-      // wrapped by a "parent" predicate (`And`, `Or`, or `Not`).
+      case other => buildLeafSearchArgument(dataTypeMap, other, builder)
+    }
+  }
+
+  /**
+   * Build a SearchArgument for a leaf predicate and return the builder so far.
+   *
+   * @param dataTypeMap a map from the attribute name to its data type.
+   * @param expression the input filter predicates.
+   * @param builder the input SearchArgument.Builder.
+   * @return the builder so far.
+   */
+  private def buildLeafSearchArgument(
+      dataTypeMap: Map[String, DataType],
+      expression: Filter,
+      builder: Builder): Option[Builder] = {
+    def getType(attribute: String): PredicateLeaf.Type =
+      getPredicateLeafType(dataTypeMap(attribute))
+
+    import org.apache.spark.sql.sources._
+    // NOTE: For all case branches dealing with leaf predicates below, the additional `startAnd()`
+    // call is mandatory. ORC `SearchArgument` builder requires that all leaf predicates must be
+    // wrapped by a "parent" predicate (`And`, `Or`, or `Not`).
+    expression match {
       case EqualTo(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
         val quotedName = quoteAttributeNameIfNeeded(attribute)
         val castedValue = castLiteralValue(value, dataTypeMap(attribute))
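
Note: in the `case other` branch of `convertibleFiltersHelper` above, the leaf
builder doubles as a convertibility probe: the filter is run against a scratch
`newBuilder()` and kept only if conversion succeeds, with the builder itself
discarded. The pattern in isolation (hypothetical helper name, not in the patch):

    import org.apache.spark.sql.sources.Filter

    // Keep `filter` only if `probe` (a trial conversion) succeeds.
    def keepIfConvertible[A](probe: => Option[A], filter: Filter): Option[Filter] =
      for (_ <- probe) yield filter
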
diff --git a/sql/core/v2.3.5/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala b/sql/core/v2.3.5/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
index 3adc2367fd452..867d234fadb98 100644
--- a/sql/core/v2.3.5/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
+++ b/sql/core/v2.3.5/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
@@ -124,7 +124,7 @@ private[sql] object OrcFilters extends OrcFiltersBase {
         val childResultOptional = convertibleFiltersHelper(pred, canPartialPushDown = false)
         childResultOptional.map(Not)
       case other =>
-        for (_ <- buildSearchArgument(dataTypeMap, other, newBuilder())) yield other
+        for (_ <- buildLeafSearchArgument(dataTypeMap, other, newBuilder())) yield other
     }
     filters.flatMap { filter =>
       convertibleFiltersHelper(filter, true)
@@ -173,9 +173,6 @@ private[sql] object OrcFilters extends OrcFiltersBase {
       dataTypeMap: Map[String, DataType],
       expression: Filter,
       builder: Builder): Option[Builder] = {
-    def getType(attribute: String): PredicateLeaf.Type =
-      getPredicateLeafType(dataTypeMap(attribute))
-
     import org.apache.spark.sql.sources._
 
     expression match {
@@ -196,10 +193,30 @@ private[sql] object OrcFilters extends OrcFiltersBase {
         negate <- buildSearchArgument(dataTypeMap, child, builder.startNot())
       } yield negate.end()
 
-      // NOTE: For all case branches dealing with leaf predicates below, the additional `startAnd()`
-      // call is mandatory. ORC `SearchArgument` builder requires that all leaf predicates must be
-      // wrapped by a "parent" predicate (`And`, `Or`, or `Not`).
+      case other => buildLeafSearchArgument(dataTypeMap, other, builder)
+    }
+  }
+
+  /**
+   * Build a SearchArgument for a leaf predicate and return the builder so far.
+   *
+   * @param dataTypeMap a map from the attribute name to its data type.
+   * @param expression the input filter predicates.
+   * @param builder the input SearchArgument.Builder.
+   * @return the builder so far.
+   */
+  private def buildLeafSearchArgument(
+      dataTypeMap: Map[String, DataType],
+      expression: Filter,
+      builder: Builder): Option[Builder] = {
+    def getType(attribute: String): PredicateLeaf.Type =
+      getPredicateLeafType(dataTypeMap(attribute))
+
+    import org.apache.spark.sql.sources._
+    // NOTE: For all case branches dealing with leaf predicates below, the additional `startAnd()`
+    // call is mandatory. ORC `SearchArgument` builder requires that all leaf predicates must be
+    // wrapped by a "parent" predicate (`And`, `Or`, or `Not`).
+    expression match {
       case EqualTo(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
         val quotedName = quoteAttributeNameIfNeeded(attribute)
         val castedValue = castLiteralValue(value, dataTypeMap(attribute))
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
index cd18353a9a623..81e7caaa043bf 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
@@ -134,7 +134,7 @@ private[orc] object OrcFilters extends Logging {
         val childResultOptional = convertibleFiltersHelper(pred, canPartialPushDown = false)
         childResultOptional.map(Not)
       case other =>
-        for (_ <- buildSearchArgument(dataTypeMap, other, newBuilder())) yield other
+        for (_ <- buildLeafSearchArgument(dataTypeMap, other, newBuilder())) yield other
     }
     filters.flatMap { filter =>
       convertibleFiltersHelper(filter, true)
@@ -142,6 +142,8 @@ private[orc] object OrcFilters extends Logging {
   }
 
   /**
+   * Build a SearchArgument and return the builder so far.
+   *
    * @param dataTypeMap a map from the attribute name to its data type.
    * @param expression the input filter predicates.
    * @param builder the input SearchArgument.Builder.
@@ -151,15 +153,6 @@ private[orc] object OrcFilters extends Logging {
       dataTypeMap: Map[String, DataType],
       expression: Filter,
       builder: Builder): Option[Builder] = {
-    def isSearchableType(dataType: DataType): Boolean = dataType match {
-      // Only the values in the Spark types below can be recognized by
-      // the `SearchArgumentImpl.BuilderImpl.boxLiteral()` method.
-      case ByteType | ShortType | FloatType | DoubleType => true
-      case IntegerType | LongType | StringType | BooleanType => true
-      case TimestampType | _: DecimalType => true
-      case _ => false
-    }
-
     expression match {
       case And(left, right) =>
         for {
@@ -178,6 +171,35 @@ private[orc] object OrcFilters extends Logging {
         negate <- buildSearchArgument(dataTypeMap, child, builder.startNot())
       } yield negate.end()
 
+      case other => buildLeafSearchArgument(dataTypeMap, other, builder)
+    }
+  }
+
+  /**
+   * Build a SearchArgument for a leaf predicate and return the builder so far.
+   *
+   * @param dataTypeMap a map from the attribute name to its data type.
+   * @param expression the input filter predicates.
+   * @param builder the input SearchArgument.Builder.
+   * @return the builder so far.
+   */
+  private def buildLeafSearchArgument(
+      dataTypeMap: Map[String, DataType],
+      expression: Filter,
+      builder: Builder): Option[Builder] = {
+    def isSearchableType(dataType: DataType): Boolean = dataType match {
+      // Only the values in the Spark types below can be recognized by
+      // the `SearchArgumentImpl.BuilderImpl.boxLiteral()` method.
+      case ByteType | ShortType | FloatType | DoubleType => true
+      case IntegerType | LongType | StringType | BooleanType => true
+      case TimestampType | _: DecimalType => true
+      case _ => false
+    }
+    import org.apache.spark.sql.sources._
+    // NOTE: For all case branches dealing with leaf predicates below, the additional `startAnd()`
+    // call is mandatory. ORC `SearchArgument` builder requires that all leaf predicates must be
+    // wrapped by a "parent" predicate (`And`, `Or`, or `Not`).
+    expression match {
       // NOTE: For all case branches dealing with leaf predicates below, the additional `startAnd()`
       // call is mandatory. ORC `SearchArgument` builder requires that all leaf predicates must be
       // wrapped by a "parent" predicate (`And`, `Or`, or `Not`).

From f2051ec0320a7df74ea8479ae885145cdd8fb030 Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Thu, 20 Jun 2019 14:16:16 +0800
Subject: [PATCH 5/6] add blank lines

---
 .../apache/spark/sql/execution/datasources/orc/OrcFilters.scala | 1 +
 .../apache/spark/sql/execution/datasources/orc/OrcFilters.scala | 1 +
 .../main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala   | 2 ++
 3 files changed, 4 insertions(+)

diff --git a/sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala b/sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
index 6b3895bc3a5c8..26098ba20bd6f 100644
--- a/sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
+++ b/sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
@@ -213,6 +213,7 @@ private[sql] object OrcFilters extends OrcFiltersBase {
       getPredicateLeafType(dataTypeMap(attribute))
 
     import org.apache.spark.sql.sources._
+
     // NOTE: For all case branches dealing with leaf predicates below, the additional `startAnd()`
     // call is mandatory. ORC `SearchArgument` builder requires that all leaf predicates must be
     // wrapped by a "parent" predicate (`And`, `Or`, or `Not`).
diff --git a/sql/core/v2.3.5/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala b/sql/core/v2.3.5/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
index 867d234fadb98..05218d1fba629 100644
--- a/sql/core/v2.3.5/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
+++ b/sql/core/v2.3.5/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
@@ -213,6 +213,7 @@ private[sql] object OrcFilters extends OrcFiltersBase {
       getPredicateLeafType(dataTypeMap(attribute))
 
     import org.apache.spark.sql.sources._
+
     // NOTE: For all case branches dealing with leaf predicates below, the additional `startAnd()`
     // call is mandatory. ORC `SearchArgument` builder requires that all leaf predicates must be
     // wrapped by a "parent" predicate (`And`, `Or`, or `Not`).
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
index 81e7caaa043bf..fed42cacc92c4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
@@ -195,7 +195,9 @@ private[orc] object OrcFilters extends Logging {
       case TimestampType | _: DecimalType => true
       case _ => false
     }
+
     import org.apache.spark.sql.sources._
+
     // NOTE: For all case branches dealing with leaf predicates below, the additional `startAnd()`
     // call is mandatory. ORC `SearchArgument` builder requires that all leaf predicates must be
     // wrapped by a "parent" predicate (`And`, `Or`, or `Not`).
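
Note: the hive hunks above also move `isSearchableType` into the leaf builder; it is
the gate in every leaf guard (`if isSearchableType(dataTypeMap(attribute))`). A quick
check of its behavior (the function body mirrors the predicate in the hunk; the
example types are invented):

    import org.apache.spark.sql.types._

    def searchable(dt: DataType): Boolean = dt match {
      case ByteType | ShortType | FloatType | DoubleType => true
      case IntegerType | LongType | StringType | BooleanType => true
      case TimestampType | _: DecimalType => true
      case _ => false
    }

    searchable(IntegerType)             // true
    searchable(ArrayType(IntegerType))  // false: such leaves are never pushed down
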
From 1792bb6dee912408ce3943e2f942e6cde34714ed Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Sun, 23 Jun 2019 14:29:25 +0800
Subject: [PATCH 6/6] refactor buildSearchArgument

---
 .../datasources/orc/OrcFilters.scala    | 39 ++++++++++---------
 .../datasources/orc/OrcFilters.scala    | 39 ++++++++++---------
 .../spark/sql/hive/orc/OrcFilters.scala | 39 ++++++++++---------
 3 files changed, 60 insertions(+), 57 deletions(-)
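
Note: this final patch changes the contract of `buildSearchArgument` from
`Option[Builder]` to `Builder`. Since `createFilter` now pre-filters through
`convertibleFilters`, the conjunction handed to `buildSearchArgument` is fully
convertible by construction, so an unconvertible leaf becomes a `SparkException`
(a programming error) rather than a silently empty result. The call-site shape,
paraphrased from the hunks below:

    // Pre-trim to the convertible subset, then build in one non-failing pass.
    val convertible = convertibleFilters(schema, dataTypeMap, filters)
    buildTree(convertible).map { conjunction =>
      buildSearchArgument(dataTypeMap, conjunction, newBuilder).build()
    }
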
diff --git a/sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala b/sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
index 26098ba20bd6f..995c5ed317de1 100644
--- a/sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
+++ b/sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
@@ -23,6 +23,7 @@ import org.apache.orc.storage.ql.io.sarg.SearchArgument.Builder
 import org.apache.orc.storage.ql.io.sarg.SearchArgumentFactory.newBuilder
 import org.apache.orc.storage.serde2.io.HiveDecimalWritable
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types._
 
@@ -64,14 +65,14 @@ private[sql] object OrcFilters extends OrcFiltersBase {
    */
   def createFilter(schema: StructType, filters: Seq[Filter]): Option[SearchArgument] = {
     val dataTypeMap = schema.map(f => f.name -> f.dataType).toMap
-    for {
-      // Combines all convertible filters using `And` to produce a single conjunction
-      conjunction <- buildTree(convertibleFilters(schema, dataTypeMap, filters))
+    // Combines all convertible filters using `And` to produce a single conjunction
+    val conjunctionOptional = buildTree(convertibleFilters(schema, dataTypeMap, filters))
+    conjunctionOptional.map { conjunction =>
       // Then tries to build a single ORC `SearchArgument` for the conjunction predicate.
       // The input predicate is fully convertible. There should not be any empty result in the
       // following recursive method call `buildSearchArgument`.
-      builder <- buildSearchArgument(dataTypeMap, conjunction, newBuilder)
-    } yield builder.build()
+      buildSearchArgument(dataTypeMap, conjunction, newBuilder).build()
+    }
   }
 
   def convertibleFilters(
@@ -165,35 +166,35 @@ private[sql] object OrcFilters extends OrcFiltersBase {
    * Build a SearchArgument and return the builder so far.
    *
    * @param dataTypeMap a map from the attribute name to its data type.
-   * @param expression the input filter predicates.
+   * @param expression the input predicates, which should be fully convertible to SearchArgument.
    * @param builder the input SearchArgument.Builder.
    * @return the builder so far.
    */
   private def buildSearchArgument(
       dataTypeMap: Map[String, DataType],
       expression: Filter,
-      builder: Builder): Option[Builder] = {
+      builder: Builder): Builder = {
     import org.apache.spark.sql.sources._
 
     expression match {
       case And(left, right) =>
-        for {
-          lhs <- buildSearchArgument(dataTypeMap, left, builder.startAnd())
-          rhs <- buildSearchArgument(dataTypeMap, right, lhs)
-        } yield rhs.end()
+        val lhs = buildSearchArgument(dataTypeMap, left, builder.startAnd())
+        val rhs = buildSearchArgument(dataTypeMap, right, lhs)
+        rhs.end()
 
       case Or(left, right) =>
-        for {
-          lhs <- buildSearchArgument(dataTypeMap, left, builder.startOr())
-          rhs <- buildSearchArgument(dataTypeMap, right, lhs)
-        } yield rhs.end()
+        val lhs = buildSearchArgument(dataTypeMap, left, builder.startOr())
+        val rhs = buildSearchArgument(dataTypeMap, right, lhs)
+        rhs.end()
 
       case Not(child) =>
-        for {
-          negate <- buildSearchArgument(dataTypeMap, child, builder.startNot())
-        } yield negate.end()
+        buildSearchArgument(dataTypeMap, child, builder.startNot()).end()
 
-      case other => buildLeafSearchArgument(dataTypeMap, other, builder)
+      case other =>
+        buildLeafSearchArgument(dataTypeMap, other, builder).getOrElse {
+          throw new SparkException(
+            "The input filter of OrcFilters.buildSearchArgument should be fully convertible.")
+        }
     }
   }
diff --git a/sql/core/v2.3.5/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala b/sql/core/v2.3.5/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
index 05218d1fba629..948ab44a8c19c 100644
--- a/sql/core/v2.3.5/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
+++ b/sql/core/v2.3.5/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
@@ -23,6 +23,7 @@ import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.Builder
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory.newBuilder
 import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types._
 
@@ -64,14 +65,14 @@ private[sql] object OrcFilters extends OrcFiltersBase {
    */
   def createFilter(schema: StructType, filters: Seq[Filter]): Option[SearchArgument] = {
     val dataTypeMap = schema.map(f => f.name -> f.dataType).toMap
-    for {
-      // Combines all convertible filters using `And` to produce a single conjunction
-      conjunction <- buildTree(convertibleFilters(schema, dataTypeMap, filters))
+    // Combines all convertible filters using `And` to produce a single conjunction
+    val conjunctionOptional = buildTree(convertibleFilters(schema, dataTypeMap, filters))
+    conjunctionOptional.map { conjunction =>
       // Then tries to build a single ORC `SearchArgument` for the conjunction predicate.
       // The input predicate is fully convertible. There should not be any empty result in the
       // following recursive method call `buildSearchArgument`.
-      builder <- buildSearchArgument(dataTypeMap, conjunction, newBuilder)
-    } yield builder.build()
+      buildSearchArgument(dataTypeMap, conjunction, newBuilder).build()
+    }
   }
 
   def convertibleFilters(
@@ -165,35 +166,35 @@ private[sql] object OrcFilters extends OrcFiltersBase {
    * Build a SearchArgument and return the builder so far.
    *
    * @param dataTypeMap a map from the attribute name to its data type.
-   * @param expression the input filter predicates.
+   * @param expression the input predicates, which should be fully convertible to SearchArgument.
    * @param builder the input SearchArgument.Builder.
    * @return the builder so far.
    */
   private def buildSearchArgument(
       dataTypeMap: Map[String, DataType],
       expression: Filter,
-      builder: Builder): Option[Builder] = {
+      builder: Builder): Builder = {
     import org.apache.spark.sql.sources._
 
     expression match {
       case And(left, right) =>
-        for {
-          lhs <- buildSearchArgument(dataTypeMap, left, builder.startAnd())
-          rhs <- buildSearchArgument(dataTypeMap, right, lhs)
-        } yield rhs.end()
+        val lhs = buildSearchArgument(dataTypeMap, left, builder.startAnd())
+        val rhs = buildSearchArgument(dataTypeMap, right, lhs)
+        rhs.end()
 
       case Or(left, right) =>
-        for {
-          lhs <- buildSearchArgument(dataTypeMap, left, builder.startOr())
-          rhs <- buildSearchArgument(dataTypeMap, right, lhs)
-        } yield rhs.end()
+        val lhs = buildSearchArgument(dataTypeMap, left, builder.startOr())
+        val rhs = buildSearchArgument(dataTypeMap, right, lhs)
+        rhs.end()
 
       case Not(child) =>
-        for {
-          negate <- buildSearchArgument(dataTypeMap, child, builder.startNot())
-        } yield negate.end()
+        buildSearchArgument(dataTypeMap, child, builder.startNot()).end()
 
-      case other => buildLeafSearchArgument(dataTypeMap, other, builder)
+      case other =>
+        buildLeafSearchArgument(dataTypeMap, other, builder).getOrElse {
+          throw new SparkException(
+            "The input filter of OrcFilters.buildSearchArgument should be fully convertible.")
+        }
     }
   }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
index fed42cacc92c4..cd1bffb6b7ab7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
@@ -23,6 +23,7 @@ import org.apache.hadoop.hive.ql.io.sarg.SearchArgument
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.Builder
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory.newBuilder
 
+import org.apache.spark.SparkException
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.execution.datasources.orc.{OrcFilters => DatasourceOrcFilters}
 import org.apache.spark.sql.execution.datasources.orc.OrcFilters.buildTree
@@ -73,14 +74,14 @@ private[orc] object OrcFilters extends Logging {
       DatasourceOrcFilters.createFilter(schema, filters).asInstanceOf[Option[SearchArgument]]
     } else {
       val dataTypeMap = schema.map(f => f.name -> f.dataType).toMap
-      for {
-        // Combines all convertible filters using `And` to produce a single conjunction
-        conjunction <- buildTree(convertibleFilters(schema, dataTypeMap, filters))
+      // Combines all convertible filters using `And` to produce a single conjunction
+      val conjunctionOptional = buildTree(convertibleFilters(schema, dataTypeMap, filters))
+      conjunctionOptional.map { conjunction =>
         // Then tries to build a single ORC `SearchArgument` for the conjunction predicate.
         // The input predicate is fully convertible. There should not be any empty result in the
        // following recursive method call `buildSearchArgument`.
-        builder <- buildSearchArgument(dataTypeMap, conjunction, newBuilder)
-      } yield builder.build()
+        buildSearchArgument(dataTypeMap, conjunction, newBuilder).build()
+      }
     }
   }
 
@@ -145,33 +146,33 @@ private[orc] object OrcFilters extends Logging {
    * Build a SearchArgument and return the builder so far.
    *
    * @param dataTypeMap a map from the attribute name to its data type.
-   * @param expression the input filter predicates.
+   * @param expression the input predicates, which should be fully convertible to SearchArgument.
    * @param builder the input SearchArgument.Builder.
    * @return the builder so far.
    */
   private def buildSearchArgument(
       dataTypeMap: Map[String, DataType],
       expression: Filter,
-      builder: Builder): Option[Builder] = {
+      builder: Builder): Builder = {
     expression match {
       case And(left, right) =>
-        for {
-          lhs <- buildSearchArgument(dataTypeMap, left, builder.startAnd())
-          rhs <- buildSearchArgument(dataTypeMap, right, lhs)
-        } yield rhs.end()
+        val lhs = buildSearchArgument(dataTypeMap, left, builder.startAnd())
+        val rhs = buildSearchArgument(dataTypeMap, right, lhs)
+        rhs.end()
 
       case Or(left, right) =>
-        for {
-          lhs <- buildSearchArgument(dataTypeMap, left, builder.startOr())
-          rhs <- buildSearchArgument(dataTypeMap, right, lhs)
-        } yield rhs.end()
+        val lhs = buildSearchArgument(dataTypeMap, left, builder.startOr())
+        val rhs = buildSearchArgument(dataTypeMap, right, lhs)
+        rhs.end()
 
       case Not(child) =>
-        for {
-          negate <- buildSearchArgument(dataTypeMap, child, builder.startNot())
-        } yield negate.end()
+        buildSearchArgument(dataTypeMap, child, builder.startNot()).end()
 
-      case other => buildLeafSearchArgument(dataTypeMap, other, builder)
+      case other =>
+        buildLeafSearchArgument(dataTypeMap, other, builder).getOrElse {
+          throw new SparkException(
+            "The input filter of OrcFilters.buildSearchArgument should be fully convertible.")
+        }
     }
   }
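
Note: after this series, the end-to-end flow is: trim the pushed-down filters to the
convertible subset, conjoin them with `buildTree`, then translate that conjunction
into an ORC `SearchArgument` in a single pass that is expected never to fail. A
usage sketch (the schema and filters are invented for illustration; `OrcFilters`
is package-private, so this only compiles from within the corresponding package):

    import org.apache.spark.sql.sources.{EqualTo, GreaterThan}
    import org.apache.spark.sql.types._

    val schema = StructType(Seq(
      StructField("a", IntegerType),
      StructField("b", StringType)))
    val filters = Seq(GreaterThan("a", 1), EqualTo("b", "x"))

    // Both filters are convertible, so this should yield Some(searchArgument),
    // equivalent to: and(a > 1, b = "x").
    val sarg = OrcFilters.createFilter(schema, filters)
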