From 10a80b272e898043e250c2b24a792c9474cf0d10 Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Tue, 26 Dec 2017 12:30:10 +0800
Subject: [PATCH] clean up

---
 .../examples/sql/hive/SparkHiveExample.scala | 75 +++++++++++--------
 .../apache/spark/sql/internal/SQLConf.scala  |  4 +-
 2 files changed, 46 insertions(+), 33 deletions(-)

diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala b/examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala
index b193bd595127..70fb5b27366e 100644
--- a/examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala
@@ -102,40 +102,53 @@
     // |  5| val_5|  5| val_5|
     // ...
 
-    // Create Hive managed table with Parquet
-    sql("CREATE TABLE records(key int, value string) STORED AS PARQUET")
-    // Save DataFrame to Hive managed table as Parquet format
-    val hiveTableDF = sql("SELECT * FROM records")
-    hiveTableDF.write.mode(SaveMode.Overwrite).saveAsTable("database_name.records")
-    // Create External Hive table with Parquet
-    sql("CREATE EXTERNAL TABLE records(key int, value string) " +
-      "STORED AS PARQUET LOCATION '/user/hive/warehouse/'")
-    // to make Hive Parquet format compatible with Spark Parquet format
-    spark.sqlContext.setConf("spark.sql.parquet.writeLegacyFormat", "true")
-
-    // Multiple Parquet files could be created accordingly to volume of data under directory given.
-    val hiveExternalTableLocation = "/user/hive/warehouse/database_name.db/records"
-
-    // Save DataFrame to Hive External table as compatible Parquet format
-    hiveTableDF.write.mode(SaveMode.Overwrite).parquet(hiveExternalTableLocation)
-
-    // Turn on flag for Dynamic Partitioning
-    spark.sqlContext.setConf("hive.exec.dynamic.partition", "true")
-    spark.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
-
-    // You can create partitions in Hive table, so downstream queries run much faster.
-    hiveTableDF.write.mode(SaveMode.Overwrite).partitionBy("key")
-      .parquet(hiveExternalTableLocation)
+    // Create a Hive managed Parquet table, with HQL syntax instead of the Spark SQL native syntax
+    // `USING hive`
+    sql("CREATE TABLE hive_records(key int, value string) STORED AS PARQUET")
+    // Save DataFrame to the Hive managed table
+    val df = spark.table("src")
+    df.write.mode(SaveMode.Overwrite).saveAsTable("hive_records")
+    // After insertion, the Hive managed table has data now
+    sql("SELECT * FROM hive_records").show()
+    // +---+-------+
+    // |key|  value|
+    // +---+-------+
+    // |238|val_238|
+    // | 86| val_86|
+    // |311|val_311|
+    // ...
 
-    // Reduce number of files for each partition by repartition
-    hiveTableDF.repartition($"key").write.mode(SaveMode.Overwrite)
-      .partitionBy("key").parquet(hiveExternalTableLocation)
+    // Prepare a Parquet data directory
+    val dataDir = "/tmp/parquet_data"
+    spark.range(10).write.parquet(dataDir)
+    // Create a Hive external Parquet table
+    sql(s"CREATE EXTERNAL TABLE hive_ints(key int) STORED AS PARQUET LOCATION '$dataDir'")
+    // The Hive external table should already have data
+    sql("SELECT * FROM hive_ints").show()
+    // +---+
+    // |key|
+    // +---+
+    // |  0|
+    // |  1|
+    // |  2|
+    // ...
 
-    // Control the number of files in each partition by coalesce
-    hiveTableDF.coalesce(10).write.mode(SaveMode.Overwrite)
-      .partitionBy("key").parquet(hiveExternalTableLocation)
-    // $example off:spark_hive$
+    // Turn on flag for Hive Dynamic Partitioning
+    spark.sqlContext.setConf("hive.exec.dynamic.partition", "true")
+    spark.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
+    // Create a Hive partitioned table using DataFrame API
+    df.write.partitionBy("key").format("hive").saveAsTable("hive_part_tbl")
+    // Partitioned column `key` will be moved to the end of the schema.
+    sql("SELECT * FROM hive_part_tbl").show()
+    // +-------+---+
+    // |  value|key|
+    // +-------+---+
+    // |val_238|238|
+    // | val_86| 86|
+    // |val_311|311|
+    // ...
 
     spark.stop()
+    // $example off:spark_hive$
   }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 84fe4bb711a4..f16972e5427e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -336,8 +336,8 @@ object SQLConf {
     .createWithDefault(true)
 
   val PARQUET_WRITE_LEGACY_FORMAT = buildConf("spark.sql.parquet.writeLegacyFormat")
-    .doc("Whether to follow Parquet's format specification when converting Parquet schema to " +
-      "Spark SQL schema and vice versa.")
+    .doc("Whether to be compatible with the legacy Parquet format adopted by Spark 1.4 and prior " +
+      "versions, when converting Parquet schema to Spark SQL schema and vice versa.")
     .booleanConf
     .createWithDefault(false)
 
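Note on the reworded config: `spark.sql.parquet.writeLegacyFormat` controls whether Spark writes Parquet files in the legacy layout used by Spark 1.4 and earlier, which older Hive Parquet readers expect, rather than following the Parquet format specification. A minimal sketch of setting it through the public conf API, not part of this patch; the app name, master, and output path are illustrative assumptions:

// LegacyParquetWriteSketch.scala -- illustrative sketch only, not part of this patch
import org.apache.spark.sql.SparkSession

object LegacyParquetWriteSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("LegacyParquetWriteSketch")
      .master("local[*]")  // assumption: local run for demonstration
      .enableHiveSupport()
      .getOrCreate()

    // Write Parquet in the Spark 1.4-era legacy layout so that older Hive
    // Parquet readers can consume the files (the default is false).
    spark.conf.set("spark.sql.parquet.writeLegacyFormat", "true")

    // Illustrative write; "/tmp/legacy_parquet" is a made-up location.
    spark.range(10).toDF("key").write
      .mode("overwrite")
      .parquet("/tmp/legacy_parquet")

    spark.stop()
  }
}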