Skip to content

Commit 210e9cb

Browse files
Adding disjunctive filter predicates
1 parent a93a588 commit 210e9cb

File tree

4 files changed

+21
-12
lines changed

4 files changed

+21
-12
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ case class BoundReference(ordinal: Int, baseReference: Attribute)
4545

4646
override def toString = s"$baseReference:$ordinal"
4747

48-
override def eval(input: Row): Any = if (input != null) input(ordinal) else null
48+
override def eval(input: Row): Any = input(ordinal)
4949
}
5050

5151
/**

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,18 @@ object ParquetFilters {
114114
// https://github.com/Parquet/parquet-mr/issues/371
115115
// has been resolved
116116
val filters: Seq[UnboundRecordFilter] = filterExpressions.collect {
117+
case Or(left: Expression, right: Expression)
118+
if createFilter(Seq(left)) != null && createFilter(Seq(right)) != null => {
119+
// Note: if the filter for either side of this Or-predicate is null then this means
120+
// it contains a more complex comparison than between attribute and literal
121+
// (e.g., it contained a CAST). The only safe thing to do is then to disregard
122+
// this disjunction, which could be contained in a conjunction. If it stands
123+
// alone then it is also safe to drop it, since a null return value of this
124+
// function is interpreted as having no filters at all.
125+
val leftFilter = createFilter(Seq(left))
126+
val rightFilter = createFilter(Seq(right))
127+
OrRecordFilter.or(leftFilter, rightFilter)
128+
}
117129
case Equals(left: Literal, right: NamedExpression) if !right.nullable =>
118130
createEqualityFilter(right.name, left)
119131
case Equals(left: NamedExpression, right: Literal) if !left.nullable =>
@@ -135,7 +147,6 @@ object ParquetFilters {
135147
case GreaterThanOrEqual(left: NamedExpression, right: Literal) if !left.nullable =>
136148
createGreaterThanOrEqualFilter(left.name, right)
137149
}
138-
// TODO: How about disjunctions? (Or-ed)
139150
if (filters.length > 0) filters.reduce(AndRecordFilter.and) else null
140151
}
141152

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -146,16 +146,6 @@ private[sql] object ParquetRelation {
146146
new ParquetRelation(path.toString)
147147
}
148148

149-
def checkPredicatePushdownPossible(filters: Seq[Expression]): Boolean = {
150-
def checkFeasible(left: Expression, right: Expression) =
151-
left.isInstanceOf[Literal] || right.isInstanceOf[Literal]
152-
filters.forall {
153-
case Equals(left, right) => checkFeasible(left, right)
154-
case LessThan(left, right) => checkFeasible(left, right)
155-
case _ => false
156-
}
157-
}
158-
159149
private def checkPath(pathStr: String, allowExisting: Boolean, conf: Configuration): Path = {
160150
if (pathStr == null) {
161151
throw new IllegalArgumentException("Unable to create ParquetRelation: path is null")

sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,14 @@ class ParquetQuerySuite extends QueryTest with FunSuite with BeforeAndAfterAll {
266266
val result2 = sql(s"SELECT * FROM testfiltersource WHERE $myval > 150 AND $myval <= 200").collect()
267267
assert(result2.size === 50)
268268
}
269+
for(myval <- Seq("myint", "mylong")) {
270+
val result3 = sql(s"SELECT * FROM testfiltersource WHERE $myval > 190 OR $myval < 10").collect()
271+
assert(result3.size === 20)
272+
}
273+
for(myval <- Seq("mydouble", "myfloat")) {
274+
val result3 = sql(s"SELECT * FROM testfiltersource WHERE $myval > 190.5 OR $myval < 10").collect()
275+
assert(result3.size === 20)
276+
}
269277
val booleanResult = sql(s"SELECT * FROM testfiltersource WHERE myboolean = true AND myint < 40").collect()
270278
assert(booleanResult.size === 10)
271279
val stringResult = sql("SELECT * FROM testfiltersource WHERE mystring = \"100\"").collect()

0 commit comments

Comments
 (0)