diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
index 2a0a944e4849..435a4c45d5b6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
@@ -34,6 +34,7 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
     case AlterTableAddColumnsStatement(
          nameParts @ NonSessionCatalogAndTable(catalog, tbl), cols) =>
+      cols.foreach(c => failVoidType(c.dataType))
       cols.foreach(c => failCharType(c.dataType))
       val changes = cols.map { col =>
         TableChange.addColumn(
@@ -47,6 +48,7 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
 
     case AlterTableReplaceColumnsStatement(
          nameParts @ NonSessionCatalogAndTable(catalog, tbl), cols) =>
+      cols.foreach(c => failVoidType(c.dataType))
       cols.foreach(c => failCharType(c.dataType))
       val changes: Seq[TableChange] = loadTable(catalog, tbl.asIdentifier) match {
         case Some(table) =>
@@ -69,6 +71,7 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
 
     case a @ AlterTableAlterColumnStatement(
          nameParts @ NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _) =>
+      a.dataType.foreach(failVoidType)
       a.dataType.foreach(failCharType)
       val colName = a.column.toArray
       val typeChange = a.dataType.map { newDataType =>
@@ -145,6 +148,7 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
 
     case c @ CreateTableStatement(
          NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _) =>
+      assertNoVoidTypeInSchema(c.tableSchema)
       assertNoCharTypeInSchema(c.tableSchema)
       CreateV2Table(
         catalog.asTableCatalog,
@@ -172,6 +176,7 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
 
     case c @ ReplaceTableStatement(
          NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _) =>
+      assertNoVoidTypeInSchema(c.tableSchema)
       assertNoCharTypeInSchema(c.tableSchema)
       ReplaceTable(
         catalog.asTableCatalog,
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 03571a740df3..d30cf32c345b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -2184,7 +2184,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
    * Create a Spark DataType.
    */
   private def visitSparkDataType(ctx: DataTypeContext): DataType = {
-    HiveStringType.replaceCharType(typedVisit(ctx))
+    HiveVoidType.replaceVoidType(HiveStringType.replaceCharType(typedVisit(ctx)))
   }
 
   /**
@@ -2212,6 +2212,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
       case ("decimal" | "dec" | "numeric", precision :: scale :: Nil) =>
         DecimalType(precision.getText.toInt, scale.getText.toInt)
       case ("interval", Nil) => CalendarIntervalType
+      case ("void", Nil) => HiveVoidType
       case (dt, params) =>
         val dtStr = if (params.nonEmpty) s"$dt(${params.mkString(",")})" else dt
         throw new ParseException(s"DataType $dtStr is not supported.", ctx)
@@ -2258,9 +2259,9 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
       builder.putString("comment", _)
     }
 
-    // Add Hive type string to metadata.
+    // Add Hive type 'string' and 'void' to metadata.
     val rawDataType = typedVisit[DataType](ctx.dataType)
-    val cleanedDataType = HiveStringType.replaceCharType(rawDataType)
+    val cleanedDataType = HiveVoidType.replaceVoidType(HiveStringType.replaceCharType(rawDataType))
     if (rawDataType != cleanedDataType) {
       builder.putString(HIVE_TYPE_STRING, rawDataType.catalogString)
     }
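
As a reading aid (not part of the patch): with the parser changes above, user-facing parsing goes through visitSparkDataType and therefore never surfaces HiveVoidType, while the raw parse keeps it so the original Hive type can be recorded and checked later. A rough sketch of the intended behavior, assuming CatalystSqlParser exposes parseDataType and parseRawDataType as used in the CatalogV2Util change below:

    import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
    import org.apache.spark.sql.types._

    // visitSparkDataType rewrites VOID to NullType, matching the DataTypeParserSuite case below.
    assert(CatalystSqlParser.parseDataType("void") == NullType)

    // The raw form (used for the Hive type metadata round-trip) keeps HiveVoidType.
    assert(CatalystSqlParser.parseRawDataType("void") == HiveVoidType)
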
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
index e1f329352592..3d6b019a2fc2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.plans.logical.AlterTable
 import org.apache.spark.sql.connector.catalog.TableChange._
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
-import org.apache.spark.sql.types.{ArrayType, DataType, HIVE_TYPE_STRING, HiveStringType, MapType, StructField, StructType}
+import org.apache.spark.sql.types.{ArrayType, DataType, HIVE_TYPE_STRING, HiveStringType, HiveVoidType, MapType, StructField, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.util.Utils
 
@@ -339,6 +339,13 @@ private[sql] object CatalogV2Util {
     }
   }
 
+  def failVoidType(dt: DataType): Unit = {
+    if (HiveVoidType.containsVoidType(dt)) {
+      throw new AnalysisException(
+        "Cannot create tables with Hive VOID type.")
+    }
+  }
+
   def assertNoCharTypeInSchema(schema: StructType): Unit = {
     schema.foreach { f =>
       if (f.metadata.contains(HIVE_TYPE_STRING)) {
@@ -346,4 +353,12 @@ private[sql] object CatalogV2Util {
       }
     }
   }
+
+  def assertNoVoidTypeInSchema(schema: StructType): Unit = {
+    schema.foreach { f =>
+      if (f.metadata.contains(HIVE_TYPE_STRING)) {
+        failVoidType(CatalystSqlParser.parseRawDataType(f.metadata.getString(HIVE_TYPE_STRING)))
+      }
+    }
+  }
 }
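
A minimal sketch (not part of the patch) of the schema shape assertNoVoidTypeInSchema is meant to reject. The literal metadata key below is an assumption, taken to match the HIVE_TYPE_STRING constant used above:

    import org.apache.spark.sql.types._

    // After parsing a `c void` column, the surface type is NullType and the raw Hive
    // type string ("void") is remembered in the field metadata.
    val meta = new MetadataBuilder().putString("HIVE_TYPE_STRING", "void").build()
    val schema = new StructType().add(StructField("c", NullType, nullable = true, metadata = meta))

    // assertNoVoidTypeInSchema(schema) re-parses that raw string with parseRawDataType,
    // gets HiveVoidType back, and failVoidType then throws
    // "Cannot create tables with Hive VOID type."
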
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/HiveVoidType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/HiveVoidType.scala
new file mode 100644
index 000000000000..b7717770ce68
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/HiveVoidType.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types
+
+/**
+ * A Hive void type for compatibility. This data type should only be used for parsing,
+ * and should NOT be used anywhere else. Any instance of this data type should be
+ * replaced by a [[NullType]] before analysis.
+ */
+class HiveVoidType private() extends DataType {
+
+  override def defaultSize: Int = 1
+
+  override private[spark] def asNullable: HiveVoidType = this
+
+  override def simpleString: String = "void"
+}
+
+case object HiveVoidType extends HiveVoidType {
+  def replaceVoidType(dt: DataType): DataType = dt match {
+    case ArrayType(et, nullable) =>
+      ArrayType(replaceVoidType(et), nullable)
+    case MapType(kt, vt, nullable) =>
+      MapType(replaceVoidType(kt), replaceVoidType(vt), nullable)
+    case StructType(fields) =>
+      StructType(fields.map(f => f.copy(dataType = replaceVoidType(f.dataType))))
+    case _: HiveVoidType => NullType
+    case _ => dt
+  }
+
+  def containsVoidType(dt: DataType): Boolean = dt match {
+    case ArrayType(et, _) => containsVoidType(et)
+    case MapType(kt, vt, _) => containsVoidType(kt) || containsVoidType(vt)
+    case StructType(fields) => fields.exists(f => containsVoidType(f.dataType))
+    case _ => dt.isInstanceOf[HiveVoidType]
+  }
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DataTypeParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DataTypeParserSuite.scala
index d519fdf37878..d7f9166f06b8 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DataTypeParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DataTypeParserSuite.scala
@@ -62,6 +62,7 @@ class DataTypeParserSuite extends SparkFunSuite {
   checkDataType("cHaR(27)", StringType)
   checkDataType("BINARY", BinaryType)
   checkDataType("interval", CalendarIntervalType)
+  checkDataType("void", NullType)
 
   checkDataType("array<doublE>", ArrayType(DoubleType, true))
   checkDataType("Array<map<int, tinYint>>", ArrayType(MapType(IntegerType, ByteType, true), true))
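
For nested types the two helpers on the new HiveVoidType object recurse structurally; a short sketch (not part of the patch) of the behavior the definitions above imply:

    import org.apache.spark.sql.types._

    // replaceVoidType rewrites HiveVoidType to NullType wherever it is nested.
    val raw = StructType(Seq(StructField("c", ArrayType(HiveVoidType))))
    assert(HiveVoidType.replaceVoidType(raw) ==
      StructType(Seq(StructField("c", ArrayType(NullType)))))

    // containsVoidType is the recursive check behind failVoidType: it looks into
    // arrays, maps and structs for any remaining HiveVoidType.
    assert(HiveVoidType.containsVoidType(MapType(StringType, HiveVoidType)))
    assert(!HiveVoidType.containsVoidType(MapType(StringType, NullType)))
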
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index bf90875e511f..902b9fae5983 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, RefreshTable}
 import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{HIVE_TYPE_STRING, HiveStringType, MetadataBuilder, StructField, StructType}
+import org.apache.spark.sql.types.{HIVE_TYPE_STRING, HiveStringType, HiveVoidType, MetadataBuilder, StructField, StructType}
 
 /**
  * Resolves catalogs from the multi-part identifiers in SQL statements, and convert the statements
@@ -50,6 +50,7 @@
          nameParts @ SessionCatalogAndTable(catalog, tbl), cols) =>
       loadTable(catalog, tbl.asIdentifier).collect {
         case v1Table: V1Table =>
+          cols.foreach(c => failVoidType(c.dataType))
           if (!DDLUtils.isHiveTable(v1Table.v1Table)) {
             cols.foreach(c => failCharType(c.dataType))
           }
@@ -62,6 +63,7 @@
           }
           AlterTableAddColumnsCommand(tbl.asTableIdentifier, cols.map(convertToStructField))
       }.getOrElse {
+        cols.foreach(c => failVoidType(c.dataType))
         cols.foreach(c => failCharType(c.dataType))
         val changes = cols.map { col =>
           TableChange.addColumn(
@@ -80,6 +82,7 @@
         case Some(_: V1Table) =>
           throw new AnalysisException("REPLACE COLUMNS is only supported with v2 tables.")
         case Some(table) =>
+          cols.foreach(c => failVoidType(c.dataType))
          cols.foreach(c => failCharType(c.dataType))
           // REPLACE COLUMNS deletes all the existing columns and adds new columns specified.
           val deleteChanges = table.schema.fieldNames.map { name =>
@@ -102,6 +105,7 @@
          nameParts @ SessionCatalogAndTable(catalog, tbl), _, _, _, _, _) =>
       loadTable(catalog, tbl.asIdentifier).collect {
         case v1Table: V1Table =>
+          a.dataType.foreach(failVoidType)
           if (!DDLUtils.isHiveTable(v1Table.v1Table)) {
             a.dataType.foreach(failCharType)
           }
@@ -131,8 +135,9 @@
                 s"Available: ${v1Table.schema.fieldNames.mkString(", ")}")
             }
           }
-          // Add Hive type string to metadata.
-          val cleanedDataType = HiveStringType.replaceCharType(dataType)
+          // Add Hive type 'string' and 'void' to metadata.
+          val cleanedDataType =
+            HiveVoidType.replaceVoidType(HiveStringType.replaceCharType(dataType))
           if (dataType != cleanedDataType) {
             builder.putString(HIVE_TYPE_STRING, dataType.catalogString)
           }
@@ -143,6 +148,7 @@
             builder.build())
           AlterTableChangeColumnCommand(tbl.asTableIdentifier, colName, newColumn)
       }.getOrElse {
+        a.dataType.foreach(failVoidType)
         a.dataType.foreach(failCharType)
         val colName = a.column.toArray
         val typeChange = a.dataType.map { newDataType =>
@@ -270,6 +276,7 @@
         SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _) =>
       val provider = c.provider.getOrElse(conf.defaultDataSourceName)
       if (!isV2Provider(provider)) {
+        assertNoVoidTypeInSchema(c.tableSchema)
         if (!DDLUtils.isHiveTable(Some(provider))) {
           assertNoCharTypeInSchema(c.tableSchema)
         }
@@ -279,6 +286,7 @@
         val mode = if (c.ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists
         CreateTable(tableDesc, mode, None)
       } else {
+        assertNoVoidTypeInSchema(c.tableSchema)
         assertNoCharTypeInSchema(c.tableSchema)
         CreateV2Table(
           catalog.asTableCatalog,
@@ -323,6 +331,7 @@
       if (!isV2Provider(provider)) {
         throw new AnalysisException("REPLACE TABLE is only supported with v2 tables.")
       } else {
+        assertNoVoidTypeInSchema(c.tableSchema)
         assertNoCharTypeInSchema(c.tableSchema)
         ReplaceTable(
           catalog.asTableCatalog,
@@ -719,7 +728,8 @@
     val builder = new MetadataBuilder
     col.comment.foreach(builder.putString("comment", _))
 
-    val cleanedDataType = HiveStringType.replaceCharType(col.dataType)
+    val cleanedDataType =
+      HiveVoidType.replaceVoidType(HiveStringType.replaceCharType(col.dataType))
     if (col.dataType != cleanedDataType) {
       builder.putString(HIVE_TYPE_STRING, col.dataType.catalogString)
     }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index b9c98f4ea15e..7f7cdf8f3d40 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning._
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoStatement, LogicalPlan, ScriptTransformation, Statistics}
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.connector.catalog.CatalogV2Util.assertNoVoidTypeInSchema
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils}
 import org.apache.spark.sql.execution.datasources.CreateTable
@@ -106,6 +107,7 @@ class ResolveHiveSerdeTable(session: SparkSession) extends Rule[LogicalPlan] {
       } else {
         withStorage
       }
 
+      assertNoVoidTypeInSchema(withSchema.schema)
       c.copy(tableDesc = withSchema)
   }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index e8cf4ad5d9f2..83925210b6be 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -2309,6 +2309,85 @@ class HiveDDLSuite
     }
   }
 
+  test("SPARK-20680: Spark-sql do not support for void column datatype of view") {
+    withTable("t") {
+      withView("tblVoidType") {
+        val client =
+          spark.sharedState.externalCatalog.unwrapped.asInstanceOf[HiveExternalCatalog].client
+        client.runSqlHive("CREATE TABLE t (t1 int)")
+        client.runSqlHive("INSERT INTO t VALUES (3)")
+        client.runSqlHive("CREATE VIEW tblVoidType AS SELECT NULL AS col FROM t")
+        checkAnswer(spark.table("tblVoidType"), Row(null))
+        // No exception is thrown
+        val desc = spark.sql("DESC tblVoidType").collect().toSeq
+        assert(desc.contains(Row("col", "null", null)))
+      }
+    }
+
+    // Forbid creating Hive table with void type in Spark
+    withTable("t1", "t2", "t3") {
+      val e1 = intercept[AnalysisException] {
+        spark.sql("CREATE TABLE t1 (v void) USING parquet")
+      }.getMessage
+      assert(e1.contains("Cannot create tables with Hive VOID type"))
+      val e2 = intercept[AnalysisException] {
+        spark.sql("CREATE TABLE t2 (v void) USING hive")
+      }.getMessage
+      assert(e2.contains("Cannot create tables with Hive VOID type"))
+      val e3 = intercept[AnalysisException] {
+        spark.sql("CREATE TABLE t3 (v void)")
+      }.getMessage
+      assert(e3.contains("Cannot create tables with Hive VOID type"))
+    }
+
+    // Make sure spark.catalog.createTable with void type will fail
+    val schema1 = new StructType().add("c", HiveVoidType)
+    assertHiveTableVoidType(schema1)
+    assertDSTableVoidType(schema1)
+
+    val schema2 = new StructType()
+      .add("c", StructType(Seq(StructField.apply("c1", HiveVoidType))))
+    assertHiveTableVoidType(schema2)
+    assertDSTableVoidType(schema2)
+
+    val schema3 = new StructType().add("c", ArrayType(HiveVoidType))
+    assertHiveTableVoidType(schema3)
+    assertDSTableVoidType(schema3)
+
+    val schema4 = new StructType()
+      .add("c", MapType(StringType, HiveVoidType))
+    assertHiveTableVoidType(schema4)
+    assertDSTableVoidType(schema4)
+
+    val schema5 = new StructType()
+      .add("c", MapType(HiveVoidType, StringType))
+    assertHiveTableVoidType(schema5)
+    assertDSTableVoidType(schema5)
+  }
+
+  private def assertHiveTableVoidType(schema: StructType): Unit = {
+    withTable("t") {
+      intercept[AnalysisException] {
+        spark.catalog.createTable(
+          tableName = "t",
+          source = "hive",
+          schema = schema,
+          options = Map("fileFormat" -> "parquet"))
+      }
+    }
+  }
+
+  private def assertDSTableVoidType(schema: StructType): Unit = {
+    withTable("t") {
+      intercept[AnalysisException] {
+        spark.catalog.createTable(
+          tableName = "t",
+          source = "parquet",
+          schema = schema,
+          options = Map.empty[String, String])
+      }
+    }
+  }
+
   test("SPARK-21216: join with a streaming DataFrame") {
     import org.apache.spark.sql.execution.streaming.MemoryStream
     import testImplicits._