diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 5e1ad9b885b1..14284c2c937e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -956,7 +956,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
 
     // Storage format
     val defaultStorage: CatalogStorageFormat = {
-      val defaultStorageType = conf.getConfString("hive.default.fileformat", "textfile")
+      val defaultStorageType = conf.defaultFileFormat.toLowerCase
       val defaultHiveSerde = HiveSerDe.sourceToSerDe(defaultStorageType, conf)
       CatalogStorageFormat(
         locationUri = None,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 2286919f7aad..ad010291dcd9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -295,11 +295,13 @@ object SQLConf {
     .intConf
     .createWithDefault(200)
 
-  // This is used to set the default data source
+  // This is used to set the default format for data source tables and Hive tables.
   val DEFAULT_DATA_SOURCE_NAME = SQLConfigBuilder("spark.sql.sources.default")
-    .doc("The default data source to use in input/output.")
+    .doc("The default format for data source tables in input/output, and the file format for " +
+      "Hive tables created with CREATE TABLE. If not specified, data source tables default to " +
+      "parquet and Hive tables default to textfile.")
     .stringConf
-    .createWithDefault("parquet")
+    .createOptional
 
   val CONVERT_CTAS = SQLConfigBuilder("spark.sql.hive.convertCTAS")
     .internal()
@@ -643,7 +645,13 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   def broadcastTimeout: Int = getConf(BROADCAST_TIMEOUT)
 
-  def defaultDataSourceName: String = getConf(DEFAULT_DATA_SOURCE_NAME)
+  def defaultDataSourceName: String = {
+    getConf(DEFAULT_DATA_SOURCE_NAME).getOrElse("parquet")
+  }
+
+  def defaultFileFormat: String = {
+    getConf(DEFAULT_DATA_SOURCE_NAME).getOrElse("textfile")
+  }
 
   def convertCTAS: Boolean = getConf(CONVERT_CTAS)
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
index d4d8e3e4e83d..0b69f9eb29dc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -98,8 +98,8 @@ private[sql] trait SQLTestUtils
     (keys, values).zipped.foreach(spark.conf.set)
     try f finally {
       keys.zip(currentValues).foreach {
-        case (key, Some(value)) => spark.conf.set(key, value)
-        case (key, None) => spark.conf.unset(key)
+        case (key, Some(value)) if value != "" => spark.conf.set(key, value)
+        case (key, _) => spark.conf.unset(key)
       }
     }
   }
diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveWindowFunctionQuerySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveWindowFunctionQuerySuite.scala
index 7ba5790c2979..1b4632f031f7 100644
--- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveWindowFunctionQuerySuite.scala
+++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveWindowFunctionQuerySuite.scala
@@ -89,15 +89,6 @@ class HiveWindowFunctionQuerySuite extends HiveComparisonTest with BeforeAndAfter
       s"""
         |LOAD DATA LOCAL INPATH '$testData2' overwrite into table over1k
       """.stripMargin)
-
-    // The following settings are used for generating golden files with Hive.
-    // We have to use kryo to correctly let Hive serialize plans with window functions.
-    // This is used to generate golden files.
-    sql("set hive.plan.serialization.format=kryo")
-    // Explicitly set fs to local fs.
-    sql(s"set fs.default.name=file://$testTempDir/")
-    // Ask Hive to run jobs in-process as a single map and reduce task.
-    sql("set mapred.job.tracker=local")
   }
 
   override def afterAll() {
@@ -758,15 +749,6 @@ class HiveWindowFunctionQueryFileSuite
     TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles"))
     // Add Locale setting
     Locale.setDefault(Locale.US)
-
-    // The following settings are used for generating golden files with Hive.
-    // We have to use kryo to correctly let Hive serialize plans with window functions.
-    // This is used to generate golden files.
-    // sql("set hive.plan.serialization.format=kryo")
-    // Explicitly set fs to local fs.
-    // sql(s"set fs.default.name=file://$testTempDir/")
-    // Ask Hive to run jobs in-process as a single map and reduce task.
-    // sql("set mapred.job.tracker=local")
   }
 
   override def afterAll() {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index d15e11a7ff20..55056124eed7 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -141,6 +141,68 @@ class HiveDDLSuite
     }
   }
 
+  test("create table - Hive tables default to textfile when spark.sql.sources.default is unset") {
+    val catalog = spark.sessionState.catalog
+    val tabName = "tab1"
+    withTable(tabName) {
+      sql(s"CREATE TABLE $tabName(c1 int)")
+      val tableMetadata = catalog.getTableMetadata(TableIdentifier(tabName, Some("default")))
+      val storage = tableMetadata.storage
+      assert(storage.inputFormat == Option("org.apache.hadoop.mapred.TextInputFormat"))
+      assert(storage.outputFormat ==
+        Option("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"))
+    }
+  }
+
+  test("create table - change spark.sql.sources.default to parquet") {
+    val catalog = spark.sessionState.catalog
+    val tabName = "tab1"
+    withTable(tabName) {
+      // the format name should be case insensitive
+      withSQLConf(SQLConf.DEFAULT_DATA_SOURCE_NAME.key -> "pArQuEt") {
+        sql(s"CREATE TABLE $tabName(c1 int)")
+        val tableMetadata = catalog.getTableMetadata(TableIdentifier(tabName, Some("default")))
+        val storage = tableMetadata.storage
+        assert(storage.inputFormat ==
+          Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"))
+        assert(storage.outputFormat ==
+          Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"))
+      }
+    }
+  }
+
+  test("create table - change spark.sql.sources.default to a nonexistent format") {
+    val catalog = spark.sessionState.catalog
+    val tabName = "tab1"
+    withTable(tabName) {
+      // an unrecognized format name falls back to the textfile format
+      withSQLConf(SQLConf.DEFAULT_DATA_SOURCE_NAME.key -> "nonExistent") {
+        sql(s"CREATE TABLE $tabName(c1 int)")
+        val tableMetadata = catalog.getTableMetadata(TableIdentifier(tabName, Some("default")))
+        val storage = tableMetadata.storage
+        assert(storage.inputFormat == Option("org.apache.hadoop.mapred.TextInputFormat"))
+        assert(storage.outputFormat ==
Option("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")) + } + } + } + + test("create table - spark.sql.default.fileformat") { + val catalog = spark.sessionState.catalog + val tabName = "tab1" + withTable(tabName) { + withSQLConf(SQLConf.DEFAULT_DATA_SOURCE_NAME.key -> "parquet") { + sql(s"CREATE TABLE $tabName(c1 int)") + val tableMetadata = catalog.getTableMetadata(TableIdentifier(tabName, Some("default"))) + val storage = tableMetadata.storage + assert(storage.inputFormat == + Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat")) + assert(storage.outputFormat == + Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat")) + } + } + } + test("add/drop partitions - external table") { val catalog = spark.sessionState.catalog withTempDir { tmpDir =>