Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ case class InMemoryTableScanExec(
case IsNull(a: Attribute) => statsFor(a).nullCount > 0
case IsNotNull(a: Attribute) => statsFor(a).count - statsFor(a).nullCount > 0

case In(_: AttributeReference, list: Seq[Expression]) if list.isEmpty => Literal.FalseLiteral
Copy link
Member

@gatorsmile gatorsmile Oct 17, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be moved to optimizer rules. It will cover both cached and non-cached cases.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, this is wrong. If the left value is null, it should return null instead of false.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, really. OK, yeah we need to change this. It can be reverted too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since this is only for filters, does it make any difference null or false?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Under this special context of filtering partitions, this In with empty list will result in a false literal in the end, no matter the attribute is null or not. We don't possibly have some expressions like IsNull(In(a, Nil)) as the filter predicate for now.

case In(a: AttributeReference, list: Seq[Expression]) if list.forall(_.isInstanceOf[Literal]) =>
Copy link
Member

@gatorsmile gatorsmile Oct 17, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please change it to

if list.forall(_.isInstanceOf[Literal]) && list.nonEmpty()

or using exists.

list.map(l => statsFor(a).lowerBound <= l.asInstanceOf[Literal] &&
l.asInstanceOf[Literal] <= statsFor(a).upperBound).reduce(_ || _)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -429,4 +429,19 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
checkAnswer(agg_without_cache, agg_with_cache)
}
}

test("SPARK-22249: IN should work also with cached DataFrame") {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test case is just an end-to-end test. This test will still pass if the optimizer has a change. We also need a unit test case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you mean this test case is not enough and I should add a test case to check the proper behavior of the optimizer after the change?

val df = spark.range(10).cache()
// with an empty list
assert(df.filter($"id".isin()).count() == 0)
// with a non-empty list
assert(df.filter($"id".isin(2)).count() == 1)
assert(df.filter($"id".isin(2, 3)).count() == 2)
df.unpersist()
val dfNulls = spark.range(10).selectExpr("null as id").cache()
// with null as value for the attribute
assert(dfNulls.filter($"id".isin()).count() == 0)
assert(dfNulls.filter($"id".isin(2, 3)).count() == 0)
dfNulls.unpersist()
}
}