16 changes: 12 additions & 4 deletions sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -1878,17 +1878,25 @@ class Dataset[T] private[sql](
   def dropDuplicates(colNames: Seq[String]): Dataset[T] = withTypedPlan {
     val resolver = sparkSession.sessionState.analyzer.resolver
     val allColumns = queryExecution.analyzed.output
-    val groupCols = colNames.map { colName =>
-      allColumns.find(col => resolver(col.name, colName)).getOrElse(
+    val groupCols = colNames.flatMap { colName =>
+      // It is possible that more than one column has the same name,
+      // so we call filter instead of find.
+      val cols = allColumns.filter(col => resolver(col.name, colName))
+      if (cols.isEmpty) {
+        throw new AnalysisException(
Contributor:

Dataset.drop is a no-op if the given name doesn't match any column. Should we follow that here?

Member Author:

My thought is: when a user mistakenly passes a wrong column name to Dataset.drop, the mistake is easy to spot. But with Dataset.dropDuplicates it can be much harder to notice that duplicate rows are still there, so throwing an explicit exception seems more appropriate to me.
s"""Cannot resolve column name "$colName" among (${schema.fieldNames.mkString(", ")})"""))
s"""Cannot resolve column name "$colName" among (${schema.fieldNames.mkString(", ")})""")
}
cols
}
val groupColExprIds = groupCols.map(_.exprId)
val aggCols = logicalPlan.output.map { attr =>
if (groupColExprIds.contains(attr.exprId)) {
attr
} else {
Alias(new First(attr).toAggregateExpression(), attr.name)()
// Removing duplicate rows should not change output attributes. We should keep
// the original exprId of the attribute. Otherwise, to select a column in original
// dataset will cause analysis exception due to unresolved attribute.
Alias(new First(attr).toAggregateExpression(), attr.name)(exprId = attr.exprId)
}
}
Aggregate(groupCols, aggCols, logicalPlan)
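
To make the behavior discussed in the review thread above concrete, here is a minimal sketch, assuming a local SparkSession; the exception message is the one constructed in the diff, though exact formatting may vary across Spark versions:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local").getOrCreate()
    import spark.implicits._

    val ds = Seq(("a", 1), ("a", 2), ("b", 1)).toDS()

    // Dataset.drop is a no-op for a name that doesn't resolve:
    ds.drop("no_such_col").columns
    // => Array(_1, _2) -- unchanged

    // After this patch, dropDuplicates fails fast instead of silently
    // leaving the duplicates in place:
    // ds.dropDuplicates("no_such_col")
    // => AnalysisException: Cannot resolve column name "no_such_col" among (_1, _2)

    // Because the aggregate keeps the original exprIds, columns of the
    // original dataset still resolve against the deduplicated result:
    ds.dropDuplicates("_1").select(ds("_1"), ds("_2")).show()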
17 changes: 17 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -872,6 +872,23 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
       ("a", 1), ("a", 2), ("b", 1))
   }
 
+  test("dropDuplicates: columns with same column name") {
+    val ds1 = Seq(("a", 1), ("a", 2), ("b", 1), ("a", 1)).toDS()
+    val ds2 = Seq(("a", 1), ("a", 2), ("b", 1), ("a", 1)).toDS()
+    // The joined dataset has two columns with the same name "_2".
+    val joined = ds1.join(ds2, "_1").select(ds1("_2").as[Int], ds2("_2").as[Int])
+    checkDataset(
+      joined.dropDuplicates(),
+      (1, 2), (1, 1), (2, 1), (2, 2))
+  }
+
+  test("dropDuplicates should not change child plan output") {
+    val ds = Seq(("a", 1), ("a", 2), ("b", 1), ("a", 1)).toDS()
+    checkDataset(
+      ds.dropDuplicates("_1").select(ds("_1").as[String], ds("_2").as[Int]),
+      ("a", 1), ("b", 1))
+  }
+
   test("SPARK-16097: Encoders.tuple should handle null object correctly") {
     val enc = Encoders.tuple(Encoders.tuple(Encoders.STRING, Encoders.STRING), Encoders.STRING)
     val data = Seq((("a", "b"), "c"), (null, "d"))
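
As a cross-check on the first test's expected output, here is a plain-Scala sketch of the inner equi-join on "_1" that the test performs; pure collections, no Spark required:

    val rows = Seq(("a", 1), ("a", 2), ("b", 1), ("a", 1))
    val joined = for {
      (k1, v1) <- rows
      (k2, v2) <- rows
      if k1 == k2
    } yield (v1, v2)
    // joined contains duplicates such as (1, 1) several times; the distinct
    // pairs are exactly the rows the test expects from dropDuplicates():
    joined.distinct.toSet
    // => Set((1,2), (1,1), (2,1), (2,2))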