From a422a7f1c1fb0f055fbb8736a364c5a641afc2a9 Mon Sep 17 00:00:00 2001 From: liutang123 Date: Wed, 18 Apr 2018 22:29:15 +0800 Subject: [PATCH 1/7] [SPARK-24012][SQL] Union of map and other compatible column --- .../sql/catalyst/analysis/TypeCoercion.scala | 31 ++++++++++++++----- .../test/resources/sql-tests/inputs/union.sql | 5 +++ .../resources/sql-tests/results/union.sql.out | 27 +++++++++++----- .../org/apache/spark/sql/SQLQuerySuite.scala | 13 ++++++++ 4 files changed, 60 insertions(+), 16 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala index ec7e7761dc4c2..3391006f980d3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala @@ -267,7 +267,11 @@ object TypeCoercion { case s: Union if s.childrenResolved && s.children.forall(_.output.length == s.children.head.output.length) && !s.resolved => val newChildren: Seq[LogicalPlan] = buildNewChildrenWithWiderTypes(s.children) - s.makeCopy(Array(newChildren)) + if(newChildren != s.children) { + s.makeCopy(Array(newChildren)) + } else { + s + } } /** Build new children with the widest types for each attribute among all the children */ @@ -279,7 +283,7 @@ object TypeCoercion { val targetTypes: Seq[DataType] = getWidestTypes(children, attrIndex = 0, mutable.Queue[DataType]()) - if (targetTypes.nonEmpty) { + if (!targetTypes.forall(null == _)) { // Add an extra Project if the targetTypes are different from the original types. children.map(widenTypes(_, targetTypes)) } else { @@ -296,24 +300,35 @@ object TypeCoercion { // Return the result after the widen data types have been found for all the children if (attrIndex >= children.head.output.length) return castedTypes.toSeq + val types = children.map(_.output(attrIndex).dataType) // For the attrIndex-th attribute, find the widest type - findWiderCommonType(children.map(_.output(attrIndex).dataType)) match { + findWiderCommonType(types) match { // If unable to find an appropriate widen type for this column, return an empty Seq - case None => Seq.empty[DataType] + case None => + castedTypes.enqueue(null) // Otherwise, record the result in the queue and find the type for the next column - case Some(widenType) => + case Some(widenType) if types.exists(_ != widenType) => castedTypes.enqueue(widenType) - getWidestTypes(children, attrIndex + 1, castedTypes) + case _ => + castedTypes.enqueue(null) } + getWidestTypes(children, attrIndex + 1, castedTypes) } /** Given a plan, add an extra project on top to widen some columns' data types. 
*/ private def widenTypes(plan: LogicalPlan, targetTypes: Seq[DataType]): LogicalPlan = { + var changed = false val casted = plan.output.zip(targetTypes).map { - case (e, dt) if e.dataType != dt => Alias(Cast(e, dt), e.name)() + case (e, dt) if null != dt && e.dataType != dt => + changed = true + Alias(Cast(e, dt), e.name)() case (e, _) => e } - Project(casted, plan) + if(changed) { + Project(casted, plan) + } else { + plan + } } } diff --git a/sql/core/src/test/resources/sql-tests/inputs/union.sql b/sql/core/src/test/resources/sql-tests/inputs/union.sql index e57d69eaad033..ff2f9a677d2f4 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/union.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/union.sql @@ -35,6 +35,11 @@ FROM (SELECT col AS col SELECT col FROM p3) T1) T2; +-- SPARK-24012 Union of map and other compatible columns. +SELECT map(1, 2), 'str' +UNION ALL +SELECT map(1, 2, 3, NULL), 1; + -- Clean-up DROP VIEW IF EXISTS t1; DROP VIEW IF EXISTS t2; diff --git a/sql/core/src/test/resources/sql-tests/results/union.sql.out b/sql/core/src/test/resources/sql-tests/results/union.sql.out index d123b7fdbe0cf..71f0519625367 100644 --- a/sql/core/src/test/resources/sql-tests/results/union.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/union.sql.out @@ -1,5 +1,5 @@ -- Automatically generated by SQLQueryTestSuite --- Number of queries: 14 +-- Number of queries: 15 -- !query 0 @@ -105,15 +105,18 @@ struct -- !query 9 -DROP VIEW IF EXISTS t1 +SELECT map(1, 2), 'str' +UNION ALL +SELECT map(1, 2, 3, NULL), 1 -- !query 9 schema -struct<> +struct,str:string> -- !query 9 output - +{1:2,3:null} 1 +{1:2} str -- !query 10 -DROP VIEW IF EXISTS t2 +DROP VIEW IF EXISTS t1 -- !query 10 schema struct<> -- !query 10 output @@ -121,7 +124,7 @@ struct<> -- !query 11 -DROP VIEW IF EXISTS p1 +DROP VIEW IF EXISTS t2 -- !query 11 schema struct<> -- !query 11 output @@ -129,7 +132,7 @@ struct<> -- !query 12 -DROP VIEW IF EXISTS p2 +DROP VIEW IF EXISTS p1 -- !query 12 schema struct<> -- !query 12 output @@ -137,8 +140,16 @@ struct<> -- !query 13 -DROP VIEW IF EXISTS p3 +DROP VIEW IF EXISTS p2 -- !query 13 schema struct<> -- !query 13 output + + +-- !query 14 +DROP VIEW IF EXISTS p3 +-- !query 14 schema +struct<> +-- !query 14 output + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 640affc10ee58..b30fa480cc5ff 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -896,6 +896,19 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { } } + test("SPARK-24012 Union of map and other compatible columns") { + checkAnswer( + sql( + """ + |SELECT map(1, 2), 'str' + |UNION ALL + |SELECT map(1, 2, 3, NULL), 1""".stripMargin), + Row.fromSeq(Seq(Map(1 -> 2), "str")):: + Row.fromSeq(Seq(Map(1 -> 2, 3 -> null), "1")):: + Nil + ) + } + test("EXCEPT") { checkAnswer( sql("SELECT * FROM lowerCaseData EXCEPT SELECT * FROM upperCaseData"), From cb883d9dae43512d8fb4d62900d5cb9daca859d2 Mon Sep 17 00:00:00 2001 From: liutang123 Date: Fri, 20 Apr 2018 19:09:55 +0800 Subject: [PATCH 2/7] [SPARK-24012][SQL] change map and map coerce-able --- .../sql/catalyst/analysis/TypeCoercion.scala | 40 ++++++++----------- 1 file changed, 17 insertions(+), 23 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala index 3391006f980d3..41054e2c8ed6a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala @@ -171,6 +171,15 @@ object TypeCoercion { .orElse((t1, t2) match { case (ArrayType(et1, containsNull1), ArrayType(et2, containsNull2)) => findWiderTypeForTwo(et1, et2).map(ArrayType(_, containsNull1 || containsNull2)) + case (MapType(keyType1, valueType1, n1), MapType(keyType2, valueType2, n2)) + if keyType1.sameType(keyType2) && valueType1.sameType(valueType2) => + val keyType = findWiderTypeForTwo(keyType1, keyType2) + val valueType = findWiderTypeForTwo(valueType1, valueType2) + if(keyType.isEmpty || valueType.isEmpty) { + None + } else { + Some(MapType(keyType.get, valueType.get, n1 || n2)) + } case _ => None }) } @@ -267,11 +276,7 @@ object TypeCoercion { case s: Union if s.childrenResolved && s.children.forall(_.output.length == s.children.head.output.length) && !s.resolved => val newChildren: Seq[LogicalPlan] = buildNewChildrenWithWiderTypes(s.children) - if(newChildren != s.children) { - s.makeCopy(Array(newChildren)) - } else { - s - } + s.makeCopy(Array(newChildren)) } /** Build new children with the widest types for each attribute among all the children */ @@ -283,7 +288,7 @@ object TypeCoercion { val targetTypes: Seq[DataType] = getWidestTypes(children, attrIndex = 0, mutable.Queue[DataType]()) - if (!targetTypes.forall(null == _)) { + if (targetTypes.nonEmpty) { // Add an extra Project if the targetTypes are different from the original types. children.map(widenTypes(_, targetTypes)) } else { @@ -300,35 +305,24 @@ object TypeCoercion { // Return the result after the widen data types have been found for all the children if (attrIndex >= children.head.output.length) return castedTypes.toSeq - val types = children.map(_.output(attrIndex).dataType) // For the attrIndex-th attribute, find the widest type - findWiderCommonType(types) match { + findWiderCommonType(children.map(_.output(attrIndex).dataType)) match { // If unable to find an appropriate widen type for this column, return an empty Seq - case None => - castedTypes.enqueue(null) + case None => Seq.empty[DataType] // Otherwise, record the result in the queue and find the type for the next column - case Some(widenType) if types.exists(_ != widenType) => + case Some(widenType) => castedTypes.enqueue(widenType) - case _ => - castedTypes.enqueue(null) + getWidestTypes(children, attrIndex + 1, castedTypes) } - getWidestTypes(children, attrIndex + 1, castedTypes) } /** Given a plan, add an extra project on top to widen some columns' data types. 
*/ private def widenTypes(plan: LogicalPlan, targetTypes: Seq[DataType]): LogicalPlan = { - var changed = false val casted = plan.output.zip(targetTypes).map { - case (e, dt) if null != dt && e.dataType != dt => - changed = true - Alias(Cast(e, dt), e.name)() + case (e, dt) if e.dataType != dt => Alias(Cast(e, dt), e.name)() case (e, _) => e } - if(changed) { - Project(casted, plan) - } else { - plan - } + Project(casted, plan) } } From 19b5c6a84b38b4ce275093f79eee0ff594e50f90 Mon Sep 17 00:00:00 2001 From: liutang123 Date: Mon, 23 Apr 2018 11:19:36 +0800 Subject: [PATCH 3/7] SPARK-24012 add same type checke for map and array in TypeCoercion#findTightestCommonType --- .../sql/catalyst/analysis/TypeCoercion.scala | 21 +++++++++++-------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala index 41054e2c8ed6a..d1ecf5fc76747 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala @@ -111,6 +111,18 @@ object TypeCoercion { val dataType = findTightestCommonType(f1.dataType, f2.dataType).get StructField(f1.name, dataType, nullable = f1.nullable || f2.nullable) })) + case (a1 @ ArrayType(et1, containsNull1), a2 @ ArrayType(et2, containsNull2)) + if a1.sameType(a2) => + findTightestCommonType(et1, et2).map(ArrayType(_, containsNull1 || containsNull2)) + case (m1 @ MapType(keyType1, valueType1, n1), m2 @ MapType(keyType2, valueType2, n2)) + if m1.sameType(m2) => + val keyType = findTightestCommonType(keyType1, keyType2) + val valueType = findTightestCommonType(valueType1, valueType2) + if(keyType.isEmpty || valueType.isEmpty) { + None + } else { + Some(MapType(keyType.get, valueType.get, n1 || n2)) + } case _ => None } @@ -171,15 +183,6 @@ object TypeCoercion { .orElse((t1, t2) match { case (ArrayType(et1, containsNull1), ArrayType(et2, containsNull2)) => findWiderTypeForTwo(et1, et2).map(ArrayType(_, containsNull1 || containsNull2)) - case (MapType(keyType1, valueType1, n1), MapType(keyType2, valueType2, n2)) - if keyType1.sameType(keyType2) && valueType1.sameType(valueType2) => - val keyType = findWiderTypeForTwo(keyType1, keyType2) - val valueType = findWiderTypeForTwo(valueType1, valueType2) - if(keyType.isEmpty || valueType.isEmpty) { - None - } else { - Some(MapType(keyType.get, valueType.get, n1 || n2)) - } case _ => None }) } From 084573946245644447de222c89814fe632f9cb1a Mon Sep 17 00:00:00 2001 From: liutang123 Date: Mon, 23 Apr 2018 13:46:41 +0800 Subject: [PATCH 4/7] SPARK-24012 code style fix --- .../sql/catalyst/analysis/TypeCoercion.scala | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala index d1ecf5fc76747..1a11fae7a020d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala @@ -111,18 +111,14 @@ object TypeCoercion { val dataType = findTightestCommonType(f1.dataType, f2.dataType).get StructField(f1.name, dataType, nullable = f1.nullable || f2.nullable) })) - case (a1 @ ArrayType(et1, containsNull1), a2 @ ArrayType(et2, 
containsNull2)) + case (a1 @ ArrayType(et1, hasNull1), a2 @ ArrayType(et2, hasNull2)) if a1.sameType(a2) => - findTightestCommonType(et1, et2).map(ArrayType(_, containsNull1 || containsNull2)) - case (m1 @ MapType(keyType1, valueType1, n1), m2 @ MapType(keyType2, valueType2, n2)) + findTightestCommonType(et1, et2).map(ArrayType(_, hasNull1 || hasNull2)) + case (m1 @ MapType(kt1, vt1, hasNull1), m2 @ MapType(kt2, vt2, hasNull2)) if m1.sameType(m2) => - val keyType = findTightestCommonType(keyType1, keyType2) - val valueType = findTightestCommonType(valueType1, valueType2) - if(keyType.isEmpty || valueType.isEmpty) { - None - } else { - Some(MapType(keyType.get, valueType.get, n1 || n2)) - } + val keyType = findTightestCommonType(kt1, kt2) + val valueType = findTightestCommonType(vt1, vt2) + Some(MapType(keyType.get, valueType.get, hasNull1 || hasNull2)) case _ => None } From 670824fa5fc1f8aa72ca4047893104c3786a0295 Mon Sep 17 00:00:00 2001 From: liutang123 Date: Tue, 24 Apr 2018 11:02:40 +0800 Subject: [PATCH 5/7] SPARK-24012 code style fix --- .../apache/spark/sql/catalyst/analysis/TypeCoercion.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala index 1a11fae7a020d..25523281b950d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala @@ -111,11 +111,11 @@ object TypeCoercion { val dataType = findTightestCommonType(f1.dataType, f2.dataType).get StructField(f1.name, dataType, nullable = f1.nullable || f2.nullable) })) - case (a1 @ ArrayType(et1, hasNull1), a2 @ ArrayType(et2, hasNull2)) - if a1.sameType(a2) => + + case (a1 @ ArrayType(et1, hasNull1), a2 @ ArrayType(et2, hasNull2)) if a1.sameType(a2) => findTightestCommonType(et1, et2).map(ArrayType(_, hasNull1 || hasNull2)) - case (m1 @ MapType(kt1, vt1, hasNull1), m2 @ MapType(kt2, vt2, hasNull2)) - if m1.sameType(m2) => + + case (m1 @ MapType(kt1, vt1, hasNull1), m2 @ MapType(kt2, vt2, hasNull2)) if m1.sameType(m2) => val keyType = findTightestCommonType(kt1, kt2) val valueType = findTightestCommonType(vt1, vt2) Some(MapType(keyType.get, valueType.get, hasNull1 || hasNull2)) From 8cb240fbcab257c4151246c36725b6f1ee873d46 Mon Sep 17 00:00:00 2001 From: liutang123 Date: Tue, 24 Apr 2018 11:40:50 +0800 Subject: [PATCH 6/7] SPARK-24012 add UT for array --- .../test/resources/sql-tests/inputs/union.sql | 6 +++++ .../resources/sql-tests/results/union.sql.out | 27 +++++++++++++------ .../org/apache/spark/sql/SQLQuerySuite.scala | 12 ++++++--- 3 files changed, 34 insertions(+), 11 deletions(-) diff --git a/sql/core/src/test/resources/sql-tests/inputs/union.sql b/sql/core/src/test/resources/sql-tests/inputs/union.sql index ff2f9a677d2f4..6da1b9b49b226 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/union.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/union.sql @@ -40,6 +40,12 @@ SELECT map(1, 2), 'str' UNION ALL SELECT map(1, 2, 3, NULL), 1; +-- SPARK-24012 Union of array and other compatible columns. 
+SELECT array(1, 2), 'str' +UNION ALL +SELECT array(1, 2, 3, NULL), 1; + + -- Clean-up DROP VIEW IF EXISTS t1; DROP VIEW IF EXISTS t2; diff --git a/sql/core/src/test/resources/sql-tests/results/union.sql.out b/sql/core/src/test/resources/sql-tests/results/union.sql.out index 71f0519625367..b023df825d814 100644 --- a/sql/core/src/test/resources/sql-tests/results/union.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/union.sql.out @@ -1,5 +1,5 @@ -- Automatically generated by SQLQueryTestSuite --- Number of queries: 15 +-- Number of queries: 16 -- !query 0 @@ -116,15 +116,18 @@ struct,str:string> -- !query 10 -DROP VIEW IF EXISTS t1 +SELECT array(1, 2), 'str' +UNION ALL +SELECT array(1, 2, 3, NULL), 1 -- !query 10 schema -struct<> +struct,str:string> -- !query 10 output - +[1,2,3,null] 1 +[1,2] str -- !query 11 -DROP VIEW IF EXISTS t2 +DROP VIEW IF EXISTS t1 -- !query 11 schema struct<> -- !query 11 output @@ -132,7 +135,7 @@ struct<> -- !query 12 -DROP VIEW IF EXISTS p1 +DROP VIEW IF EXISTS t2 -- !query 12 schema struct<> -- !query 12 output @@ -140,7 +143,7 @@ struct<> -- !query 13 -DROP VIEW IF EXISTS p2 +DROP VIEW IF EXISTS p1 -- !query 13 schema struct<> -- !query 13 output @@ -148,8 +151,16 @@ struct<> -- !query 14 -DROP VIEW IF EXISTS p3 +DROP VIEW IF EXISTS p2 -- !query 14 schema struct<> -- !query 14 output + + +-- !query 15 +DROP VIEW IF EXISTS p3 +-- !query 15 schema +struct<> +-- !query 15 output + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index b30fa480cc5ff..883545c20e6e2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -903,9 +903,15 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { |SELECT map(1, 2), 'str' |UNION ALL |SELECT map(1, 2, 3, NULL), 1""".stripMargin), - Row.fromSeq(Seq(Map(1 -> 2), "str")):: - Row.fromSeq(Seq(Map(1 -> 2, 3 -> null), "1")):: - Nil + Row(Map(1 -> 2), "str") :: Row(Map(1 -> 2, 3 -> null), "1") :: Nil + ) + checkAnswer( + sql( + """ + |SELECT array(1), 'str' + |UNION ALL + |SELECT array(1, 2, 3, NULL), 1""".stripMargin), + Row(Array(1), "str") :: Row(Array(1, 2, 3, null), "1") :: Nil ) } From 4b1ce3684ead1af50cbb76497d0ae07b70d24f43 Mon Sep 17 00:00:00 2001 From: liutang123 Date: Tue, 24 Apr 2018 16:32:00 +0800 Subject: [PATCH 7/7] SPARK-24012 remove UT from SQLQuerySuite --- .../org/apache/spark/sql/SQLQuerySuite.scala | 19 ------------------- 1 file changed, 19 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 883545c20e6e2..640affc10ee58 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -896,25 +896,6 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { } } - test("SPARK-24012 Union of map and other compatible columns") { - checkAnswer( - sql( - """ - |SELECT map(1, 2), 'str' - |UNION ALL - |SELECT map(1, 2, 3, NULL), 1""".stripMargin), - Row(Map(1 -> 2), "str") :: Row(Map(1 -> 2, 3 -> null), "1") :: Nil - ) - checkAnswer( - sql( - """ - |SELECT array(1), 'str' - |UNION ALL - |SELECT array(1, 2, 3, NULL), 1""".stripMargin), - Row(Array(1), "str") :: Row(Array(1, 2, 3, null), "1") :: Nil - ) - } - test("EXCEPT") { checkAnswer( sql("SELECT * FROM lowerCaseData EXCEPT SELECT * FROM upperCaseData"),
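
Illustrative usage sketch (not part of the patch series above): once maps and arrays with compatible element types are coercible, a UNION ALL across such columns resolves instead of failing analysis. This is a minimal standalone example, assuming a local SparkSession; the object and app names are made up for illustration.

import org.apache.spark.sql.SparkSession

object Spark24012UnionSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("SPARK-24012 union of compatible map/array columns")
      .master("local[*]")
      .getOrCreate()

    // map<int,int> with non-null values unioned with a map containing a NULL value;
    // the common type widens to map<int,int> with valueContainsNull = true.
    spark.sql(
      """SELECT map(1, 2) AS m, 'str' AS s
        |UNION ALL
        |SELECT map(1, 2, 3, NULL), 1""".stripMargin).show(truncate = false)

    // Same idea for arrays: array<int> (containsNull = false) unioned with an array
    // containing NULL widens to array<int> with containsNull = true.
    spark.sql(
      """SELECT array(1, 2) AS a, 'str' AS s
        |UNION ALL
        |SELECT array(1, 2, 3, NULL), 1""".stripMargin).show(truncate = false)

    spark.stop()
  }
}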