Skip to content

Commit 5499651

Browse files
committed
SPARK-25860: Separate end-to-end suite and switch to tables
1 parent 4c35955 commit 5499651

File tree

2 files changed

+73
-43
lines changed

2 files changed

+73
-43
lines changed

sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala

Lines changed: 2 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,10 @@ import org.scalatest.Matchers._
2929
import org.apache.spark.SparkException
3030
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd}
3131
import org.apache.spark.sql.catalyst.TableIdentifier
32-
import org.apache.spark.sql.catalyst.expressions.{CaseWhen, If, Uuid}
32+
import org.apache.spark.sql.catalyst.expressions.Uuid
3333
import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
3434
import org.apache.spark.sql.catalyst.plans.logical.{OneRowRelation, Union}
35-
import org.apache.spark.sql.execution.{FilterExec, LocalTableScanExec, QueryExecution, WholeStageCodegenExec}
35+
import org.apache.spark.sql.execution.{FilterExec, QueryExecution, WholeStageCodegenExec}
3636
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
3737
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec, ShuffleExchangeExec}
3838
import org.apache.spark.sql.functions._
@@ -2585,45 +2585,4 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
25852585

25862586
checkAnswer(swappedDf.filter($"key"($"map") > "a"), Row(2, Map(2 -> "b")))
25872587
}
2588-
2589-
test("SPARK-25860: Replace Literal(null, _) with FalseLiteral whenever possible") {
2590-
2591-
def checkPlanIsEmptyLocalScan(df: DataFrame): Unit = df.queryExecution.executedPlan match {
2592-
case s: LocalTableScanExec => assert(s.rows.isEmpty)
2593-
case p => fail(s"$p is not LocalTableScanExec")
2594-
}
2595-
2596-
val df1 = Seq((1, true), (2, false)).toDF("l", "b")
2597-
val df2 = Seq(2, 3).toDF("l")
2598-
2599-
val q1 = df1.where("IF(l > 10, false, b AND null)")
2600-
checkAnswer(q1, Seq.empty)
2601-
checkPlanIsEmptyLocalScan(q1)
2602-
2603-
val q2 = df1.where("CASE WHEN l < 10 THEN null WHEN l > 40 THEN false ELSE null END")
2604-
checkAnswer(q2, Seq.empty)
2605-
checkPlanIsEmptyLocalScan(q2)
2606-
2607-
val q3 = df1.join(df2, when(df1("l") > df2("l"), lit(null)).otherwise(df1("b") && lit(null)))
2608-
checkAnswer(q3, Seq.empty)
2609-
checkPlanIsEmptyLocalScan(q3)
2610-
2611-
val q4 = df1.where("IF(IF(b, null, false), true, null)")
2612-
checkAnswer(q4, Seq.empty)
2613-
checkPlanIsEmptyLocalScan(q4)
2614-
2615-
val q5 = df1.selectExpr("IF(l > 1 AND null, 5, 1) AS out")
2616-
checkAnswer(q5, Row(1) :: Row(1) :: Nil)
2617-
q5.queryExecution.executedPlan.foreach { p =>
2618-
assert(p.expressions.forall(e => e.find(_.isInstanceOf[If]).isEmpty))
2619-
}
2620-
2621-
val q6 = df1.selectExpr("CASE WHEN (l > 2 AND null) THEN 3 ELSE 2 END")
2622-
checkAnswer(q6, Row(2) :: Row(2) :: Nil)
2623-
q6.queryExecution.executedPlan.foreach { p =>
2624-
assert(p.expressions.forall(e => e.find(_.isInstanceOf[CaseWhen]).isEmpty))
2625-
}
2626-
2627-
checkAnswer(df1.where("IF(l > 10, false, b OR null)"), Row(1, true))
2628-
}
26292588
}
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql
19+
20+
import org.apache.spark.sql.catalyst.expressions.{CaseWhen, If}
21+
import org.apache.spark.sql.execution.LocalTableScanExec
22+
import org.apache.spark.sql.functions.{lit, when}
23+
import org.apache.spark.sql.test.SharedSQLContext
24+
25+
class ReplaceNullWithFalseEndToEndSuite extends QueryTest with SharedSQLContext {
26+
import testImplicits._
27+
28+
test("SPARK-25860: Replace Literal(null, _) with FalseLiteral whenever possible") {
29+
withTable("t1", "t2") {
30+
Seq((1, true), (2, false)).toDF("l", "b").write.saveAsTable("t1")
31+
Seq(2, 3).toDF("l").write.saveAsTable("t2")
32+
val df1 = spark.table("t1")
33+
val df2 = spark.table("t2")
34+
35+
val q1 = df1.where("IF(l > 10, false, b AND null)")
36+
checkAnswer(q1, Seq.empty)
37+
checkPlanIsEmptyLocalScan(q1)
38+
39+
val q2 = df1.where("CASE WHEN l < 10 THEN null WHEN l > 40 THEN false ELSE null END")
40+
checkAnswer(q2, Seq.empty)
41+
checkPlanIsEmptyLocalScan(q2)
42+
43+
val q3 = df1.join(df2, when(df1("l") > df2("l"), lit(null)).otherwise(df1("b") && lit(null)))
44+
checkAnswer(q3, Seq.empty)
45+
checkPlanIsEmptyLocalScan(q3)
46+
47+
val q4 = df1.where("IF(IF(b, null, false), true, null)")
48+
checkAnswer(q4, Seq.empty)
49+
checkPlanIsEmptyLocalScan(q4)
50+
51+
val q5 = df1.selectExpr("IF(l > 1 AND null, 5, 1) AS out")
52+
checkAnswer(q5, Row(1) :: Row(1) :: Nil)
53+
q5.queryExecution.executedPlan.foreach { p =>
54+
assert(p.expressions.forall(e => e.find(_.isInstanceOf[If]).isEmpty))
55+
}
56+
57+
val q6 = df1.selectExpr("CASE WHEN (l > 2 AND null) THEN 3 ELSE 2 END")
58+
checkAnswer(q6, Row(2) :: Row(2) :: Nil)
59+
q6.queryExecution.executedPlan.foreach { p =>
60+
assert(p.expressions.forall(e => e.find(_.isInstanceOf[CaseWhen]).isEmpty))
61+
}
62+
63+
checkAnswer(df1.where("IF(l > 10, false, b OR null)"), Row(1, true))
64+
}
65+
66+
def checkPlanIsEmptyLocalScan(df: DataFrame): Unit = df.queryExecution.executedPlan match {
67+
case s: LocalTableScanExec => assert(s.rows.isEmpty)
68+
case p => fail(s"$p is not LocalTableScanExec")
69+
}
70+
}
71+
}

0 commit comments

Comments
 (0)