Skip to content

Commit

Permalink
fix more tests
Browse files (browse the repository at this point in the history)
  • Loading branch information
parthchandra committed Feb 10, 2025
1 parent 5dfdc19 commit 9389be8
Showing 1 changed file with 53 additions and 40 deletions.
93 changes: 53 additions & 40 deletions spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
Original file line number | Diff line number | Diff line change
Expand Up @@ -295,7 +295,9 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
val path = new Path(dir.toURI.toString, "test.parquet")
makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, 10000)
withParquetTable(path.toString, "tbl") {
checkSparkAnswerAndOperator(f"SELECT _20 - $intColumn FROM tbl")
withSQLConf(CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key -> "true") {
checkSparkAnswerAndOperator(f"SELECT _20 - $intColumn FROM tbl")
}
}
}
}
Expand Down Expand Up @@ -2026,7 +2028,8 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
SQLConf.ANSI_ENABLED.key -> enabled.toString,
CometConf.COMET_ANSI_MODE_ENABLED.key -> enabled.toString,
CometConf.COMET_ENABLED.key -> "true",
CometConf.COMET_EXEC_ENABLED.key -> "true")(f)
CometConf.COMET_EXEC_ENABLED.key -> "true",
CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key -> "true")(f)
}

def checkOverflow(query: String, dtype: String): Unit = {
Expand Down Expand Up @@ -2146,10 +2149,12 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
val path = new Path(dir.toURI.toString, "test.parquet")
makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, 10000)
withParquetTable(path.toString, "tbl") {
checkSparkAnswerAndOperator("SELECT named_struct('a', _1, 'b', _2) FROM tbl")
checkSparkAnswerAndOperator("SELECT named_struct('a', _1, 'b', 2) FROM tbl")
checkSparkAnswerAndOperator(
"SELECT named_struct('a', named_struct('b', _1, 'c', _2)) FROM tbl")
withSQLConf(CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key -> "true") {
checkSparkAnswerAndOperator("SELECT named_struct('a', _1, 'b', _2) FROM tbl")
checkSparkAnswerAndOperator("SELECT named_struct('a', _1, 'b', 2) FROM tbl")
checkSparkAnswerAndOperator(
"SELECT named_struct('a', named_struct('b', _1, 'c', _2)) FROM tbl")
}
}
}
}
Expand All @@ -2161,15 +2166,17 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
val path = new Path(dir.toURI.toString, "test.parquet")
makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, 10000)
withParquetTable(path.toString, "tbl") {
checkSparkAnswerAndOperator(
"SELECT named_struct('a', _1, 'a', _2) FROM tbl",
classOf[ProjectExec])
checkSparkAnswerAndOperator(
"SELECT named_struct('a', _1, 'a', 2) FROM tbl",
classOf[ProjectExec])
checkSparkAnswerAndOperator(
"SELECT named_struct('a', named_struct('b', _1, 'b', _2)) FROM tbl",
classOf[ProjectExec])
withSQLConf(CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key -> "true") {
checkSparkAnswerAndOperator(
"SELECT named_struct('a', _1, 'a', _2) FROM tbl",
classOf[ProjectExec])
checkSparkAnswerAndOperator(
"SELECT named_struct('a', _1, 'a', 2) FROM tbl",
classOf[ProjectExec])
checkSparkAnswerAndOperator(
"SELECT named_struct('a', named_struct('b', _1, 'b', _2)) FROM tbl",
classOf[ProjectExec])
}
}
}
}
Expand All @@ -2190,10 +2197,12 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
withDictionary = dictionaryEnabled) {

val fields = Range(1, 8).map(n => s"'col$n', _$n").mkString(", ")
withSQLConf(CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key -> "true") {

checkSparkAnswerAndOperator(s"SELECT to_json(named_struct($fields)) FROM tbl")
checkSparkAnswerAndOperator(
s"SELECT to_json(named_struct('nested', named_struct($fields))) FROM tbl")
checkSparkAnswerAndOperator(s"SELECT to_json(named_struct($fields)) FROM tbl")
checkSparkAnswerAndOperator(
s"SELECT to_json(named_struct('nested', named_struct($fields))) FROM tbl")
}
}
}
}
Expand Down Expand Up @@ -2541,19 +2550,21 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
withTempDir { dir =>
val path = new Path(dir.toURI.toString, "test.parquet")
makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, 10000)
val df = spark.read.parquet(path.toString)
checkSparkAnswerAndOperator(df.select(array(col("_2"), col("_3"), col("_4"))))
checkSparkAnswerAndOperator(df.select(array(col("_4"), col("_11"), lit(null))))
checkSparkAnswerAndOperator(
df.select(array(array(col("_4")), array(col("_4"), lit(null)))))
checkSparkAnswerAndOperator(df.select(array(col("_8"), col("_13"))))
// This ends up returning empty strings instead of nulls for the last element
checkSparkAnswerAndOperator(df.select(array(col("_8"), col("_13"), lit(null))))
checkSparkAnswerAndOperator(df.select(array(array(col("_8")), array(col("_13")))))
checkSparkAnswerAndOperator(df.select(array(col("_8"), col("_8"), lit(null))))
checkSparkAnswerAndOperator(df.select(array(struct("_4"), struct("_4"))))
checkSparkAnswerAndOperator(
df.select(array(struct(col("_8").alias("a")), struct(col("_13").alias("a")))))
withSQLConf(CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key -> "true") {
val df = spark.read.parquet(path.toString)
checkSparkAnswerAndOperator(df.select(array(col("_2"), col("_3"), col("_4"))))
checkSparkAnswerAndOperator(df.select(array(col("_4"), col("_11"), lit(null))))
checkSparkAnswerAndOperator(
df.select(array(array(col("_4")), array(col("_4"), lit(null)))))
checkSparkAnswerAndOperator(df.select(array(col("_8"), col("_13"))))
// This ends up returning empty strings instead of nulls for the last element
checkSparkAnswerAndOperator(df.select(array(col("_8"), col("_13"), lit(null))))
checkSparkAnswerAndOperator(df.select(array(array(col("_8")), array(col("_13")))))
checkSparkAnswerAndOperator(df.select(array(col("_8"), col("_8"), lit(null))))
checkSparkAnswerAndOperator(df.select(array(struct("_4"), struct("_4"))))
checkSparkAnswerAndOperator(
df.select(array(struct(col("_8").alias("a")), struct(col("_13").alias("a")))))
}
}
}
}
Expand Down Expand Up @@ -2634,17 +2645,19 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
withTempDir { dir =>
val path = new Path(dir.toURI.toString, "test.parquet")
makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, 10000)
val df = spark.read
.parquet(path.toString)
.select(
array(struct(col("_2"), col("_3"), col("_4"), col("_8")), lit(null)).alias("arr"))
checkSparkAnswerAndOperator(df.select("arr._2", "arr._3", "arr._4"))
withSQLConf(CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key -> "true") {
val df = spark.read
.parquet(path.toString)
.select(
array(struct(col("_2"), col("_3"), col("_4"), col("_8")), lit(null)).alias("arr"))
checkSparkAnswerAndOperator(df.select("arr._2", "arr._3", "arr._4"))

val complex = spark.read
.parquet(path.toString)
.select(array(struct(struct(col("_4"), col("_8")).alias("nested"))).alias("arr"))
val complex = spark.read
.parquet(path.toString)
.select(array(struct(struct(col("_4"), col("_8")).alias("nested"))).alias("arr"))

checkSparkAnswerAndOperator(complex.select(col("arr.nested._4")))
checkSparkAnswerAndOperator(complex.select(col("arr.nested._4")))
}
}
}
}
Expand Down

0 comments on commit 9389be8

Please sign in to comment.