@@ -102,40 +102,53 @@ object SparkHiveExample {
     // |  5| val_5|  5| val_5|
     // ...
 
-    // Create Hive managed table with Parquet
-    sql("CREATE TABLE records(key int, value string) STORED AS PARQUET")
-    // Save DataFrame to Hive managed table as Parquet format
-    val hiveTableDF = sql("SELECT * FROM records")
-    hiveTableDF.write.mode(SaveMode.Overwrite).saveAsTable("database_name.records")
-    // Create External Hive table with Parquet
-    sql("CREATE EXTERNAL TABLE records(key int, value string) " +
-      "STORED AS PARQUET LOCATION '/user/hive/warehouse/'")
-    // to make Hive Parquet format compatible with Spark Parquet format
-    spark.sqlContext.setConf("spark.sql.parquet.writeLegacyFormat", "true")
-
-    // Multiple Parquet files could be created accordingly to volume of data under directory given.
-    val hiveExternalTableLocation = "/user/hive/warehouse/database_name.db/records"
-
-    // Save DataFrame to Hive External table as compatible Parquet format
-    hiveTableDF.write.mode(SaveMode.Overwrite).parquet(hiveExternalTableLocation)
-
-    // Turn on flag for Dynamic Partitioning
-    spark.sqlContext.setConf("hive.exec.dynamic.partition", "true")
-    spark.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
-
-    // You can create partitions in Hive table, so downstream queries run much faster.
-    hiveTableDF.write.mode(SaveMode.Overwrite).partitionBy("key")
-      .parquet(hiveExternalTableLocation)
+    // Create a Hive managed Parquet table, with HQL syntax instead of the Spark SQL native syntax
+    // `USING hive`
+    sql("CREATE TABLE hive_records(key int, value string) STORED AS PARQUET")
+    // Save DataFrame to the Hive managed table
+    val df = spark.table("src")
+    df.write.mode(SaveMode.Overwrite).saveAsTable("hive_records")
+    // After insertion, the Hive managed table has data now
+    sql("SELECT * FROM hive_records").show()
+    // +---+-------+
+    // |key|  value|
+    // +---+-------+
+    // |238|val_238|
+    // | 86| val_86|
+    // |311|val_311|
+    // ...
 
-    // Reduce number of files for each partition by repartition
-    hiveTableDF.repartition($"key").write.mode(SaveMode.Overwrite)
-      .partitionBy("key").parquet(hiveExternalTableLocation)
+    // Prepare a Parquet data directory
+    val dataDir = "/tmp/parquet_data"
+    spark.range(10).write.parquet(dataDir)
+    // Create a Hive external Parquet table
+    sql(s"CREATE EXTERNAL TABLE hive_ints(key int) STORED AS PARQUET LOCATION '$dataDir'")
+    // The Hive external table should already have data
+    sql("SELECT * FROM hive_ints").show()
+    // +---+
+    // |key|
+    // +---+
+    // |  0|
+    // |  1|
+    // |  2|
+    // ...
 
-    // Control the number of files in each partition by coalesce
-    hiveTableDF.coalesce(10).write.mode(SaveMode.Overwrite)
-      .partitionBy("key").parquet(hiveExternalTableLocation)
-    // $example off:spark_hive$
+    // Turn on flag for Hive Dynamic Partitioning
+    spark.sqlContext.setConf("hive.exec.dynamic.partition", "true")
+    spark.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
+    // Create a Hive partitioned table using DataFrame API
+    df.write.partitionBy("key").format("hive").saveAsTable("hive_part_tbl")
+    // Partitioned column `key` will be moved to the end of the schema.
+    sql("SELECT * FROM hive_part_tbl").show()
+    // +-------+---+
+    // |  value|key|
+    // +-------+---+
+    // |val_238|238|
+    // | val_86| 86|
+    // |val_311|311|
+    // ...
 
     spark.stop()
+    // $example off:spark_hive$
   }
 }