@@ -956,7 +956,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {

// Storage format
val defaultStorage: CatalogStorageFormat = {
val defaultStorageType = conf.getConfString("hive.default.fileformat", "textfile")
val defaultStorageType = conf.defaultFileFormat.toLowerCase
val defaultHiveSerde = HiveSerDe.sourceToSerDe(defaultStorageType, conf)
CatalogStorageFormat(
locationUri = None,
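For context on the change above: defaultFileFormat lower-cases the configured name before the HiveSerDe.sourceToSerDe lookup, and a name the lookup does not recognize ends up falling back to plain text storage, which is what the new HiveDDLSuite tests further down assert. A standalone, illustrative sketch of that mapping (not Spark's actual HiveSerDe code; the class names simply match the test assertions below):

    // Illustrative only: the (inputFormat, outputFormat) pair the tests below expect
    // for a given default format name, with text storage as the fallback.
    def expectedStorage(format: String): (String, String) = format.toLowerCase match {
      case "parquet" =>
        ("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
          "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat")
      case _ => // "textfile" and any unrecognized name
        ("org.apache.hadoop.mapred.TextInputFormat",
          "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")
    }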
@@ -295,11 +295,13 @@ object SQLConf {
.intConf
.createWithDefault(200)

// This is used to set the default data source
// This is used to set the default format for data source tables and Hive tables.
val DEFAULT_DATA_SOURCE_NAME = SQLConfigBuilder("spark.sql.sources.default")
.doc("The default data source to use in input/output.")
.doc("The default format for data source tables to use in input/output and Hive tables in " +
"CREATE TABLE statement. If not specified, the default format for data source tables is " +
"parquet; the default format for hive tables is textfile")
.stringConf
.createWithDefault("parquet")
.createOptional

val CONVERT_CTAS = SQLConfigBuilder("spark.sql.hive.convertCTAS")
.internal()
@@ -643,7 +645,13 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {

def broadcastTimeout: Int = getConf(BROADCAST_TIMEOUT)

def defaultDataSourceName: String = getConf(DEFAULT_DATA_SOURCE_NAME)
def defaultDataSourceName: String = {
getConf(DEFAULT_DATA_SOURCE_NAME).getOrElse("parquet")
}

def defaultFileFormat: String = {
getConf(DEFAULT_DATA_SOURCE_NAME).getOrElse("textfile")
}

def convertCTAS: Boolean = getConf(CONVERT_CTAS)

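Taken together, the two accessors give the behavior the new doc string describes: a single config key drives both table types, with different fallbacks when it is unset. A minimal usage sketch of the intended effect, assuming a Hive-enabled SparkSession named spark (table names are made up for illustration):

    // spark.sql.sources.default not set:
    spark.sql("CREATE TABLE hive_tab(c1 INT)")     // Hive table, stored as textfile
    spark.range(10).write.saveAsTable("ds_tab")    // data source table, stored as parquet

    // spark.sql.sources.default set, so both paths follow it:
    spark.conf.set("spark.sql.sources.default", "orc")
    spark.sql("CREATE TABLE hive_tab2(c1 INT)")    // Hive table, stored as ORC
    spark.range(10).write.saveAsTable("ds_tab2")   // data source table, stored as ORC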
@@ -98,8 +98,8 @@ private[sql] trait SQLTestUtils
(keys, values).zipped.foreach(spark.conf.set)
try f finally {
keys.zip(currentValues).foreach {
case (key, Some(value)) => spark.conf.set(key, value)
case (key, None) => spark.conf.unset(key)
case (key, Some(value)) if value != "<undefined>" => spark.conf.set(key, value)
case (key, _) => spark.conf.unset(key)
}
}
}
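The extra guard above matters because DEFAULT_DATA_SOURCE_NAME is now declared with createOptional: reading an unset optional entry back through spark.conf.get appears to return the placeholder string "<undefined>" rather than failing, so blindly restoring the saved value would leave the conf set to that placeholder instead of unset. A small sketch of the round-trip this enables, assuming a test that mixes in SQLTestUtils:

    // spark.sql.sources.default is unset before the block; withSQLConf overrides it for
    // the body, then the guarded restore unsets the key again on exit instead of writing
    // back the "<undefined>" placeholder captured when the previous value was saved.
    withSQLConf(SQLConf.DEFAULT_DATA_SOURCE_NAME.key -> "parquet") {
      sql("CREATE TABLE tab1(c1 INT)")  // Hive table stored as parquet while overridden
    }
    // after the block, a plain CREATE TABLE defaults to textfile again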
@@ -89,15 +89,6 @@ class HiveWindowFunctionQuerySuite extends HiveComparisonTest with BeforeAndAfte
s"""
|LOAD DATA LOCAL INPATH '$testData2' overwrite into table over1k
""".stripMargin)

// The following settings are used for generating golden files with Hive.
// We have to use kryo to correctly let Hive serialize plans with window functions.
// This is used to generate golden files.
sql("set hive.plan.serialization.format=kryo")
// Explicitly set fs to local fs.
sql(s"set fs.default.name=file://$testTempDir/")
// Ask Hive to run jobs in-process as a single map and reduce task.
sql("set mapred.job.tracker=local")
}

override def afterAll() {
@@ -758,15 +749,6 @@ class HiveWindowFunctionQueryFileSuite
TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles"))
// Add Locale setting
Locale.setDefault(Locale.US)

// The following settings are used for generating golden files with Hive.
// We have to use kryo to correctly let Hive serialize plans with window functions.
// This is used to generate golden files.
// sql("set hive.plan.serialization.format=kryo")
// Explicitly set fs to local fs.
// sql(s"set fs.default.name=file://$testTempDir/")
// Ask Hive to run jobs in-process as a single map and reduce task.
// sql("set mapred.job.tracker=local")
}

override def afterAll() {
@@ -141,6 +141,68 @@ class HiveDDLSuite
}
}

test("create table - the default spark.sql.default.fileformat is textfile") {
val catalog = spark.sessionState.catalog
val tabName = "tab1"
withTable(tabName) {
sql(s"CREATE TABLE $tabName(c1 int)")
val tableMetadata = catalog.getTableMetadata(TableIdentifier(tabName, Some("default")))
val storage = tableMetadata.storage
assert(storage.inputFormat == Option("org.apache.hadoop.mapred.TextInputFormat"))
assert(storage.outputFormat ==
Option("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"))
}
}

test("create table - change default spark.sql.default.fileformat to parquet") {
val catalog = spark.sessionState.catalog
val tabName = "tab1"
withTable(tabName) {
// the format name should be case-insensitive
withSQLConf(SQLConf.DEFAULT_DATA_SOURCE_NAME.key -> "pArQuEt") {
sql(s"CREATE TABLE $tabName(c1 int)")
val tableMetadata = catalog.getTableMetadata(TableIdentifier(tabName, Some("default")))
val storage = tableMetadata.storage
assert(storage.inputFormat ==
Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"))
assert(storage.outputFormat ==
Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"))
}
}
}

test("create table - change default spark.sql.default.fileformat to nonexistent format") {
val catalog = spark.sessionState.catalog
val tabName = "tab1"
withTable(tabName) {
// an unrecognized format name should fall back to the default textfile storage
withSQLConf(SQLConf.DEFAULT_DATA_SOURCE_NAME.key -> "nonExistent") {
sql(s"CREATE TABLE $tabName(c1 int)")
val tableMetadata = catalog.getTableMetadata(TableIdentifier(tabName, Some("default")))
val storage = tableMetadata.storage
assert(storage.inputFormat == Option("org.apache.hadoop.mapred.TextInputFormat"))
assert(storage.outputFormat ==
Option("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"))
}
}
}

test("create table - spark.sql.default.fileformat") {
val catalog = spark.sessionState.catalog
val tabName = "tab1"
withTable(tabName) {
withSQLConf(SQLConf.DEFAULT_DATA_SOURCE_NAME.key -> "parquet") {
sql(s"CREATE TABLE $tabName(c1 int)")
val tableMetadata = catalog.getTableMetadata(TableIdentifier(tabName, Some("default")))
val storage = tableMetadata.storage
assert(storage.inputFormat ==
Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"))
assert(storage.outputFormat ==
Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"))
}
}
}

test("add/drop partitions - external table") {
val catalog = spark.sessionState.catalog
withTempDir { tmpDir =>