diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index fe5b15cb511d..58515bee0bd1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -2322,7 +2322,7 @@ class Dataset[T] private[sql](
     }
     val attrs = this.logicalPlan.output
     val colsAfterDrop = attrs.filter { attr =>
-      attr != expression
+      !attr.semanticEquals(expression)
     }.map(attr => Column(attr))
     select(colsAfterDrop : _*)
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index d15c1f47b3d2..98936702a013 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -572,6 +572,29 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
     assert(df.schema.map(_.name) === Seq("value"))
   }
 
+  test("SPARK-28189 drop column using drop with column reference with case-insensitive names") {
+    // With SQL config caseSensitive OFF, case insensitive column name should work
+    withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
+      val col1 = testData("KEY")
+      val df1 = testData.drop(col1)
+      checkAnswer(df1, testData.selectExpr("value"))
+      assert(df1.schema.map(_.name) === Seq("value"))
+
+      val col2 = testData("Key")
+      val df2 = testData.drop(col2)
+      checkAnswer(df2, testData.selectExpr("value"))
+      assert(df2.schema.map(_.name) === Seq("value"))
+    }
+
+    // With SQL config caseSensitive ON, AnalysisException should be thrown
+    withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
+      val e = intercept[AnalysisException] {
+        testData("KEY")
+      }.getMessage
+      assert(e.contains("Cannot resolve column name"))
+    }
+  }
+
   test("drop unknown column (no-op) with column reference") {
     val col = Column("random")
     val df = testData.drop(col)
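
Note: below is a minimal, illustrative Scala sketch (not part of the patch) of the user-facing behavior that the fix and the new test cover; the object name, the example DataFrame, and its column names are assumptions for illustration only.

import org.apache.spark.sql.SparkSession

object DropByColumnRefExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("drop-example").getOrCreate()
    import spark.implicits._

    // Hypothetical two-column DataFrame standing in for `testData` from the suite.
    val df = Seq((1, "a"), (2, "b")).toDF("key", "value")

    // With spark.sql.caseSensitive=false (the default), df("KEY") resolves to the
    // `key` attribute; after this patch, drop(df("KEY")) removes that column because
    // the surviving columns are selected with !attr.semanticEquals(expression)
    // instead of plain attr != expression.
    spark.conf.set("spark.sql.caseSensitive", "false")
    df.drop(df("KEY")).printSchema()  // expected: only `value` remains

    // With spark.sql.caseSensitive=true, resolving df("KEY") itself throws an
    // AnalysisException ("Cannot resolve column name ..."), which the new test asserts.
    spark.conf.set("spark.sql.caseSensitive", "true")

    spark.stop()
  }
}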