
Commit de8a03e

hvanhovell authored and cloud-fan committed
[SPARK-19459][SQL] Add Hive datatype (char/varchar) to StructField metadata
## What changes were proposed in this pull request?

Reading from an existing ORC table which contains `char` or `varchar` columns can fail with a `ClassCastException` if the table metadata has been created using Spark. This is caused by the fact that Spark internally replaces `char` and `varchar` columns with a `string` column. This PR fixes this by adding the Hive type to the `StructField`'s metadata under the `HIVE_TYPE_STRING` key. This is picked up by the `HiveClient` and the ORC reader; see #16060 for more details on how the metadata is used.

## How was this patch tested?

Added a regression test to `OrcSourceSuite`.

Author: Herman van Hovell <hvanhovell@databricks.com>

Closes #16804 from hvanhovell/SPARK-19459.
1 parent dadff5f commit de8a03e
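
To make the mechanism concrete, here is a minimal sketch (not part of the commit; the column name and type are hypothetical, and `HIVE_TYPE_STRING` comes from the `org.apache.spark.sql.types` package object introduced below) of how a Hive `varchar(10)` column is represented after this change:

```scala
import org.apache.spark.sql.types._

// The column is still exposed to Spark SQL as a plain string type, but the
// raw Hive type survives in the StructField's metadata, where the HiveClient
// and the ORC reader can pick it up again.
val metadata = new MetadataBuilder()
  .putString(HIVE_TYPE_STRING, "varchar(10)")
  .build()

val field = StructField("c", StringType, nullable = true, metadata)
// field.metadata.getString(HIVE_TYPE_STRING) returns "varchar(10)"
```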

File tree: 6 files changed (+76, -25 lines)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala

Lines changed: 22 additions & 2 deletions
```diff
@@ -1457,8 +1457,28 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
    */
   override def visitColType(ctx: ColTypeContext): StructField = withOrigin(ctx) {
     import ctx._
-    val structField = StructField(identifier.getText, typedVisit(dataType), nullable = true)
-    if (STRING == null) structField else structField.withComment(string(STRING))
+
+    val builder = new MetadataBuilder
+    // Add comment to metadata
+    if (STRING != null) {
+      builder.putString("comment", string(STRING))
+    }
+    // Add Hive type string to metadata.
+    dataType match {
+      case p: PrimitiveDataTypeContext =>
+        p.identifier.getText.toLowerCase match {
+          case "varchar" | "char" =>
+            builder.putString(HIVE_TYPE_STRING, dataType.getText.toLowerCase)
+          case _ =>
+        }
+      case _ =>
+    }
+
+    StructField(
+      identifier.getText,
+      typedVisit(dataType),
+      nullable = true,
+      builder.build())
   }
 
   /**
```

sql/catalyst/src/main/scala/org/apache/spark/sql/types/package.scala

Lines changed: 9 additions & 1 deletion
```diff
@@ -21,4 +21,12 @@ package org.apache.spark.sql
  * Contains a type system for attributes produced by relations, including complex types like
  * structs, arrays and maps.
  */
-package object types
+package object types {
+  /**
+   * Metadata key used to store the raw hive type string in the metadata of StructField. This
+   * is relevant for datatypes that do not have a direct Spark SQL counterpart, such as CHAR and
+   * VARCHAR. We need to preserve the original type in order to invoke the correct object
+   * inspector in Hive.
+   */
+  val HIVE_TYPE_STRING = "HIVE_TYPE_STRING"
+}
```
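
The consumers of this key (`HiveUtils` and `HiveClientImpl` below) fall back to the Spark SQL catalog string when the key is absent. A sketch of that lookup pattern follows; `hiveTypeOf` is a hypothetical helper name for illustration, not part of the commit:

```scala
import org.apache.spark.sql.types._

// Prefer the preserved raw Hive type (e.g. "char(10)") when present;
// otherwise fall back to the Spark SQL name of the type (e.g. "string").
def hiveTypeOf(field: StructField): String =
  if (field.metadata.contains(HIVE_TYPE_STRING)) {
    field.metadata.getString(HIVE_TYPE_STRING)
  } else {
    field.dataType.catalogString
  }
```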

sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala

Lines changed: 6 additions & 2 deletions
```diff
@@ -203,6 +203,10 @@ class TableScanSuite extends DataSourceTest with SharedSQLContext {
     (2 to 10).map(i => Row(i, i - 1)).toSeq)
 
   test("Schema and all fields") {
+    def hiveMetadata(dt: String): Metadata = {
+      new MetadataBuilder().putString(HIVE_TYPE_STRING, dt).build()
+    }
+
     val expectedSchema = StructType(
       StructField("string$%Field", StringType, true) ::
       StructField("binaryField", BinaryType, true) ::
@@ -217,8 +221,8 @@ class TableScanSuite extends DataSourceTest with SharedSQLContext {
       StructField("decimalField2", DecimalType(9, 2), true) ::
       StructField("dateField", DateType, true) ::
       StructField("timestampField", TimestampType, true) ::
-      StructField("varcharField", StringType, true) ::
-      StructField("charField", StringType, true) ::
+      StructField("varcharField", StringType, true, hiveMetadata("varchar(12)")) ::
+      StructField("charField", StringType, true, hiveMetadata("char(18)")) ::
       StructField("arrayFieldSimple", ArrayType(IntegerType), true) ::
       StructField("arrayFieldComplex",
         ArrayType(
```

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala

Lines changed: 3 additions & 11 deletions
```diff
@@ -61,14 +61,6 @@ private[spark] object HiveUtils extends Logging {
   /** The version of hive used internally by Spark SQL. */
   val hiveExecutionVersion: String = "1.2.1"
 
-  /**
-   * The property key that is used to store the raw hive type string in the metadata of StructField.
-   * For example, in the case where the Hive type is varchar, the type gets mapped to a string type
-   * in Spark SQL, but we need to preserve the original type in order to invoke the correct object
-   * inspector in Hive.
-   */
-  val hiveTypeString: String = "HIVE_TYPE_STRING"
-
   val HIVE_METASTORE_VERSION = buildConf("spark.sql.hive.metastore.version")
     .doc("Version of the Hive metastore. Available options are " +
       s"<code>0.12.0</code> through <code>$hiveExecutionVersion</code>.")
@@ -465,8 +457,8 @@ private[spark] object HiveUtils extends Logging {
 
   /** Converts the native StructField to Hive's FieldSchema. */
   private def toHiveColumn(c: StructField): FieldSchema = {
-    val typeString = if (c.metadata.contains(HiveUtils.hiveTypeString)) {
-      c.metadata.getString(HiveUtils.hiveTypeString)
+    val typeString = if (c.metadata.contains(HIVE_TYPE_STRING)) {
+      c.metadata.getString(HIVE_TYPE_STRING)
     } else {
       c.dataType.catalogString
     }
@@ -482,7 +474,7 @@ private[spark] object HiveUtils extends Logging {
         throw new SparkException("Cannot recognize hive type string: " + hc.getType, e)
     }
 
-    val metadata = new MetadataBuilder().putString(HiveUtils.hiveTypeString, hc.getType).build()
+    val metadata = new MetadataBuilder().putString(HIVE_TYPE_STRING, hc.getType).build()
     val field = StructField(
       name = hc.getName,
       dataType = columnType,
```

sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala

Lines changed: 4 additions & 4 deletions
```diff
@@ -47,7 +47,7 @@ import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
 import org.apache.spark.sql.execution.QueryExecutionException
 import org.apache.spark.sql.hive.HiveUtils
-import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType}
+import org.apache.spark.sql.types._
 import org.apache.spark.util.{CircularBuffer, Utils}
 
 /**
@@ -790,8 +790,8 @@ private[hive] class HiveClientImpl(
       .asInstanceOf[Class[_ <: org.apache.hadoop.hive.ql.io.HiveOutputFormat[_, _]]]
 
   private def toHiveColumn(c: StructField): FieldSchema = {
-    val typeString = if (c.metadata.contains(HiveUtils.hiveTypeString)) {
-      c.metadata.getString(HiveUtils.hiveTypeString)
+    val typeString = if (c.metadata.contains(HIVE_TYPE_STRING)) {
+      c.metadata.getString(HIVE_TYPE_STRING)
     } else {
       c.dataType.catalogString
     }
@@ -806,7 +806,7 @@ private[hive] class HiveClientImpl(
         throw new SparkException("Cannot recognize hive type string: " + hc.getType, e)
     }
 
-    val metadata = new MetadataBuilder().putString(HiveUtils.hiveTypeString, hc.getType).build()
+    val metadata = new MetadataBuilder().putString(HIVE_TYPE_STRING, hc.getType).build()
     val field = StructField(
       name = hc.getName,
       dataType = columnType,
```

sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala

Lines changed: 32 additions & 5 deletions
```diff
@@ -152,14 +152,41 @@ abstract class OrcSuite extends QueryTest with TestHiveSingleton with BeforeAndAfterAll {
     assert(new OrcOptions(Map("Orc.Compress" -> "NONE")).compressionCodec == "NONE")
   }
 
-  test("SPARK-18220: read Hive orc table with varchar column") {
+  test("SPARK-19459/SPARK-18220: read char/varchar column written by Hive") {
     val hiveClient = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
+    val location = Utils.createTempDir()
+    val uri = location.toURI
     try {
-      hiveClient.runSqlHive("CREATE TABLE orc_varchar(a VARCHAR(10)) STORED AS orc")
-      hiveClient.runSqlHive("INSERT INTO TABLE orc_varchar SELECT 'a' FROM (SELECT 1) t")
-      checkAnswer(spark.table("orc_varchar"), Row("a"))
+      hiveClient.runSqlHive(
+        """
+          |CREATE EXTERNAL TABLE hive_orc(
+          |  a STRING,
+          |  b CHAR(10),
+          |  c VARCHAR(10))
+          |STORED AS orc""".stripMargin)
+      // Hive throws an exception if I assign the location in the create table statement.
+      hiveClient.runSqlHive(
+        s"ALTER TABLE hive_orc SET LOCATION '$uri'")
+      hiveClient.runSqlHive(
+        "INSERT INTO TABLE hive_orc SELECT 'a', 'b', 'c' FROM (SELECT 1) t")
+
+      // We create a different table in Spark using the same schema which points to
+      // the same location.
+      spark.sql(
+        s"""
+          |CREATE EXTERNAL TABLE spark_orc(
+          |  a STRING,
+          |  b CHAR(10),
+          |  c VARCHAR(10))
+          |STORED AS orc
+          |LOCATION '$uri'""".stripMargin)
+      val result = Row("a", "b         ", "c")
+      checkAnswer(spark.table("hive_orc"), result)
+      checkAnswer(spark.table("spark_orc"), result)
     } finally {
-      hiveClient.runSqlHive("DROP TABLE IF EXISTS orc_varchar")
+      hiveClient.runSqlHive("DROP TABLE IF EXISTS hive_orc")
+      hiveClient.runSqlHive("DROP TABLE IF EXISTS spark_orc")
+      Utils.deleteRecursively(location)
     }
   }
 }
```
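
A note on the expected result in this test: Hive pads `CHAR(n)` values with trailing spaces to the declared length, while `VARCHAR(n)` stores them as-is, which is why the `CHAR(10)` cell reads back as `"b"` followed by nine spaces. A tiny illustration (not from the commit) of that padding:

```scala
// CHAR(10) semantics: values are space-padded to the declared length on read,
// so 'b' comes back as "b" plus nine spaces; VARCHAR(10) returns "c" unpadded.
assert("b".padTo(10, ' ') == "b         ")
```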
