@@ -138,39 +138,76 @@ private[sql] object OrcFilters {
       dataTypeMap: Map[String, DataType],
       expression: Filter,
       builder: Builder): Option[Builder] = {
+    createBuilder(dataTypeMap, expression, builder, canPartialPushDownConjuncts = true)
+  }
+
+  /**
+   * @param dataTypeMap a map from the attribute name to its data type.
+   * @param expression the input filter predicates.
+   * @param builder the input SearchArgument.Builder.
+   * @param canPartialPushDownConjuncts whether a subset of conjuncts of predicates can be pushed
+   *                                    down safely. Pushing ONLY one side of AND down is safe
+   *                                    only at the top level, or when none of its ancestors is a
+   *                                    NOT or an OR.
+   * @return the builder so far.
+   */
+  private def createBuilder(
+      dataTypeMap: Map[String, DataType],
+      expression: Filter,
+      builder: Builder,
+      canPartialPushDownConjuncts: Boolean): Option[Builder] = {
     def getType(attribute: String): PredicateLeaf.Type =
       getPredicateLeafType(dataTypeMap(attribute))
 
     import org.apache.spark.sql.sources._
 
     expression match {
       case And(left, right) =>
-        // At here, it is not safe to just convert one side if we do not understand the
-        // other side. Here is an example used to explain the reason.
+        // Here, it is not safe to convert just one side and remove the other side
+        // if we do not understand what the parent filters are.
+        //
+        // Here is an example used to explain the reason.
         // Let's say we have NOT(a = 2 AND b in ('1')) and we do not understand how to
        // convert b in ('1'). If we only convert a = 2, we will end up with a filter
         // NOT(a = 2), which will generate wrong results.
-        // Pushing one side of AND down is only safe to do at the top level.
-        // You can see ParquetRelation's initializeLocalJobFunc method as an example.
-        for {
-          _ <- buildSearchArgument(dataTypeMap, left, newBuilder)
-          _ <- buildSearchArgument(dataTypeMap, right, newBuilder)
-          lhs <- buildSearchArgument(dataTypeMap, left, builder.startAnd())
-          rhs <- buildSearchArgument(dataTypeMap, right, lhs)
-        } yield rhs.end()
+        //
+        // Pushing one side of AND down is only safe to do at the top level, or in a child
+        // AND before hitting a NOT or an OR condition; in that case, the unsupported
+        // predicate can be safely removed.
+        val leftBuilderOption =
+          createBuilder(dataTypeMap, left, newBuilder, canPartialPushDownConjuncts)
+        val rightBuilderOption =
+          createBuilder(dataTypeMap, right, newBuilder, canPartialPushDownConjuncts)

[Review comment — @dbtsai (Member), Oct 10, 2018]
Nit, can you make the format the same as leftBuilderOption? Also, add another
empty line before (leftBuilderOption, rightBuilderOption). Thanks.

+        (leftBuilderOption, rightBuilderOption) match {
+          case (Some(_), Some(_)) =>
+            for {
+              lhs <- createBuilder(dataTypeMap, left,
+                builder.startAnd(), canPartialPushDownConjuncts)
+              rhs <- createBuilder(dataTypeMap, right, lhs, canPartialPushDownConjuncts)
+            } yield rhs.end()
+
+          case (Some(_), None) if canPartialPushDownConjuncts =>
+            createBuilder(dataTypeMap, left, builder, canPartialPushDownConjuncts)
+
+          case (None, Some(_)) if canPartialPushDownConjuncts =>
+            createBuilder(dataTypeMap, right, builder, canPartialPushDownConjuncts)
+
+          case _ => None
+        }
 
       case Or(left, right) =>
         for {
-          _ <- buildSearchArgument(dataTypeMap, left, newBuilder)
-          _ <- buildSearchArgument(dataTypeMap, right, newBuilder)
-          lhs <- buildSearchArgument(dataTypeMap, left, builder.startOr())
-          rhs <- buildSearchArgument(dataTypeMap, right, lhs)
+          _ <- createBuilder(dataTypeMap, left, newBuilder, canPartialPushDownConjuncts = false)
+          _ <- createBuilder(dataTypeMap, right, newBuilder, canPartialPushDownConjuncts = false)
+          lhs <- createBuilder(dataTypeMap, left,
+            builder.startOr(), canPartialPushDownConjuncts = false)
+          rhs <- createBuilder(dataTypeMap, right, lhs, canPartialPushDownConjuncts = false)
         } yield rhs.end()
 
       case Not(child) =>
         for {
-          _ <- buildSearchArgument(dataTypeMap, child, newBuilder)
-          negate <- buildSearchArgument(dataTypeMap, child, builder.startNot())
+          _ <- createBuilder(dataTypeMap, child, newBuilder, canPartialPushDownConjuncts = false)
+          negate <- createBuilder(dataTypeMap,
+            child, builder.startNot(), canPartialPushDownConjuncts = false)
         } yield negate.end()
 
       // NOTE: For all case branches dealing with leaf predicates below, the additional `startAnd()`
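
The `And` branch above is the heart of the change: an unsupported conjunct may now be dropped, but only while no `Not` or `Or` sits above it. Here is a minimal, self-contained sketch of the hazard described in the code comment, with plain Scala booleans standing in for ORC predicates (the object name and values are illustrative, not Spark code):

```scala
// Row under test: a = 2, b = "2".
object PartialPushdownUnderNot {
  def main(args: Array[String]): Unit = {
    val a = 2
    val b = "2"

    // Full filter: NOT(a = 2 AND b in ('1')) -- evaluates to true, so the row is kept.
    val full = !(a == 2 && b == "1")

    // "Partial" filter after dropping the unconvertible b-conjunct under the NOT:
    // NOT(a = 2) -- evaluates to false, so the row is wrongly discarded.
    val partial = !(a == 2)

    assert(full && !partial, "dropping a conjunct under NOT changed the result")
    println(s"full = $full, partial = $partial") // full = true, partial = false
  }
}
```

This is exactly why the `Or` and `Not` branches recurse with `canPartialPushDownConjuncts = false`.
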
@@ -358,7 +358,7 @@ class OrcFilterSuite extends OrcTest with SharedSQLContext {
     }
   }
 
-  test("SPARK-12218 Converting conjunctions into ORC SearchArguments") {
+  test("SPARK-12218 and SPARK-25699 Converting conjunctions into ORC SearchArguments") {
     import org.apache.spark.sql.sources._
     // The `LessThan` should be converted while the `StringContains` shouldn't
     val schema = new StructType(
@@ -382,5 +382,40 @@ class OrcFilterSuite extends OrcTest with SharedSQLContext {
         ))
       )).get.toString
     }

+    // Cannot remove the unsupported `StringContains` predicate, since it is under an `Or` operator.
+    assert(OrcFilters.createFilter(schema, Array(
+      Or(
+        LessThan("a", 10),
+        And(
+          StringContains("b", "prefix"),
+          GreaterThan("a", 1)
+        )
+      )
+    )).isEmpty)
+
+    // Safely remove the unsupported `StringContains` predicate and push down `LessThan`.
+    assertResult("leaf-0 = (LESS_THAN a 10), expr = leaf-0") {
+      OrcFilters.createFilter(schema, Array(
+        And(
+          LessThan("a", 10),
+          StringContains("b", "prefix")
+        )
+      )).get.toString
+    }
+
+    // Safely remove the unsupported `StringContains` predicate, and push down both `LessThan`
+    // and `GreaterThan`.
+    assertResult("leaf-0 = (LESS_THAN a 10), leaf-1 = (LESS_THAN_EQUALS a 1)," +
+      " expr = (and leaf-0 (not leaf-1))") {
+      OrcFilters.createFilter(schema, Array(
+        And(
+          And(
+            LessThan("a", 10),
+            StringContains("b", "prefix")
+          ),
+          GreaterThan("a", 1)
+        )
+      )).get.toString
+    }
   }
 }
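
The first new assertion, where the unsupported `StringContains` sits under an `Or`, guards against the mirror-image hazard: dropping one disjunct shrinks the set of surviving rows. A minimal sketch under the same conventions as above (plain booleans, illustrative names, not Spark code):

```scala
// Row under test: a = 20 and b contains "prefix" -- it matches only the right
// disjunct of (a < 10) OR (contains(b, "prefix") AND a > 1).
object OneSidedOrPushdown {
  def main(args: Array[String]): Unit = {
    val a = 20
    val bContainsPrefix = true

    // The full filter keeps the row via the right disjunct.
    val full = (a < 10) || (bContainsPrefix && a > 1)

    // Dropping the unconvertible right disjunct leaves (a < 10),
    // which wrongly discards the row.
    val oneSided = a < 10

    assert(full && !oneSided, "dropping a disjunct under OR changed the result")
  }
}
```

In the expected strings of the other two assertions, each `leaf-N` is one converted predicate and `expr` combines them; `GreaterThan("a", 1)` surfaces as `(not leaf-1)` over `leaf-1 = (LESS_THAN_EQUALS a 1)` because the builder expresses a strict greater-than by negating less-than-or-equal.
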
@@ -79,6 +79,23 @@ private[orc] object OrcFilters extends Logging {
       dataTypeMap: Map[String, DataType],
       expression: Filter,
       builder: Builder): Option[Builder] = {
+    createBuilder(dataTypeMap, expression, builder, canPartialPushDownConjuncts = true)
+  }
+
+  /**
+   * @param dataTypeMap a map from the attribute name to its data type.
+   * @param expression the input filter predicates.
+   * @param builder the input SearchArgument.Builder.
+   * @param canPartialPushDownConjuncts whether a subset of conjuncts of predicates can be pushed
+   *                                    down safely. Pushing ONLY one side of AND down is safe
+   *                                    only at the top level, or when none of its ancestors is a
+   *                                    NOT or an OR.
+   * @return the builder so far.
+   */
+  private def createBuilder(
+      dataTypeMap: Map[String, DataType],
+      expression: Filter,
+      builder: Builder,
+      canPartialPushDownConjuncts: Boolean): Option[Builder] = {
     def isSearchableType(dataType: DataType): Boolean = dataType match {
       // Only the values in the Spark types below can be recognized by
       // the `SearchArgumentImpl.BuilderImpl.boxLiteral()` method.
@@ -90,32 +107,52 @@ private[orc] object OrcFilters extends Logging {

     expression match {
       case And(left, right) =>
-        // At here, it is not safe to just convert one side if we do not understand the
-        // other side. Here is an example used to explain the reason.
+        // Here, it is not safe to convert just one side and remove the other side
+        // if we do not understand what the parent filters are.
+        //
+        // Here is an example used to explain the reason.
         // Let's say we have NOT(a = 2 AND b in ('1')) and we do not understand how to
         // convert b in ('1'). If we only convert a = 2, we will end up with a filter
         // NOT(a = 2), which will generate wrong results.
-        // Pushing one side of AND down is only safe to do at the top level.
-        // You can see ParquetRelation's initializeLocalJobFunc method as an example.
-        for {
-          _ <- buildSearchArgument(dataTypeMap, left, newBuilder)
-          _ <- buildSearchArgument(dataTypeMap, right, newBuilder)
-          lhs <- buildSearchArgument(dataTypeMap, left, builder.startAnd())
-          rhs <- buildSearchArgument(dataTypeMap, right, lhs)
-        } yield rhs.end()
+        //
+        // Pushing one side of AND down is only safe to do at the top level, or in a child
+        // AND before hitting a NOT or an OR condition; in that case, the unsupported
+        // predicate can be safely removed.
+        val leftBuilderOption =
+          createBuilder(dataTypeMap, left, newBuilder, canPartialPushDownConjuncts)
+        val rightBuilderOption =
+          createBuilder(dataTypeMap, right, newBuilder, canPartialPushDownConjuncts)
+        (leftBuilderOption, rightBuilderOption) match {

[Review comment — Member]
ditto (same formatting nit as above)

+          case (Some(_), Some(_)) =>
+            for {
+              lhs <- createBuilder(dataTypeMap, left,
+                builder.startAnd(), canPartialPushDownConjuncts)
+              rhs <- createBuilder(dataTypeMap, right, lhs, canPartialPushDownConjuncts)
+            } yield rhs.end()
+
+          case (Some(_), None) if canPartialPushDownConjuncts =>
+            createBuilder(dataTypeMap, left, builder, canPartialPushDownConjuncts)
+
+          case (None, Some(_)) if canPartialPushDownConjuncts =>
+            createBuilder(dataTypeMap, right, builder, canPartialPushDownConjuncts)
+
+          case _ => None
+        }
 
       case Or(left, right) =>
         for {
-          _ <- buildSearchArgument(dataTypeMap, left, newBuilder)
-          _ <- buildSearchArgument(dataTypeMap, right, newBuilder)
-          lhs <- buildSearchArgument(dataTypeMap, left, builder.startOr())
-          rhs <- buildSearchArgument(dataTypeMap, right, lhs)
+          _ <- createBuilder(dataTypeMap, left, newBuilder, canPartialPushDownConjuncts = false)
+          _ <- createBuilder(dataTypeMap, right, newBuilder, canPartialPushDownConjuncts = false)
+          lhs <- createBuilder(dataTypeMap, left,
+            builder.startOr(), canPartialPushDownConjuncts = false)
+          rhs <- createBuilder(dataTypeMap, right, lhs, canPartialPushDownConjuncts = false)
         } yield rhs.end()
 
       case Not(child) =>
         for {
-          _ <- buildSearchArgument(dataTypeMap, child, newBuilder)
-          negate <- buildSearchArgument(dataTypeMap, child, builder.startNot())
+          _ <- createBuilder(dataTypeMap, child, newBuilder, canPartialPushDownConjuncts = false)
+          negate <- createBuilder(dataTypeMap,
+            child, builder.startNot(), canPartialPushDownConjuncts = false)
         } yield negate.end()
 
       // NOTE: For all case branches dealing with leaf predicates below, the additional `startAnd()`
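
Both copies of `createBuilder` implement the same recursion, so it can help to see the control flow in isolation. Below is a stripped-down model with a toy predicate type; `Pred`, `convert`, and every name here are ours for illustration, not Spark's API:

```scala
sealed trait Pred
final case class Leaf(repr: String, supported: Boolean) extends Pred
final case class AndP(left: Pred, right: Pred) extends Pred
final case class OrP(left: Pred, right: Pred) extends Pred
final case class NotP(child: Pred) extends Pred

object ConjunctPushdownModel {
  // Mirrors createBuilder: the flag starts out true and turns false under OR/NOT.
  def convert(p: Pred, canPartialPushDown: Boolean): Option[String] = p match {
    case Leaf(repr, supported) =>
      if (supported) Some(repr) else None

    case AndP(l, r) =>
      (convert(l, canPartialPushDown), convert(r, canPartialPushDown)) match {
        case (Some(a), Some(b))                    => Some(s"(and $a $b)")
        case (Some(a), None) if canPartialPushDown => Some(a) // drop right conjunct
        case (None, Some(b)) if canPartialPushDown => Some(b) // drop left conjunct
        case _                                     => None
      }

    case OrP(l, r) =>
      for {
        a <- convert(l, canPartialPushDown = false) // both sides required under OR
        b <- convert(r, canPartialPushDown = false)
      } yield s"(or $a $b)"

    case NotP(c) =>
      convert(c, canPartialPushDown = false).map(n => s"(not $n)")
  }

  def main(args: Array[String]): Unit = {
    val lt = Leaf("a < 10", supported = true)
    val contains = Leaf("contains(b)", supported = false)

    // AND at the top level: the unsupported conjunct is dropped.
    assert(convert(AndP(lt, contains), canPartialPushDown = true).contains("a < 10"))

    // The same AND under an OR: nothing can be pushed down.
    assert(convert(OrP(lt, AndP(contains, Leaf("a > 1", supported = true))),
      canPartialPushDown = true).isEmpty)
  }
}
```

The two asserts in `main` mirror the first two new test cases in the suites.
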
@@ -351,7 +351,7 @@ class HiveOrcFilterSuite extends OrcTest with TestHiveSingleton {
     }
   }
 
-  test("SPARK-12218 Converting conjunctions into ORC SearchArguments") {
+  test("SPARK-12218 and SPARK-25699 Converting conjunctions into ORC SearchArguments") {
     import org.apache.spark.sql.sources._
     // The `LessThan` should be converted while the `StringContains` shouldn't
     val schema = new StructType(

@@ -383,5 +383,48 @@ class HiveOrcFilterSuite extends OrcTest with TestHiveSingleton {
         ))
       )).get.toString
     }

+    // Cannot remove the unsupported `StringContains` predicate, since it is under an `Or` operator.
+    assert(OrcFilters.createFilter(schema, Array(
+      Or(
+        LessThan("a", 10),
+        And(
+          StringContains("b", "prefix"),
+          GreaterThan("a", 1)
+        )
+      )
+    )).isEmpty)
+
+    // Safely remove the unsupported `StringContains` predicate and push down `LessThan`.
+    assertResult(
+      """leaf-0 = (LESS_THAN a 10)
+        |expr = leaf-0
+      """.stripMargin.trim
+    ) {
+      OrcFilters.createFilter(schema, Array(
+        And(
+          LessThan("a", 10),
+          StringContains("b", "prefix")
+        )
+      )).get.toString
+    }
+
+    // Safely remove the unsupported `StringContains` predicate, and push down both `LessThan`
+    // and `GreaterThan`.
+    assertResult(
+      """leaf-0 = (LESS_THAN a 10)
+        |leaf-1 = (LESS_THAN_EQUALS a 1)
+        |expr = (and leaf-0 (not leaf-1))
+      """.stripMargin.trim
+    ) {
+      OrcFilters.createFilter(schema, Array(
+        And(
+          And(
+            LessThan("a", 10),
+            StringContains("b", "prefix")
+          ),
+          GreaterThan("a", 1)
+        )
+      )).get.toString
+    }
   }
 }
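
One difference worth noting when comparing the two suites: the expected `SearchArgument.toString` output is comma-separated in `OrcFilterSuite` but multi-line (hence the `stripMargin.trim` blocks) in `HiveOrcFilterSuite`; the Hive-bundled ORC classes appear to render the argument differently from the native reader's, while the predicates being verified are identical.
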