From eaecabc5a59457a4baf84dbb755dd7b876fdb536 Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Fri, 27 Apr 2018 11:10:55 -0700
Subject: [PATCH 1/2] [SPARK-24112][SQL] Add `convertMetastoreTableProperty` conf

---
 docs/sql-programming-guide.md                 |  2 +
 .../spark/sql/hive/HiveStrategies.scala       |  6 ++-
 .../org/apache/spark/sql/hive/HiveUtils.scala |  7 +++
 .../sql/hive/execution/HiveDDLSuite.scala     | 54 +++++++++++++++++--
 4 files changed, 64 insertions(+), 5 deletions(-)

diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 075b953a0898..00a3662c5479 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1812,6 +1812,8 @@ working with timestamps in `pandas_udf`s to get the best performance, see
 - Since Spark 2.4, creating a managed table with nonempty location is not allowed. An exception is thrown when attempting to create a managed table with nonempty location. To set `true` to `spark.sql.allowCreatingManagedTableUsingNonemptyLocation` restores the previous behavior. This option will be removed in Spark 3.0.
 - Since Spark 2.4, the type coercion rules can automatically promote the argument types of the variadic SQL functions (e.g., IN/COALESCE) to the widest common type, no matter how the input arguments order. In prior Spark versions, the promotion could fail in some specific orders (e.g., TimestampType, IntegerType and StringType) and throw an exception.
 - In version 2.3 and earlier, `to_utc_timestamp` and `from_utc_timestamp` respect the timezone in the input timestamp string, which breaks the assumption that the input timestamp is in a specific timezone. Therefore, these 2 functions can return unexpected results. In version 2.4 and later, this problem has been fixed. `to_utc_timestamp` and `from_utc_timestamp` will return null if the input timestamp string contains timezone. As an example, `from_utc_timestamp('2000-10-10 00:00:00', 'GMT+1')` will return `2000-10-10 01:00:00` in both Spark 2.3 and 2.4. However, `from_utc_timestamp('2000-10-10 00:00:00+00:00', 'GMT+1')`, assuming a local timezone of GMT+8, will return `2000-10-10 09:00:00` in Spark 2.3 but `null` in 2.4. For people who don't care about this problem and want to retain the previous behaivor to keep their query unchanged, you can set `spark.sql.function.rejectTimezoneInString` to false. This option will be removed in Spark 3.0 and should only be used as a temporary workaround.
+ - In version 2.3 and earlier, Spark converts Parquet Hive tables by default but ignores table properties like `TBLPROPERTIES (parquet.compression 'NONE')`. The same happens for ORC Hive table properties like `TBLPROPERTIES (orc.compress 'NONE')` when `spark.sql.hive.convertMetastoreOrc=true`. Since Spark 2.4, Spark respects Parquet/ORC-specific table properties while converting Parquet/ORC Hive tables. As an example, `CREATE TABLE t(id int) STORED AS PARQUET TBLPROPERTIES (parquet.compression 'NONE')` would generate Snappy parquet files during insertion in Spark 2.3, whereas in Spark 2.4 the result would be uncompressed parquet files. Set `spark.sql.hive.convertMetastoreTableProperty` to `false` to restore the previous behavior.
+
 ## Upgrading From Spark SQL 2.2 to 2.3
 
 - Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the referenced columns only include the internal corrupt record column (named `_corrupt_record` by default). For example, `spark.read.schema(schema).json(file).filter($"_corrupt_record".isNotNull).count()` and `spark.read.schema(schema).json(file).select("_corrupt_record").show()`. Instead, you can cache or save the parsed results and then send the same query. For example, `val df = spark.read.schema(schema).json(file).cache()` and then `df.filter($"_corrupt_record".isNotNull).count()`.
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index a0c197b06dda..54363b05a03c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -189,10 +189,12 @@ case class RelationConversions(
   // Return true for Apache ORC and Hive ORC-related configuration names.
   // Note that Spark doesn't support configurations like `hive.merge.orcfile.stripe.level`.
   private def isOrcProperty(key: String) =
-    key.startsWith("orc.") || key.contains(".orc.")
+    conf.getConf(HiveUtils.CONVERT_METASTORE_TABLE_PROPERTY) &&
+      (key.startsWith("orc.") || key.contains(".orc."))
 
   private def isParquetProperty(key: String) =
-    key.startsWith("parquet.") || key.contains(".parquet.")
+    conf.getConf(HiveUtils.CONVERT_METASTORE_TABLE_PROPERTY) &&
+      key.startsWith("parquet.") || key.contains(".parquet.")
 
   private def convert(relation: HiveTableRelation): LogicalRelation = {
     val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index 10c960374537..978387cb0226 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -104,6 +104,13 @@ private[spark] object HiveUtils extends Logging {
     .booleanConf
     .createWithDefault(false)
 
+  val CONVERT_METASTORE_TABLE_PROPERTY =
+    buildConf("spark.sql.hive.convertMetastoreTableProperty")
+      .doc("When true, ORC/Parquet table properties are respected while converting " +
+        "metastore ORC/Parquet tables")
+      .booleanConf
+      .createWithDefault(true)
+
   val CONVERT_METASTORE_ORC = buildConf("spark.sql.hive.convertMetastoreOrc")
     .internal()
     .doc("When set to true, the built-in ORC reader and writer are used to process " +
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index daac6af9b557..cab581cbbe88 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, TableAl
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.execution.command.{DDLSuite, DDLUtils}
 import org.apache.spark.sql.hive.HiveExternalCatalog
-import org.apache.spark.sql.hive.HiveUtils.{CONVERT_METASTORE_ORC, CONVERT_METASTORE_PARQUET}
+import org.apache.spark.sql.hive.HiveUtils.{CONVERT_METASTORE_ORC, CONVERT_METASTORE_PARQUET, CONVERT_METASTORE_TABLE_PROPERTY}
 import org.apache.spark.sql.hive.orc.OrcFileOperator
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
@@ -2157,7 +2157,10 @@ class HiveDDLSuite
 
   test("SPARK-23355 convertMetastoreOrc should not ignore table properties - STORED AS") {
     Seq("native", "hive").foreach { orcImpl =>
-      withSQLConf(ORC_IMPLEMENTATION.key -> orcImpl, CONVERT_METASTORE_ORC.key -> "true") {
+      withSQLConf(
+        ORC_IMPLEMENTATION.key -> orcImpl,
+        CONVERT_METASTORE_ORC.key -> "true",
+        CONVERT_METASTORE_TABLE_PROPERTY.key -> "true") {
         withTable("t") {
           withTempPath { path =>
             sql(
@@ -2197,7 +2200,9 @@ class HiveDDLSuite
   }
 
   test("SPARK-23355 convertMetastoreParquet should not ignore table properties - STORED AS") {
-    withSQLConf(CONVERT_METASTORE_PARQUET.key -> "true") {
+    withSQLConf(
+      CONVERT_METASTORE_PARQUET.key -> "true",
+      CONVERT_METASTORE_TABLE_PROPERTY.key -> "true") {
       withTable("t") {
         withTempPath { path =>
           sql(
@@ -2225,6 +2230,49 @@ class HiveDDLSuite
     }
   }
 
+  test("Ignore ORC table properties for backward compatibility") {
+    Seq("native", "hive").foreach { orcImpl =>
+      withSQLConf(
+        ORC_IMPLEMENTATION.key -> orcImpl,
+        CONVERT_METASTORE_ORC.key -> "true",
+        CONVERT_METASTORE_TABLE_PROPERTY.key -> "false") {
+        withTable("t") {
+          withTempPath { path =>
+            sql(
+              s"""
+                |CREATE TABLE t(id int) STORED AS ORC
+                |TBLPROPERTIES (orc.compress 'NONE')
+                |LOCATION '${path.toURI}'
+              """.stripMargin)
+            sql("INSERT INTO t SELECT 1")
+            val maybeFile = path.listFiles().find(_.getName.startsWith("part"))
+            assertCompression(maybeFile, "orc", "SNAPPY")
+          }
+        }
+      }
+    }
+  }
+
+  test("Ignore Parquet table properties for backward compatibility") {
+    withSQLConf(
+      CONVERT_METASTORE_PARQUET.key -> "true",
+      CONVERT_METASTORE_TABLE_PROPERTY.key -> "false") {
+      withTable("t") {
+        withTempPath { path =>
+          sql(
+            s"""
+              |CREATE TABLE t(id int) STORED AS PARQUET
+              |TBLPROPERTIES (parquet.compression 'NONE')
+              |LOCATION '${path.toURI}'
+            """.stripMargin)
+          sql("INSERT INTO t SELECT 1")
+          val maybeFile = path.listFiles().find(_.getName.startsWith("part"))
+          assertCompression(maybeFile, "parquet", "SNAPPY")
+        }
+      }
+    }
+  }
+
   test("load command for non local invalid path validation") {
     withTable("tbl") {
       sql("CREATE TABLE tbl(i INT, j STRING)")

From 137e2dca16e6ab2015c96cb79462d726bfbd9ff4 Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Mon, 7 May 2018 14:37:12 -0700
Subject: [PATCH 2/2] Address comments

---
 .../main/scala/org/apache/spark/sql/hive/HiveStrategies.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 54363b05a03c..e09c940fcbab 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -194,7 +194,7 @@ case class RelationConversions(
 
   private def isParquetProperty(key: String) =
     conf.getConf(HiveUtils.CONVERT_METASTORE_TABLE_PROPERTY) &&
-      key.startsWith("parquet.") || key.contains(".parquet.")
+      (key.startsWith("parquet.") || key.contains(".parquet."))
 
   private def convert(relation: HiveTableRelation): LogicalRelation = {
     val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
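
For reviewers, a minimal usage sketch (not part of the patch) of the behavior toggled by the new conf. It assumes a Hive-enabled Spark deployment; the SparkSession variable `spark` and the table name `t` are illustrative only.

    import org.apache.spark.sql.SparkSession

    // Minimal sketch, assuming Hive support is on the classpath and a writable warehouse dir.
    val spark = SparkSession.builder()
      .enableHiveSupport()
      .getOrCreate()

    // Restore the Spark 2.3 behavior: Parquet/ORC table properties are ignored
    // while Hive tables are converted to the built-in file sources.
    spark.sql("SET spark.sql.hive.convertMetastoreTableProperty=false")

    spark.sql(
      "CREATE TABLE t(id int) STORED AS PARQUET TBLPROPERTIES (parquet.compression 'NONE')")
    spark.sql("INSERT INTO t SELECT 1")
    // With the conf set to false, the inserted files use the default (Snappy) codec,
    // matching the backward-compatibility tests added in PATCH 1/2.

With the default value of `true`, the same statements produce uncompressed Parquet files, which is the Spark 2.4 behavior documented in the sql-programming-guide change above.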