@@ -102,40 +102,53 @@ object SparkHiveExample {
     // |  5| val_5|  5| val_5|
     // ...
 
-    // Create Hive managed table with Parquet
-    sql("CREATE TABLE records(key int, value string) STORED AS PARQUET")
-    // Save DataFrame to Hive managed table as Parquet format
-    val hiveTableDF = sql("SELECT * FROM records")
-    hiveTableDF.write.mode(SaveMode.Overwrite).saveAsTable("database_name.records")
-    // Create External Hive table with Parquet
-    sql("CREATE EXTERNAL TABLE records(key int, value string) " +
-      "STORED AS PARQUET LOCATION '/user/hive/warehouse/'")
-    // to make Hive Parquet format compatible with Spark Parquet format
-    spark.sqlContext.setConf("spark.sql.parquet.writeLegacyFormat", "true")
-
-    // Multiple Parquet files could be created accordingly to volume of data under directory given.
-    val hiveExternalTableLocation = "/user/hive/warehouse/database_name.db/records"
-
-    // Save DataFrame to Hive External table as compatible Parquet format
-    hiveTableDF.write.mode(SaveMode.Overwrite).parquet(hiveExternalTableLocation)
-
-    // Turn on flag for Dynamic Partitioning
-    spark.sqlContext.setConf("hive.exec.dynamic.partition", "true")
-    spark.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
-
-    // You can create partitions in Hive table, so downstream queries run much faster.
-    hiveTableDF.write.mode(SaveMode.Overwrite).partitionBy("key")
-      .parquet(hiveExternalTableLocation)
+    // Create a Hive managed Parquet table, with HQL syntax instead of the Spark SQL native syntax
+    // `USING hive`
+    sql("CREATE TABLE hive_records(key int, value string) STORED AS PARQUET")
+    // Save DataFrame to the Hive managed table
+    val df = spark.table("src")
+    df.write.mode(SaveMode.Overwrite).saveAsTable("hive_records")
+    // After insertion, the Hive managed table has data now
+    sql("SELECT * FROM hive_records").show()
+    // +---+-------+
+    // |key|  value|
+    // +---+-------+
+    // |238|val_238|
+    // | 86| val_86|
+    // |311|val_311|
+    // ...
 
-    // Reduce number of files for each partition by repartition
-    hiveTableDF.repartition($"key").write.mode(SaveMode.Overwrite)
-      .partitionBy("key").parquet(hiveExternalTableLocation)
+    // Prepare a Parquet data directory
+    val dataDir = "/tmp/parquet_data"
+    spark.range(10).write.parquet(dataDir)
+    // Create a Hive external Parquet table
+    sql(s"CREATE EXTERNAL TABLE hive_ints(key int) STORED AS PARQUET LOCATION '$dataDir'")
+    // The Hive external table should already have data
+    sql("SELECT * FROM hive_ints").show()
+    // +---+
+    // |key|
+    // +---+
+    // |  0|
+    // |  1|
+    // |  2|
+    // ...
 
-    // Control the number of files in each partition by coalesce
-    hiveTableDF.coalesce(10).write.mode(SaveMode.Overwrite)
-      .partitionBy("key").parquet(hiveExternalTableLocation)
-    // $example off:spark_hive$
+    // Turn on flag for Hive Dynamic Partitioning
+    spark.sqlContext.setConf("hive.exec.dynamic.partition", "true")
+    spark.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
+    // Create a Hive partitioned table using DataFrame API
+    df.write.partitionBy("key").format("hive").saveAsTable("hive_part_tbl")
+    // Partitioned column `key` will be moved to the end of the schema.
+    sql("SELECT * FROM hive_part_tbl").show()
+    // +-------+---+
+    // |  value|key|
+    // +-------+---+
+    // |val_238|238|
+    // | val_86| 86|
+    // |val_311|311|
+    // ...
 
     spark.stop()
+    // $example off:spark_hive$
   }
 }
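Note: the rewritten example above relies on the `spark` session, the `sql` helper, and the `src` table that are set up earlier in SparkHiveExample.scala, outside this hunk. A minimal sketch of that preamble, assuming a local warehouse path and the stock example's sample data file (adjust both for your environment):

    import org.apache.spark.sql.{SaveMode, SparkSession}

    // A local warehouse dir for this sketch; the real example computes its own path.
    val warehouseLocation = "spark-warehouse"

    val spark = SparkSession
      .builder()
      .appName("Spark Hive Example")
      .config("spark.sql.warehouse.dir", warehouseLocation)
      .enableHiveSupport() // needed for HQL DDL such as STORED AS PARQUET
      .getOrCreate()

    import spark.implicits._
    import spark.sql

    // Populate the `src` table that the hunk reads from.
    sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
    sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")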
@@ -336,8 +336,8 @@ object SQLConf {
     .createWithDefault(true)
 
   val PARQUET_WRITE_LEGACY_FORMAT = buildConf("spark.sql.parquet.writeLegacyFormat")
-    .doc("Whether to follow Parquet's format specification when converting Parquet schema to " +
-      "Spark SQL schema and vice versa.")
+    .doc("Whether to be compatible with the legacy Parquet format adopted by Spark 1.4 and prior " +
+      "versions, when converting Parquet schema to Spark SQL schema and vice versa.")
     .booleanConf
     .createWithDefault(false)
 
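Note on the reworded `spark.sql.parquet.writeLegacyFormat` doc: the flag keeps Spark's Parquet output readable by pre-1.5 consumers, for example by writing decimals as fixed-length byte arrays, the layout Hive and Impala expect. A minimal, hypothetical sketch of flipping it before a write, assuming an existing `spark` session:

    // Write Parquet in the legacy (Spark 1.4 and prior) layout for older readers.
    spark.conf.set("spark.sql.parquet.writeLegacyFormat", "true")

    spark.range(5)
      .selectExpr("CAST(id AS DECIMAL(10, 2)) AS amount")
      .write
      .mode("overwrite")
      .parquet("/tmp/legacy_parquet") // hypothetical output path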