Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,12 @@ trait CheckAnalysis {
}
}

case s @ SetOperation(left, right) if left.output.length != right.output.length =>
failAnalysis(
s"${s.nodeName} can only be performed on tables with the same number of columns, " +
s"but the left table has ${left.output.length} columns and the right has " +
s"${right.output.length}")

case _ => // Fallbacks to the following checks
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ object HiveTypeCoercion {
planName: String,
left: LogicalPlan,
right: LogicalPlan): (LogicalPlan, LogicalPlan) = {
require(left.output.length == right.output.length)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Someone has tried to add this restrict before in #6174, but failed as hive support different length.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My patch for this was motivated by the fact that my fuzz tester was throwing runtime errors for queries involving UNION ALL with differing numbers of columns: if you run such a UNION ALL query and then attempt to convert the result to a SchemaRDD you can get ArrayIndexOutOfBounds exceptions in CatalystTypeConverters. I'll see if there's a better way to fix this issue.


val castedTypes = left.output.zip(right.output).map {
case (lhs, rhs) if lhs.dataType != rhs.dataType =>
Expand All @@ -229,15 +230,10 @@ object HiveTypeCoercion {
def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case p if p.analyzed => p

case u @ Union(left, right) if u.childrenResolved && !u.resolved =>
val (newLeft, newRight) = widenOutputTypes(u.nodeName, left, right)
Union(newLeft, newRight)
case e @ Except(left, right) if e.childrenResolved && !e.resolved =>
val (newLeft, newRight) = widenOutputTypes(e.nodeName, left, right)
Except(newLeft, newRight)
case i @ Intersect(left, right) if i.childrenResolved && !i.resolved =>
val (newLeft, newRight) = widenOutputTypes(i.nodeName, left, right)
Intersect(newLeft, newRight)
case s @ SetOperation(left, right) if s.childrenResolved
&& left.output.length == right.output.length && !s.resolved =>
val (newLeft, newRight) = widenOutputTypes(s.nodeName, left, right)
s.makeCopy(Array(newLeft, newRight))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,20 +89,32 @@ case class Filter(condition: Expression, child: LogicalPlan) extends UnaryNode {
override def output: Seq[Attribute] = child.output
}

case class Union(left: LogicalPlan, right: LogicalPlan) extends BinaryNode {
abstract class SetOperation(left: LogicalPlan, right: LogicalPlan) extends BinaryNode {
// TODO: These aren't really the same attributes as nullability etc might change.
override def output: Seq[Attribute] = left.output
final override def output: Seq[Attribute] = left.output

override lazy val resolved: Boolean =
final override lazy val resolved: Boolean =
childrenResolved &&
left.output.zip(right.output).forall { case (l, r) => l.dataType == r.dataType }
left.output.length == right.output.length &&
left.output.zip(right.output).forall { case (l, r) => l.dataType == r.dataType }
}

private[sql] object SetOperation {
def unapply(p: SetOperation): Option[(LogicalPlan, LogicalPlan)] = Some((p.left, p.right))
}

case class Union(left: LogicalPlan, right: LogicalPlan) extends SetOperation(left, right) {

override def statistics: Statistics = {
val sizeInBytes = left.statistics.sizeInBytes + right.statistics.sizeInBytes
Statistics(sizeInBytes = sizeInBytes)
}
}

case class Intersect(left: LogicalPlan, right: LogicalPlan) extends SetOperation(left, right)

case class Except(left: LogicalPlan, right: LogicalPlan) extends SetOperation(left, right)

case class Join(
left: LogicalPlan,
right: LogicalPlan,
Expand Down Expand Up @@ -142,15 +154,6 @@ case class BroadcastHint(child: LogicalPlan) extends UnaryNode {
override def output: Seq[Attribute] = child.output
}


case class Except(left: LogicalPlan, right: LogicalPlan) extends BinaryNode {
override def output: Seq[Attribute] = left.output

override lazy val resolved: Boolean =
childrenResolved &&
left.output.zip(right.output).forall { case (l, r) => l.dataType == r.dataType }
}

case class InsertIntoTable(
table: LogicalPlan,
partition: Map[String, Option[String]],
Expand All @@ -160,7 +163,7 @@ case class InsertIntoTable(
extends LogicalPlan {

override def children: Seq[LogicalPlan] = child :: Nil
override def output: Seq[Attribute] = child.output
override def output: Seq[Attribute] = Seq.empty

assert(overwrite || !ifNotExists)
override lazy val resolved: Boolean = childrenResolved && child.output.zip(table.output).forall {
Expand Down Expand Up @@ -440,10 +443,3 @@ case object OneRowRelation extends LeafNode {
override def statistics: Statistics = Statistics(sizeInBytes = 1)
}

case class Intersect(left: LogicalPlan, right: LogicalPlan) extends BinaryNode {
override def output: Seq[Attribute] = left.output

override lazy val resolved: Boolean =
childrenResolved &&
left.output.zip(right.output).forall { case (l, r) => l.dataType == r.dataType }
}
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,24 @@ class AnalysisErrorSuite extends AnalysisTest {
UnresolvedTestPlan(),
"unresolved" :: Nil)

errorTest(
"union with unequal number of columns",
testRelation.unionAll(testRelation2),
"union" :: "number of columns" :: testRelation2.output.length.toString ::
testRelation.output.length.toString :: Nil)

errorTest(
"intersect with unequal number of columns",
testRelation.intersect(testRelation2),
"intersect" :: "number of columns" :: testRelation2.output.length.toString ::
testRelation.output.length.toString :: Nil)

errorTest(
"except with unequal number of columns",
testRelation.except(testRelation2),
"except" :: "number of columns" :: testRelation2.output.length.toString ::
testRelation.output.length.toString :: Nil)

errorTest(
"SPARK-9955: correct error message for aggregate",
// When parse SQL string, we will wrap aggregate expressions with UnresolvedAlias.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -751,7 +751,7 @@ private[hive] case class InsertIntoHiveTable(
extends LogicalPlan {

override def children: Seq[LogicalPlan] = child :: Nil
override def output: Seq[Attribute] = child.output
override def output: Seq[Attribute] = Seq.empty

val numDynamicPartitions = partition.values.count(_.isEmpty)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ case class InsertIntoHiveTable(
serializer
}

def output: Seq[Attribute] = child.output
def output: Seq[Attribute] = Seq.empty

def saveAsHiveFile(
rdd: RDD[InternalRow],
Expand Down