Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,11 @@ trait CheckAnalysis {
case p: Predicate =>
p.asInstanceOf[Expression].children.foreach(checkValidJoinConditionExprs)
case e if e.dataType.isInstanceOf[BinaryType] =>
failAnalysis(s"expression ${e.prettyString} in join condition " +
s"'${condition.prettyString}' can't be binary type.")
failAnalysis(s"binary type expression ${e.prettyString} cannot be used " +
"in join conditions")
case e if e.dataType.isInstanceOf[MapType] =>
failAnalysis(s"map type expression ${e.prettyString} cannot be used " +
"in join conditions")
case _ => // OK
}

Expand All @@ -114,13 +117,16 @@ trait CheckAnalysis {

def checkValidGroupingExprs(expr: Expression): Unit = expr.dataType match {
case BinaryType =>
failAnalysis(s"grouping expression '${expr.prettyString}' in aggregate can " +
s"not be binary type.")
failAnalysis(s"binary type expression ${expr.prettyString} cannot be used " +
"in grouping expression")
case m: MapType =>
failAnalysis(s"map type expression ${expr.prettyString} cannot be used " +
"in grouping expression")
case _ => // OK
}

aggregateExprs.foreach(checkValidAggregateExpression)
aggregateExprs.foreach(checkValidGroupingExprs)
groupingExprs.foreach(checkValidGroupingExprs)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A bug in #7787.


case Sort(orders, _, _) =>
orders.foreach { order =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,71 @@ class AnalysisErrorSuite extends SparkFunSuite with BeforeAndAfter {
val error = intercept[AnalysisException] {
SimpleAnalyzer.checkAnalysis(join)
}
error.message.contains("Failure when resolving conflicting references in Join")
error.message.contains("Conflicting attributes")
assert(error.message.contains("Failure when resolving conflicting references in Join"))
assert(error.message.contains("Conflicting attributes"))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These two lines should be put in an assert.

}

test("aggregation can't work on binary and map types") {
val plan =
Aggregate(
AttributeReference("a", BinaryType)(exprId = ExprId(2)) :: Nil,
Alias(Sum(AttributeReference("b", IntegerType)(exprId = ExprId(1))), "c")() :: Nil,
LocalRelation(
AttributeReference("a", BinaryType)(exprId = ExprId(2)),
AttributeReference("b", IntegerType)(exprId = ExprId(1))))

val error = intercept[AnalysisException] {
caseSensitiveAnalyze(plan)
}
assert(error.message.contains("binary type expression a cannot be used in grouping expression"))

val plan2 =
Aggregate(
AttributeReference("a", MapType(IntegerType, StringType))(exprId = ExprId(2)) :: Nil,
Alias(Sum(AttributeReference("b", IntegerType)(exprId = ExprId(1))), "c")() :: Nil,
LocalRelation(
AttributeReference("a", MapType(IntegerType, StringType))(exprId = ExprId(2)),
AttributeReference("b", IntegerType)(exprId = ExprId(1))))

val error2 = intercept[AnalysisException] {
caseSensitiveAnalyze(plan2)
}
assert(error2.message.contains("map type expression a cannot be used in grouping expression"))
}

test("Join can't work on binary and map types") {
val plan =
Join(
LocalRelation(
AttributeReference("a", BinaryType)(exprId = ExprId(2)),
AttributeReference("b", IntegerType)(exprId = ExprId(1))),
LocalRelation(
AttributeReference("c", BinaryType)(exprId = ExprId(4)),
AttributeReference("d", IntegerType)(exprId = ExprId(3))),
Inner,
Some(EqualTo(AttributeReference("a", BinaryType)(exprId = ExprId(2)),
AttributeReference("c", BinaryType)(exprId = ExprId(4)))))

val error = intercept[AnalysisException] {
caseSensitiveAnalyze(plan)
}
assert(error.message.contains("binary type expression a cannot be used in join conditions"))

val plan2 =
Join(
LocalRelation(
AttributeReference("a", MapType(IntegerType, StringType))(exprId = ExprId(2)),
AttributeReference("b", IntegerType)(exprId = ExprId(1))),
LocalRelation(
AttributeReference("c", MapType(IntegerType, StringType))(exprId = ExprId(4)),
AttributeReference("d", IntegerType)(exprId = ExprId(3))),
Inner,
Some(EqualTo(AttributeReference("a", MapType(IntegerType, StringType))(exprId = ExprId(2)),
AttributeReference("c", MapType(IntegerType, StringType))(exprId = ExprId(4)))))

val error2 = intercept[AnalysisException] {
caseSensitiveAnalyze(plan2)
}
assert(error2.message.contains("map type expression a cannot be used in join conditions"))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -190,14 +190,4 @@ class DataFrameAggregateSuite extends QueryTest {
emptyTableData.agg(sumDistinct('a)),
Row(null))
}

test("aggregation can't work on binary type") {
val df = Seq(1, 1, 2, 2).map(i => Tuple1(i.toString)).toDF("c").select($"c" cast BinaryType)
intercept[AnalysisException] {
df.groupBy("c").agg(count("*"))
}
intercept[AnalysisException] {
df.distinct
}
}
}
8 changes: 0 additions & 8 deletions sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -490,12 +490,4 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
Row(3, 2) :: Nil)

}

test("Join can't work on binary type") {
val left = Seq(1, 1, 2, 2).map(i => Tuple1(i.toString)).toDF("c").select($"c" cast BinaryType)
val right = Seq(1, 1, 2, 2).map(i => Tuple1(i.toString)).toDF("d").select($"d" cast BinaryType)
intercept[AnalysisException] {
left.join(right, ($"left.N" === $"right.N"), "full")
}
}
}