From cb0e544aa814682bac9f174febf23cc59f47cc0e Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Mon, 29 Nov 2021 17:05:56 +0800 Subject: [PATCH 1/2] [SPARK-37452][SQL][BACKPORT][3.1] Char and Varchar break backward compatibility between v3.1 and v2 --- .../spark/sql/hive/HiveExternalCatalog.scala | 20 +++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index 019718cc53a95..8e17246626abd 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.hive import java.io.IOException import java.lang.reflect.InvocationTargetException + import java.util import java.util.Locale @@ -40,7 +41,7 @@ import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils._ import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils} +import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CharVarcharUtils, DateTimeUtils} import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.execution.datasources.{PartitioningUtils, SourceOptions} import org.apache.spark.sql.hive.client.HiveClient @@ -429,12 +430,17 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat val properties = new mutable.HashMap[String, String] properties.put(CREATED_SPARK_VERSION, table.createVersion) - + // This is for backward compatibility to Spark 2 to read tables with char/varchar created by + // Spark 3.1. At read side, we will restore a table schema from its properties. So, we need to + // clear the `varchar(n)` and `char(n)` and replace them with `string` as Spark 2 does not have + // a type mapping for them in `DataType.nameToType`. + // See `restoreHiveSerdeTable` for example. + val newSchema = CharVarcharUtils.replaceCharVarcharWithStringInSchema(schema) // Serialized JSON schema string may be too long to be stored into a single metastore table // property. In this case, we split the JSON string and store each part as a separate table // property. val threshold = conf.get(SCHEMA_STRING_LENGTH_THRESHOLD) - val schemaJsonString = schema.json + val schemaJsonString = newSchema.json // Split the JSON string. val parts = schemaJsonString.grouped(threshold).toSeq properties.put(DATASOURCE_SCHEMA_NUMPARTS, parts.size.toString) @@ -745,7 +751,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat // If this is a view created by Spark 2.2 or higher versions, we should restore its schema // from table properties. if (table.properties.contains(DATASOURCE_SCHEMA_NUMPARTS)) { - table = table.copy(schema = getSchemaFromTableProperties(table)) + val newSchema = CharVarcharUtils.getRawSchema(getSchemaFromTableProperties(table)) + table = table.copy(schema = newSchema) } // No provider in table properties, which means this is a Hive serde table. @@ -796,7 +803,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat // If this is a Hive serde table created by Spark 2.1 or higher versions, we should restore its // schema from table properties. if (table.properties.contains(DATASOURCE_SCHEMA_NUMPARTS)) { - val schemaFromTableProps = getSchemaFromTableProperties(table) + val schemaFromTableProps = + CharVarcharUtils.getRawSchema(getSchemaFromTableProperties(table)) val partColumnNames = getPartitionColumnsFromTableProperties(table) val reorderedSchema = reorderSchema(schema = schemaFromTableProps, partColumnNames) @@ -836,7 +844,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat storageWithLocation.properties.filterKeys(!HIVE_GENERATED_STORAGE_PROPERTIES(_)).toMap) val partitionProvider = table.properties.get(TABLE_PARTITION_PROVIDER) - val schemaFromTableProps = getSchemaFromTableProperties(table) + val schemaFromTableProps = CharVarcharUtils.getRawSchema(getSchemaFromTableProperties(table)) val partColumnNames = getPartitionColumnsFromTableProperties(table) val reorderedSchema = reorderSchema(schema = schemaFromTableProps, partColumnNames) From efe00f7be7f4397d44fcb8337cf21a877e39f1cc Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Mon, 29 Nov 2021 17:52:43 +0800 Subject: [PATCH 2/2] [SPARK-37452][SQL][BACKPORT][3.1] Char and Varchar break backward compatibility between v3.1 and v2 --- .../scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index 8e17246626abd..1e43f2cc62ae6 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql.hive import java.io.IOException import java.lang.reflect.InvocationTargetException - import java.util import java.util.Locale