10 changes: 10 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -51,6 +51,11 @@ private[spark] object SQLConf {

// This is used to set the default data source
val DEFAULT_DATA_SOURCE_NAME = "spark.sql.sources.default"
// This is used to control when we split a schema's JSON string into multiple pieces
// so that each piece fits in a metastore table property (by default, a property value
// has a length restriction of 4000 characters). We split the JSON string of a schema
// when its length exceeds the threshold.
val SCHEMA_STRING_LENGTH_THRESHOLD = "spark.sql.sources.schemaStringLengthThreshold"

// Whether to perform eager analysis when constructing a dataframe.
// Set to false when debugging requires the ability to look at invalid query plans.
@@ -177,6 +182,11 @@ private[sql] class SQLConf extends Serializable {
private[spark] def defaultDataSourceName: String =
getConf(DEFAULT_DATA_SOURCE_NAME, "org.apache.spark.sql.parquet")

// Do not use a value larger than 4000 as the default value of this property.
// See the comments of SCHEMA_STRING_LENGTH_THRESHOLD above for more information.
private[spark] def schemaStringLengthThreshold: Int =
getConf(SCHEMA_STRING_LENGTH_THRESHOLD, "4000").toInt

private[spark] def dataFrameEagerAnalysis: Boolean =
getConf(DATAFRAME_EAGER_ANALYSIS, "true").toBoolean

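As a quick illustration of the splitting described by the config comment above, here is a minimal, self-contained sketch (plain Scala, no Spark required; the object name is hypothetical, and 4000 is the default threshold shown in the diff) of how grouped and mkString chunk and then reassemble a long string:

object SchemaSplitSketch extends App {
  // Hypothetical stand-in for a schema JSON string longer than the threshold.
  val schemaJson = "x" * 9000
  val threshold = 4000

  // StringOps.grouped splits the string into consecutive chunks of at most `threshold` characters.
  val parts = schemaJson.grouped(threshold).toList
  println(parts.map(_.length))   // List(4000, 4000, 1000)

  // Concatenating the chunks in order restores the original string exactly.
  assert(parts.mkString == schemaJson)
}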
@@ -69,13 +69,23 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
val table = synchronized {
client.getTable(in.database, in.name)
}
val schemaString = table.getProperty("spark.sql.sources.schema")
val userSpecifiedSchema =
if (schemaString == null) {
None
} else {
Some(DataType.fromJson(schemaString).asInstanceOf[StructType])
Option(table.getProperty("spark.sql.sources.schema.numParts")).map { numParts =>
val parts = (0 until numParts.toInt).map { index =>
val part = table.getProperty(s"spark.sql.sources.schema.part.${index}")
if (part == null) {
throw new AnalysisException(
s"Could not read schema from the metastore because it is corrupted " +
s"(missing part ${index} of the schema).")
}

part
}
// Stitch all parts back together into a single JSON schema string
// and convert it back to a StructType.
DataType.fromJson(parts.mkString).asInstanceOf[StructType]
}

// It does not appear that the ql client for the metastore has a way to enumerate all the
// SerDe properties directly...
val options = table.getTTable.getSd.getSerdeInfo.getParameters.toMap
@@ -119,7 +129,14 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with

tbl.setProperty("spark.sql.sources.provider", provider)
if (userSpecifiedSchema.isDefined) {
tbl.setProperty("spark.sql.sources.schema", userSpecifiedSchema.get.json)
val threshold = hive.conf.schemaStringLengthThreshold
val schemaJsonString = userSpecifiedSchema.get.json
// Split the JSON string into parts no longer than the threshold.
val parts = schemaJsonString.grouped(threshold).toSeq
tbl.setProperty("spark.sql.sources.schema.numParts", parts.size.toString)
parts.zipWithIndex.foreach { case (part, index) =>
tbl.setProperty(s"spark.sql.sources.schema.part.${index}", part)
}
}
options.foreach { case (key, value) => tbl.setSerdeParam(key, value) }

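For a concrete picture of the property layout that this file writes and reads back, here is a hedged, self-contained sketch that uses a mutable Map as a stand-in for the metastore table's properties (the object and method names are hypothetical, not Spark API; only the property keys match the diff above):

import scala.collection.mutable

object MetastoreSchemaRoundTrip extends App {
  val threshold = 4000                            // default of spark.sql.sources.schemaStringLengthThreshold
  val props = mutable.Map.empty[String, String]   // stand-in for the metastore table properties

  // Write path: split the schema JSON and record the number of parts.
  def writeSchema(schemaJson: String): Unit = {
    val parts = schemaJson.grouped(threshold).toSeq
    props("spark.sql.sources.schema.numParts") = parts.size.toString
    parts.zipWithIndex.foreach { case (part, index) =>
      props(s"spark.sql.sources.schema.part.$index") = part
    }
  }

  // Read path: reassemble the parts in order, failing loudly if one is missing.
  def readSchema(): Option[String] =
    props.get("spark.sql.sources.schema.numParts").map { numParts =>
      val parts = (0 until numParts.toInt).map { index =>
        props.getOrElse(s"spark.sql.sources.schema.part.$index",
          sys.error(s"missing part $index of the schema"))
      }
      parts.mkString
    }

  // A dummy JSON-like string long enough to require several parts.
  val schemaJson = """{"type":"struct","fields":[]}""" * 500
  writeSchema(schemaJson)
  assert(readSchema() == Some(schemaJson))
}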
@@ -591,4 +591,25 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalUseDataSource)
}
}

test("SPARK-6024 wide schema support") {
// We will need 80 splits for this schema if the threshold is 4000.
val schema = StructType((1 to 5000).map(i => StructField(s"c_${i}", StringType, true)))
assert(
schema.json.size > conf.schemaStringLengthThreshold,
"To correctly test the fix of SPARK-6024, the value of " +
s"spark.sql.sources.schemaStringLengthThreshold needs to be less than ${schema.json.size}")
// Manually create a metastore data source table.
catalog.createDataSourceTable(
tableName = "wide_schema",
userSpecifiedSchema = Some(schema),
provider = "json",
options = Map("path" -> "just a dummy path"),
isExternal = false)

invalidateTable("wide_schema")

val actualSchema = table("wide_schema").schema
assert(schema === actualSchema)
}
}
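As a rough check on the test's comment that about 80 splits are expected, the following sketch (assuming Spark's org.apache.spark.sql.types API is on the classpath; the object name is hypothetical) builds the same 5000-column schema and counts the chunks produced with the default 4000-character threshold:

import org.apache.spark.sql.types.{StringType, StructField, StructType}

object WideSchemaPartCount extends App {
  // Same wide schema as in the test above.
  val schema = StructType((1 to 5000).map(i => StructField(s"c_$i", StringType, true)))
  val threshold = 4000
  val numParts = schema.json.grouped(threshold).size
  println(s"schema JSON length = ${schema.json.length}, parts = $numParts")
}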